javaweb
javaweb copied to clipboard
Akka
介绍
Actor 模型由 Carl Hewitt 于上世纪70年代早期提出,目的是为了解决分布式编程中一系列的编程问题。Actor 的要点包括:Actor 是一个个相互之间独立的实体; Actor 可以通过消息来通信,一个 Actor 收到其他Actor的信息后,可以根据需要作出各种相应反应;消息的类型可以是任意的,消息的内容也可以是任意的;当一个 Actor 收到多个消息时,它先建立一个消息队列,将接收到的消息就放入队列,每次从队列中取出一个消息体进行处理。
Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Actor 模型应用。Akka 使得开发人员可以更轻松地开发具有容错性、可扩展性和跨平台的并发程序,在工业界得到了广泛应用。Akka能够给应用程序带来的几个重要的特性是:
-
容错性
-
可伸缩性
-
异步性
-
事件驱动架构(EDA)
-
远程透明性 Actor是Akka中最核心的组件,以至于我们在编写基于Akka的应用程序时,大部分时间都会和Actor打交道,那么Actor到底是怎样的一种抽象呢?一个Actor对象封装了状态和行为,但是它不和外界其它的Actor共享状态,如果一个Actor想要和另一个Actor交互,能且只能通过发送消息来达到信息交换的目的。可见,一个Actor能够很好地保护其内部状态的安全。
-
对并发模型进行了更高的抽象
-
异步、非阻塞、高性能的事件驱动编程模型
-
轻量级事件处理(1GB内存可容纳百万级别个Actor)
-
每个Actor都有对应一个邮箱
-
Actor是串行处理消息的
-
Actor中的消息是不可变的
入门
public class MyActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
static Props props() {
return Props.create(MyActor.class, () -> new MyActor());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, s -> {
log.info("Received String message: {}", s);
})
.matchAny(o -> log.info("received unknown message"))
.build();
}
}
@Test
public void actor(){
final ActorSystem system = ActorSystem.create("helloakka");
try {
final ActorRef howdyGreeter =
system.actorOf(MyActor.props());
howdyGreeter.tell("2",ActorRef.noSender());
howdyGreeter.tell(1,ActorRef.noSender());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}finally {
system.terminate();
}
}
ActorSystem
ActorSystem主要有以下三个功能:
- 管理调度服务 ActorSystem的的精髓在于将任务分拆,直到一个任务小到可以被完整处理,然后将其委托给Actor进行处理,所以ActorSystem最核心的一个功能就是管理和调度整个系统的运行,好比一个公司的管理者,他需要制定整个公司的发展计划,还需要将工作分配给相应的工作人员去完成,保障整个公司的正确运转,其实这里也体现了软件设计中的分而治之,Actor中的核心思想也是这样。
- Akka中Actor的组织是一种树形结构
- 每个Actor都有父级,有可能有子级当然也可能没有
- 父级Actor给其子级Actor分配资源,任务,并管理其的生命状态(监管和监控)
- 配置相关参数
- 日志功能
Actor引用,路径和地址
Actor引用是ActorRef的子类,每个Actor有唯一的ActorRef,Actor引用可以看成是Actor的代理,与Actor打交道都需要通过Actor引用,Actor引用可以帮对应Actor发送消息,也可以接收消息,向Actor发送消息其实是将消息发送到Actor对应的引用上,再由它将消息投寄到具体Actor的信箱中,所以ActorRef在整个Actor系统是一个非常重要的角色。
@Data
@AllArgsConstructor
public class Message {
protected String content;
}
public class Meeting extends Message{
public Meeting(String content) {
super(content);
}
}
public class Done extends Message{
public Done(String content) {
super(content);
}
}
public class DoAction extends Message{
public DoAction(String content) {
super(content);
}
}
@Data
public class Confirm extends Message{
private ActorPath actorPath;
public Confirm(String content, ActorPath actorPath) {
super(content);
this.actorPath = actorPath;
}
}
public class Business extends Message{
public Business(String content) {
super(content);
}
}
final ActorSystem system = ActorSystem.create("company");
ActorRef bossRef = system.actorOf(BossActor.props(),"boss");
bossRef.tell(new Business("二手车市场"),ActorRef.noSender());
public class BossActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private int taskCount = 0;
public static Props props() {
return Props.create(BossActor.class, () -> new BossActor());
}
Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Business.class, b -> {
log.info("看准了商机!行动!");
log.info("我们是 {} 公司",self().path().address());
List managers = new ArrayList<>();
ActorRef managerActorRef = context().actorOf(ManagerActor.props(), "managerA"); //这里我们召唤3个主管
ActorRef managerActorRef2 = context().actorOf(ManagerActor.props(), "managerB"); //这里我们召唤3个主管
ActorRef managerActorRef3 = context().actorOf(ManagerActor.props(), "managerC"); //这里我们召唤3个主管
managers.add(managerActorRef);
managers.add(managerActorRef2);
managers.add(managerActorRef3);
Iterator<ActorRef> iterator = managers.iterator();
while(iterator.hasNext()){
ActorRef managerActor = iterator.next();
CompletableFuture<Object> future =
ask(managerActor, new Meeting("开会讨论一下"), t)
.toCompletableFuture();
CompletableFuture<Confirm> transformed = CompletableFuture.allOf(future)
.thenApply(v -> {
Confirm x = (Confirm) future.join();
return x;
});
Confirm confirm = transformed.get();
log.info("{} ---{}",confirm.getContent(),confirm.getActorPath().parent().toString());
ActorSelection manager = context().actorSelection(confirm.getActorPath());
manager.tell(new DoAction("展开业务"),self());
}
})
.match(Done.class , d -> {
log.info("? {}",d.getContent());
taskCount++;
if (taskCount == 3) {
log.info("项目做完了,涨工资!");
context().system().terminate();
}
})
.build();
}
}
public class ManagerActor extends AbstractActor{
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(ManagerActor.class, () -> new ManagerActor());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Meeting.class, m->{
log.info(" {} 加入会议",self().path());
log.info("老板说 {}",m.getContent());
sender().tell(
new Confirm("老板,我知道了",self().path()),
self());
})
.match(DoAction.class,d->{
log.info("分配工作");
ActorRef workerActorRef = context().actorOf(WorkerActor.props(), "worker");
workerActorRef.forward(d,context());
})
.build();
}
}
public class ManagerActor extends AbstractActor{
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(ManagerActor.class, () -> new ManagerActor());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Meeting.class, m->{
log.info(" {} 加入会议",self().path());
log.info("老板说 {}",m.getContent());
sender().tell(
new Confirm("老板,我知道了",self().path()),
self());
})
.match(DoAction.class,d->{
log.info("分配工作");
ActorRef workerActorRef = context().actorOf(WorkerActor.props(), "worker");
workerActorRef.forward(d,context());
})
.build();
}
}
复杂一点的操作: https://scala.cool/2017/04/learning-akka-2/
监管
监管者
一个actor系统在其创建过程中至少要启动三个actor,如上图所示,下面来说说这三个Actor的功能:
1./: 根监管者
顾名思义,它是一个老大,它监管着ActorSystem中所有的顶级Actor,顶级Actor有以下几种:
- /user: 是所有由用户创建的顶级actor的监管者;用ActorSystem.actorOf创建的actor在其下。
- /system: 是所有由系统创建的顶级actor的监管者,如日志监听器,或由配置指定在actor系统启动时自动部署的actor。
- /deadLetters: 是死信actor,所有发往已经终止或不存在的actor的消息会被重定向到这里。
- /temp:是所有系统创建的短时actor的监管者,例如那些在ActorRef.ask的实现中用到的actor。
- /remote: 是一个人造虚拟路径,用来存放所有其监管者是远程actor引用的actor。
跟我们平常打交道最多的就是/user,它是我们在程序中用ActorSystem.actorOf创建的actor的监管者,下面的容错我们重点关心的就是它下面的失败处理,其他几种顶级Actor具体功能定义已经给出,有兴趣的也可以去了解一下。
根监管者监管着所有顶级Actor,对它们的各种失败情况进行处理,一般来说如果错误要上升到根监管者,整个系统就会停止。
2./user: 顶级actor监管者 上面已经讲过/user是所有由用户创建的顶级actor的监管者,即用ActorSystem.actorOf创建的actor,我们可以自己制定相应的监管策略,但由于它是actor系统启动时就产生的,所以我们需要在相应的配置文件里配置,具体的配置可以参考这里Akka配置 https://doc.akka.io/docs/akka/current/scala/general/configuration.html
3./system: 系统监管者
/system所有由系统创建的顶级actor的监管者,比如Akka中的日志监听器,因为在Akka中日志本身也是用Actor实现的,/system的监管策略如下:对收到的除ActorInitializationException和ActorKilledException之外的所有Exception无限地执行重启,当然这也会终止其所有子actor。所有其他Throwable被上升到根监管者,然后整个actor系统将会关闭。
用户创建的普通actor的监管:
Actor系统的组织结构,是一种树形结构,其实这种结构对actor的监管是非常有利的,Akka实现的是一种叫“父监管”的形式,每一个被创建的actor都由其父亲所监管,这种限制使得actor的监管结构隐式符合其树形结构,所以我们可以得出一个结论:
一个被创建的Actor肯定是一个被监管者,也可能是一个监管者,它监管着它的子级Actor
监管策略
上面我们对ActorSystem中的监管角色有了一定的了解,那么到底是如何制定相应的监管策略呢?Akka中有以下4种策略:
- 恢复下属,保持下属当前积累的内部状态
- 重启下属,清除下属的内部状态
- 永久地停止下属
- 升级失败(沿监管树向上传递失败),由此失败自己
https://scala.cool/2017/05/learning-akka-3/
Akka共享内存
通过通讯来实现共享内存,而不是用共享内存来实现通讯
Mailbox
Mailbox在Actor模型是一个很重要的概念,我们都知道向一个Actor发送的消息首先都会被存储到它所对应的Mailbox中,那么我们先来看看MailBox的定义结构
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}
很清晰Mailbox内部维护了一个messageQueue这样的消息队列,并继承了Scala自身定义的ForkJoinTask任务执行类和我们很熟悉的Runnable接口,由此可以看出,Mailbox底层还是利用Java中的线程进行处理的。那么我们先来看看它的run方法:
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
@inline
final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def isClosed: Boolean = currentStatus == Closed
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue() //去出下一条消息
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs) //递归处理下一条消息
}
}
Actor是如何保证串行处理消息的
@tailrec
final def setAsScheduled(): Boolean = { //是否有线程正在调度执行该MailBox的任务
val s = currentStatus
/*
* Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set.
*/
if ((s & shouldScheduleMask) != Open) false
else updateStatus(s, s | Scheduled) || setAsScheduled()
}
当已有线程在执行返回false,若没有则去更改状态为以调度,直到被其他线程抢占或者更改成功,其中updateStatus()是线程安全的,我们可以看一下它的实现,是一个CAS操作
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
Akka persistence的核心架构
https://scala.cool/2017/07/learning-akka-7/
CQRS
https://scala.cool/2017/08/learning-akka-8/
配置
使用Akka可以不用任何配置,Akka提供了明智的默认配置。为了适应特别的运行环境,修改默认行为,你可能需要修改:
- log level and logger backend
- enable remoting
- 消息系列化
- 路由设置
- 调度器调优 Akka使用Typesafe Config Library,纯java实现的配置库。
Akka的所有配置信息装在 ActorSystem的实例中, 或者换个说法, 从外界看来, ActorSystem 是配置信息的唯一消费者. 在构造一个actor系统时,你可以传进来一个 Config object,如果不传,就相当于传进来 ConfigFactory.load() (使用正确的classloader). 这意味着将会读取classpath根目录下的所有application.conf, application.json and application.properties这些文件—请参阅之前推荐的文档以了解细节. 然后actor系统会合并classpath根目录下的 reference.conf 来组成其内部使用的缺省配置
如果你编写的是一个Akka应用,把配置放在classpath根目录下的 application.conf 中. 如果你编写的是一个基于Akka的库,把配置放在jar包根目录下的 reference.conf 中.
Akka会读取所有jar包的reference.conf配置,所以如果你把多个jar包合并成一个jar,那么你也必须合并这些reference.conf,否则默认配置会丢失,导致Akka不能正常工作
final ActorSystem system = ActorSystem.create("CalculatorSystem",
ConfigFactory.load(("calculator")));
Akka Cluster
一些相同的ActorSystem的组合,它们具有着相同的功能,我们需要执行的任务可以随机的分配到目前可用的ActorSystem上,这点跟Nginx的负载均衡很类似,根据算法和配置将请求转发给运行正常的服务器去,Akka集群的表现形式也是这样,当然它背后的理论基础是基于gossip协议的,目前很多分布式的数据库的数据同步都采用这个协议。
Seed Nodes
Seed Nodes可以看过是种子节点或者原始节点,它的一个主要作用用于可以自动接收新加入集群的节点的信息,并与之通信,使用方式可以用配置文件或者运行时指定,推荐使用配置文件方式,比如:
akka.cluster.seed-nodes = [
"akka.tcp://ClusterSystem@host1:2552",
"akka.tcp://ClusterSystem@host2:2552"]
seed-nodes列表中的第一个节点会集群启动的时候初始化,而其他节点则是在有需要时再初始化。
当然你也可以不指定seed nodes,但你可以需要手动或者在程序中写相关逻辑让相应的节点加入集群,具体使用方式可参考官方文档。
Cluster Events
Cluster Events字面意思是集群事件,那么这是什么意思呢?其实它代表着是一个节点的各种状态和操作,举个例子,假设你在打一局王者5v5的游戏,那么你可以把十个人看成一个集群,我们每个人都是一个节点,我们的任何操作和状态都能被整个系统捕获到,比如A杀了B、A超神了,A离开了游戏,A重新连接了游戏等等,这些状态和操作在Cluster Events中就相当于节点之于集群,那么它具体是怎么使用的呢?
首先我们必须将节点注册到集群中,或者说节点订阅了某个集群,我们可以这么做:
cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
MemberEvent.class, UnreachableMember.class);
从上面的代码我们可以看到有一个MemberEvent的概念,这个其实就是每个成员所可能拥有的events,那么一个成员在它的生命周期中有以下的events
- ClusterEvent.MemberJoined - 新的节点加入集群,此时的状态是Joining;
- ClusterEvent.MemberUp - 新的节点加入集群,此时的状态是Up;
- ClusterEvent.MemberExited - 节点正在离开集群,此时的状态是Exiting;
- ClusterEvent.MemberRemoved - 节点已经离开集群,此时的状态是Removed;
- ClusterEvent.UnreachableMember - 节点被标记为不可触达;
- ClusterEvent.ReachableMember - 节点被标记为可触达;
状态说明:
- Joining: 加入集群的瞬间状态
- Up: 正常服务状态
- Leaving / Exiting: 正常移出中状态 Cluster.get(system).leave(cluster.selfAddress());
- Down: 被标记为停机(不再是集群决策的一部分) Cluster.get(system).down(address).
- Removed: 已从集群中移除
demo
akka {
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
}
}
# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
public class SimpleClusterListener extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
Cluster cluster = Cluster.get(getContext().getSystem());
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
}
@Override
public void postStop() throws Exception {
cluster.unsubscribe(getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ClusterEvent.MemberUp.class, mUp -> {
log.info("Member is Up: {}", mUp.member());
})
.match(ClusterEvent.UnreachableMember.class, mUnreachable -> {
log.info("Member detected as unreachable: {}", mUnreachable.member());
})
.match(ClusterEvent.MemberRemoved.class, mRemoved -> {
log.info("Member is Removed: {}", mRemoved.member());
})
.match(ClusterEvent.MemberEvent.class, message -> {
// ignore
})
.build();
}
}
文本转化example https://github.com/akka/akka-samples/tree/2.5/akka-sample-cluster-java/src/main/java/sample/cluster/transformation
aa
fds