
akka-typed(1) - Actor Lifecycle Management

2020-05-28 15:01:37

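The way akka-typed manages an actor's lifecycle (creation, start-up, state transitions, stopping, and watching) differs in several respects from akka-classic. This post introduces actor lifecycle management in akka-typed.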

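Every kind of actor is defined as a template by its behavior, and a parent actor one level up creates instances of it with spawn. Each new instance joins the system's top-down tree hierarchy, directly beneath the parent that spawned it. The akka-typed guardian actor, i.e. the root actor, is specified and created when the ActorSystem is defined, as follows: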

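    import akka.actor.typed.ActorSystem
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    val config = ConfigFactory.load("application.conf")

    // Note: Command and RepeatedGreeting are not defined in the GreetStarter
    // listing in this post, which is typed as Behavior[SayHi].
    val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo", config)
    man ! GreetStarter.RepeatedGreeting("Tiger", 1.seconds)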

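In a sense, the ActorSystem instance man represents the root actor. We can send messages to man, and the behavior of GreetStarter then uses its own ActorContext to spawn, stop, and watch actors and to hand out computation tasks. It is effectively the hub of the program: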

    import akka.actor.typed.{Behavior, DispatcherSelector}
    import akka.actor.typed.scaladsl.Behaviors

    // HelloActor, Greeter and the contents of Messages are not shown in this post.
    object GreetStarter {
      import Messages._

      def apply(): Behavior[SayHi] = {
        Behaviors.setup { ctx =>
          // run hello-actor on a dedicated dispatcher
          val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
          val helloActor = ctx.spawn(HelloActor(), "hello-actor", props)
          val greeter = ctx.spawn(Greeter(helloActor), "greeter")
          ctx.watch(greeter)
          ctx.watchWith(helloActor, StopWorker("something happened"))
          Behaviors.receiveMessage { who =>
            if (who.name == "stop") {
              // stop both children, then stop this actor
              ctx.stop(helloActor)
              ctx.stop(greeter)
              Behaviors.stopped
            } else {
              greeter ! who
              Behaviors.same
            }
          }
        }
      }
    }

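However, there are times when we need to create and use actors from outside the root actor's ActorContext. The following example from the official documentation demonstrates this well: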

    import akka.actor.typed.ActorRef
    import akka.actor.typed.ActorSystem
    import akka.actor.typed.Behavior
    import akka.actor.typed.Props
    import akka.actor.typed.SpawnProtocol
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl.LoggerOps
    import akka.util.Timeout
    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    import scala.concurrent.duration._

    object HelloWorldMain {
      def apply(): Behavior[SpawnProtocol.Command] =
        Behaviors.setup { context =>
          // Start initial tasks
          // context.spawn(...)
          SpawnProtocol()
        }
    }

    object Main extends App {
      implicit val system: ActorSystem[SpawnProtocol.Command] =
        ActorSystem(HelloWorldMain(), "hello")

      // needed in implicit scope for ask (?)
      import akka.actor.typed.scaladsl.AskPattern._
      implicit val ec: ExecutionContext = system.executionContext
      implicit val timeout: Timeout = Timeout(3.seconds)

      val greeter: Future[ActorRef[HelloWorld.Greet]] =
        system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))

      val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
        context.log.info2("Greeting for {} from {}", message.whom, message.from)
        Behaviors.stopped
      }

      val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
        system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))

      for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
        greeterRef ! HelloWorld.Greet("Akka", replyToRef)
      }
      ...
    }

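As you can see, every operation here is performed from outside the actors, in ordinary application code. SpawnProtocol is itself an actor, as shown below: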

    object SpawnProtocol {
      ...
      final case class Spawn[T](behavior: Behavior[T], name: String, props: Props, replyTo: ActorRef[ActorRef[T]])
          extends Command
      ...
      def apply(): Behavior[Command] =
        Behaviors.receive { (ctx, msg) =>
          msg match {
            case Spawn(bhvr, name, props, replyTo) =>
              val ref =
                if (name == null || name.equals(""))
                  ctx.spawnAnonymous(bhvr, props)
                else {
                  @tailrec def spawnWithUniqueName(c: Int): ActorRef[Any] = {
                    val nameSuggestion = if (c == 0) name else s"$name-$c"
                    ctx.child(nameSuggestion) match {
                      case Some(_) => spawnWithUniqueName(c + 1) // already taken, try next
                      case None    => ctx.spawn(bhvr, nameSuggestion, props)
                    }
                  }
                  spawnWithUniqueName(0)
                }
              replyTo ! ref
              Behaviors.same
          }
        }
    }

Outside code requests a new actor by sending a Spawn message; the reference to the spawned actor is sent back to replyTo.

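An actor switches state by moving from one behavior to another. We can define our own behaviors or use the ready-made factories on Behaviors. If only internal variables change, we can simply return the current behavior again, carrying the new variable values, as follows: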

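    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl.LoggerOps

    // HelloWorld (with Greet and Greeted) is not shown in this post.
    object HelloWorldBot {

      def apply(max: Int): Behavior[HelloWorld.Greeted] = {
        bot(0, max)
      }

      // The counter is not a mutable field: it is a parameter of the next behavior.
      private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =
        Behaviors.receive { (context, message) =>
          val n = greetingCounter + 1
          context.log.info2("Greeting {} for {}", n, message.whom)
          if (n == max) {
            Behaviors.stopped
          } else {
            message.from ! HelloWorld.Greet(message.whom, context.self)
            bot(n, max)
          }
        }
    }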
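The bot above stays in one behavior and only carries a counter along. Where the state change is a switch between genuinely different behaviors, a minimal sketch might look like the following. Door, its messages, and DoorDemo are hypothetical names invented for this illustration; they do not come from the original post or from Akka. The sketch assumes Akka 2.6 with akka-actor-typed on the classpath.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Scheduler}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor: each state is a separate behavior.
object Door {
  sealed trait Command
  case object Open extends Command
  case object Close extends Command
  final case class GetState(replyTo: ActorRef[String]) extends Command

  def apply(): Behavior[Command] = closed

  // State "closed": Open switches to the opened behavior.
  private def closed: Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Open =>
        opened
      case Close =>
        Behaviors.same
      case GetState(replyTo) =>
        replyTo ! "closed"
        Behaviors.same
    }

  // State "opened": Close switches back to the closed behavior.
  private def opened: Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Open =>
        Behaviors.same
      case Close =>
        closed
      case GetState(replyTo) =>
        replyTo ! "opened"
        Behaviors.same
    }
}

// Hypothetical driver that queries the state before and after each switch.
object DoorDemo {
  def run(): Seq[String] = {
    val system: ActorSystem[Door.Command] = ActorSystem(Door(), "door-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    implicit val scheduler: Scheduler = system.scheduler

    def state(): String =
      Await.result(system.ask[String](ref => Door.GetState(ref)), 3.seconds)

    try {
      val s1 = state()
      system ! Door.Open
      val s2 = state()
      system ! Door.Close
      val s3 = state()
      Seq(s1, s2, s3)
    } finally system.terminate()
  }
}
```

Calling DoorDemo.run() should yield the states closed, opened, closed in that order. Blocking with Await is only for the demonstration; inside an actor, the reply would normally arrive as a message.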

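An actor can be stopped either by its direct parent through ActorContext.stop, or by itself by returning Behaviors.stopped. Behaviors.stopped can take a cleanup function, which runs before the actor has stopped completely: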

    import akka.actor.typed.{Behavior, PostStop}
    import akka.actor.typed.scaladsl.Behaviors
    import org.slf4j.Logger

    // Job is another actor that is not shown in this post.
    object MasterControlProgram {
      sealed trait Command
      final case class SpawnJob(name: String) extends Command
      case object GracefulShutdown extends Command

      // Predefined cleanup operation
      def cleanup(log: Logger): Unit = log.info("Cleaning up!")

      def apply(): Behavior[Command] = {
        Behaviors
          .receive[Command] { (context, message) =>
            message match {
              case SpawnJob(jobName) =>
                context.log.info("Spawning job {}!", jobName)
                context.spawn(Job(jobName), name = jobName)
                Behaviors.same
              case GracefulShutdown =>
                context.log.info("Initiating graceful shutdown...")
                // perform graceful stop, executing cleanup before final system termination
                // behavior executing cleanup is passed as a parameter to Actor.stopped
                Behaviors.stopped { () =>
                  cleanup(context.system.log)
                }
            }
          }
          .receiveSignal {
            case (context, PostStop) =>
              context.log.info("Master Control Program stopped")
              Behaviors.same
          }
      }
    }

In fact, when an actor stops, another actor that is watching it can pick this up in its receiveSignal handler, as follows:

    import akka.actor.typed.{Behavior, DispatcherSelector, Terminated}
    import akka.actor.typed.scaladsl.Behaviors

    // HelloActor, Greeter and the contents of Messages are not shown in this post.
    object GreetStarter {
      import Messages._

      def apply(): Behavior[SayHi] = {
        Behaviors.setup { ctx =>
          val props = DispatcherSelector.fromConfig("akka.actor.default-blocking-io-dispatcher")
          val helloActor = ctx.spawn(HelloActor(), "hello-actor", props)
          val greeter = ctx.spawn(Greeter(helloActor), "greeter")
          ctx.watch(greeter)
          ctx.watchWith(helloActor, StopWorker("something happened"))
          Behaviors.receiveMessage { who =>
            if (who.name == "stop") {
              ctx.stop(helloActor)
              ctx.stop(greeter)
              Behaviors.stopped
            } else {
              greeter ! who
              Behaviors.same
            }
          }.receiveSignal {
            // delivered when a watched actor has stopped
            case (context, Terminated(ref)) =>
              context.log.info("{} stopped!", ref.path.name)
              Behaviors.same
          }
        }
      }
    }
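The listing above registers one child with ctx.watch and the other with ctx.watchWith. The difference is how the stop is reported: watch delivers a Terminated signal to receiveSignal, while watchWith delivers the supplied message to the normal message handler. The following is a minimal sketch of that difference, assuming Akka 2.6; WatchDemo, StopWorkers, WorkerStopped, and the use of a Promise to hand the result to the caller are all hypothetical choices made for this illustration.

```scala
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical demo, not from the original post.
object WatchDemo {
  sealed trait Command
  case object StopWorkers extends Command
  final case class WorkerStopped(name: String) extends Command

  // A worker that stops as soon as it receives any message.
  private def worker: Behavior[String] =
    Behaviors.receiveMessage[String](_ => Behaviors.stopped[String])

  def supervisor(report: Promise[Set[String]]): Behavior[Command] =
    Behaviors.setup[Command] { ctx =>
      val a = ctx.spawn(worker, "watched")
      val b = ctx.spawn(worker, "watched-with")
      ctx.watch(a)                                    // reported as a Terminated signal
      ctx.watchWith(b, WorkerStopped("watched-with")) // reported as an ordinary message

      // Records how each stop was reported; finishes once both are seen.
      def running(seen: Set[String]): Behavior[Command] =
        if (seen.size == 2) {
          report.success(seen)
          Behaviors.stopped[Command]
        } else {
          Behaviors
            .receiveMessage[Command] {
              case StopWorkers =>
                a ! "stop"
                b ! "stop"
                Behaviors.same
              case WorkerStopped(name) =>
                running(seen + s"message:$name")
            }
            .receiveSignal {
              case (_, Terminated(ref)) =>
                running(seen + s"signal:${ref.path.name}")
            }
        }

      running(Set.empty)
    }

  def run(): Set[String] = {
    val report = Promise[Set[String]]()
    val system = ActorSystem(supervisor(report), "watch-demo")
    system ! StopWorkers
    try Await.result(report.future, 5.seconds)
    finally system.terminate()
  }
}
```

WatchDemo.run() should return a set containing signal:watched and message:watched-with. Note that an actor which watches a child with watch but does not handle Terminated fails with a DeathPactException when the child stops, so the receiveSignal handler is not optional here.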

Below are the .receiveSignal function and the Signal messages it can capture:

    trait Receive[T] extends Behavior[T] {
      def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
    }

    trait Signal

    /**
     * Lifecycle signal that is fired upon restart of the Actor before replacing
     * the behavior with the fresh one (i.e. this signal is received within the
     * behavior that failed).
     */
    sealed abstract class PreRestart extends Signal
    case object PreRestart extends PreRestart {
      def instance: PreRestart = this
    }

    /**
     * Lifecycle signal that is fired after this actor and all its child actors
     * (transitively) have terminated. The [[Terminated]] signal is only sent to
     * registered watchers after this signal has been processed.
     */
    sealed abstract class PostStop extends Signal
    // comment copied onto object for better hints in IDEs
    /**
     * Lifecycle signal that is fired after this actor and all its child actors
     * (transitively) have terminated. The [[Terminated]] signal is only sent to
     * registered watchers after this signal has been processed.
     */
    case object PostStop extends PostStop {
      def instance: PostStop = this
    }

    object Terminated {
      def apply(ref: ActorRef[Nothing]): Terminated = new Terminated(ref)
      def unapply(t: Terminated): Option[ActorRef[Nothing]] = Some(t.ref)
    }
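To see PreRestart and PostStop arrive in practice, the sketch below wraps a worker in restart supervision, makes it fail once, and then stops it. It assumes Akka 2.6; SignalDemo, its Fail and Stop messages, and the collector actor that gathers the signal names are hypothetical and exist only for this illustration.

```scala
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, PostStop, PreRestart, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical demo, not from the original post.
object SignalDemo {
  sealed trait Command
  case object Fail extends Command
  case object Stop extends Command

  // Restarted on IllegalStateException; reports each lifecycle signal it receives.
  def worker(events: ActorRef[String]): Behavior[Command] =
    Behaviors
      .supervise(
        Behaviors
          .receiveMessage[Command] {
            case Fail => throw new IllegalStateException("boom")
            case Stop => Behaviors.stopped
          }
          .receiveSignal {
            case (_, PreRestart) =>
              events ! "PreRestart"
              Behaviors.same
            case (_, PostStop) =>
              events ! "PostStop"
              Behaviors.same
          })
      .onFailure[IllegalStateException](SupervisorStrategy.restart)

  // Collects two event names, then completes the promise and stops.
  private def collector(seen: Vector[String], done: Promise[Vector[String]]): Behavior[String] =
    Behaviors.receiveMessage[String] { event =>
      val next = seen :+ event
      if (next.size == 2) {
        done.success(next)
        Behaviors.stopped[String]
      } else collector(next, done)
    }

  def run(): Vector[String] = {
    val done = Promise[Vector[String]]()
    val guardian: Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
      val events = ctx.spawn(collector(Vector.empty, done), "events")
      val w = ctx.spawn(worker(events), "worker")
      w ! Fail // triggers a restart, so the failed behavior gets PreRestart
      w ! Stop // the restarted worker stops itself and gets PostStop
      Behaviors.empty[Nothing]
    }
    val system = ActorSystem[Nothing](guardian, "signal-demo")
    try Await.result(done.future, 5.seconds)
    finally system.terminate()
  }
}
```

SignalDemo.run() should return PreRestart followed by PostStop. The supervisor also logs the IllegalStateException when it restarts the worker, which is expected in this demo.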

 

