Akka 是什么?它提供了 JVM 上的 Actor 编程模型 -- 同时兼顾了并发与分布式。它由 Scala 编写的,替代了 Scala 本身的 Actor。Actor 视线程为重量级的资源,能够以少量的内存胜任更高的并发,类似的东西有纤程,协程。有一个数据对比是同样的 1GB 内存,可以创建 2.7M 个 Actor, 而线程只能创建 4096 个,仅供参考,当然 Java 也是会基于线程池来执行的。
Actor 增加了程序的灵活性,并减轻了复杂度(标准的赞美之辞)。
所谓 Action 编程模型兼顾并发与分布,是由于让你编程时可以不用考虑线程,线程配置成为部署的范畴; Actor 之间通信只能发送异步消息,Actor 可以分布在同一 JVM, 不同 JVM, 或是不同物理机器上。
因为 《Akka IN ACTION》中提供了第一个例子起点着实有点高,所以网上找来了一个了解 Akka Actor 的最简单例子,来自于 Simple Scala Akka Actor examples (Hello, world examples)。并非纯属翻译,主要是为了练手,所以不完全一致:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class HelloActor extends Actor { def receive = { case "hey" => println("hey yourself") case _ => println("hehe") } } object Main extends App { val system = ActorSystem("HelloSystem") val helloActor = system.actorOf(Props[HelloActor], name = "helloActor") helloActor ! "hey" helloActor ! "good morning" } |
最好有一定的 Scala 基础,比如模式匹配,!
感叹号是向 Actor 发送消息,其实 !
就是一个方法名,当参数只有一个数,可省略点号与圆括号。标准的写法是
helloActor.!("hey") //也是 helloActor.tell("hey"), 或 hellActor tell "hey"
Actor 的消息是无类型的,你可以向一个 Actor 发送任意对象,关键看那个 Actor 是否处理这个消息对象。
Akka 中 Actor 创建 Actor, 那么顶层的 Actor 由谁来创建呢?答案就是 ActorSystem
。
Scala 项目最好的构建工具无疑是 sbt
, 这儿的 build.sbt
文件内容如下:
1 2 3 4 5 6 7 8 9 |
name := "simple-akka-actor" version := "0.1" scalaVersion := "2.12.4" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.5.9" ) |
全部选用了目前最高版本的 Scala(2.12.4) 和 Akka(2.5.9),最后看到的所有依赖是
➜ simple-akka-actor sbt run
[info] Loading project definition from /Users/yanbin/Workspaces/simple-akka-actor/project
[info] Loading settings from build.sbt ...
[info] Set current project to simple-akka-actor (in build file:/Users/yanbin/Workspaces/simple-akka-actor/)
[info] Running cc.unmi.Main
hey yourself
hehe
^C%
发送消息,响应消息,需要用 Ctrl+C 来结束进程,因为就像是启动了一个线程 daemon 属性为 fase 的线程池。
也可以调用 ActorSystem
的 terminate()
方法来结束进程
system.terminate()
在结束进程之前两条消息都能被处理完成。
稍加探索
处理消息的线程
我们说 Actor 编程模型并不是说并发代码不在线程中执行,而是相比于线程编程模型来说线程得已更优化的配置。那么就来看下处理消息的代码是在什么线程中执行的,把 HelloActor
的 receive 方法中的 case 语句写成如下:
1 2 |
case "hey" => println(Thread.currentThread() + ", hey yourself") case _ => println(Thread.currentThread() + ", hehe") |
重新执行 sbt run
命令,得到类似下面的输出
Thread[HelloSystem-akka.actor.default-dispatcher-2,5,run-main-group-0], hey yourself
Thread[HelloSystem-akka.actor.default-dispatcher-2,5,run-main-group-0], hehe
是在叫做 xxx.default-dispather-xxx 的线程中执行消息处理代码。它就是 Akka 的 dispatcher 线程池,对 Actor 发送消息后,消息放在 Actor 的 mailbox 里,dispatcher 线程用于从 mailbox 中取出消息,执行代码。
这个 Dispatcher 的线程池是可通过 application.conf
文件配置的。
日志系统
Akka 内置的日志输出适配器,默认的 logger 是 akka.event.Logger$DefaultLogger
, 输出到控制台。允许在 application.conf
中配置成用 SLF4J
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
}
使用日志的方式可以参考如下代码,重写前面的 HelloActor 代码
1 2 3 4 5 6 7 |
class HelloActor extends Actor { val logger = Logging(context.system, this) def receive = { case "hey" => logger.info("hey yourself") case _ => context.system.log.info("hehe") } } |
再次执行 sbt run
, 输出如下:
[INFO] [02/11/2018 18:47:23.315] [HelloSystem-akka.actor.default-dispatcher-2] [akka://HelloSystem/user/helloActor] hey yourself
[INFO] [02/11/2018 18:47:23.316] [HelloSystem-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(HelloSystem)] hehe
两种方式的输出略有不同,第一种输出了 Actor 的 path, 第二种输出的是类路径。
Akka 的 Logging 本身有点绕,日志是发送到 EventStream 中,然后实际输出日志的 DefaultLogger
也是一个 Actor,这样让日志系统变为异步的。在我们切换到其他的日志(如 SLF4J),可以不用考虑异步的 Appender 配置。
Actor 的 Path
同时也看到每个 Actor 有自己的 path
, 这个 path 就可调用 ActorRef 的 path
方法获得(如 helloActor.path)。它就是用于 Akka 如何定位 Actor 的。Akka Actor 相比于线程而言一个十分突出的优点就是能够很轻松的部署 Actor 到不同机器上,远程通信透明化了。来看到本地与远程 Actor 的不同表示法
"akka://my-sys/user/service-a/worker1" // purely local
"akka.tcp://my-sys@host.example.com:5678/user/service-b" // remote
详情请参考:Actor References, Paths and Addresses
链接:
本文链接 https://yanbin.blog/akka-actor-start-with-easy-example/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。