Akka入门系列(五):akka cluster的基本使用

前面一个章节akka cluster管理介绍了Akka Cluster的底层原理,这一章就来看看如何使用。

集群后台接入

对外

我们知道,目前集群后台的使用方式主要有以下几种:

  • 后端直接监听指定协议的网络端口,接受外部请求,处理后按指定协议打包后返回响应。常规用法比如实现RESTful的SpringMVC,使用ProtoclBuffer、thrift等做压缩协议由Netty监听等等。
  • 由集群提供客户端API,通过客户端向集群提交请求,可同步/异步的获得结果。
  • 消息队列,通过异步的将消息发送到消息队列,集群监听消息队列获取消息后进行处理,最终将结果反馈到其他消息队列。
  • 监听流,比如对目录下文件的监听处理,或基于流式消息队列的监听处理等。

Akka提供了以下几个组件以满足这几种不同的调用方式:

  • Akka Http监听HTTP端口对外响应。 注意,这不是一个我们常见类似于SpringMVC的web服务框架,更多的是一个类似于HttpClient一样进行HTTP通信的工具集,但是是基于Actor和ActorStream的
  • Cluster ClientAkka Cluster提供的远程客户端,用以向集群提交请求并获得结果。
  • Akka Stream 提供了完整的IO流及流式处理的工具集和API。
  • Alpakaa Akka提供的整合Kafka的流式处理API。

对内

集群内部调用,一般有以下几种方式:

  • 查找目标,直接调用。由前文可知,Akka Cluster是完全的P2P结构,所以集群种任何一个Actor可以随意去请求任何的其他Actor,只需要简单的指定其ActorPath即可。
  • 发布/订阅模式。akka.cluster.pubsub.DistributedPubSubMediator可不使用外部MQ的情况下,直接在集群内部提供点对点或订阅功能。

由于官方的样例中已经提供了比较好的学习代码,本章就不再自己写代码去演示了。
官方在github例子地址:akka-samples
对于Cluster,在akka-sample-cluster-java提供了4个例子:

  • simple: 主要演示cluster启动过程中节点的交互过程,对应的是我上一篇文章
  • transformation: 最基本的cluster应用,典型的master-worker模式
  • stats: 主要演示cluster中路由的应用
  • factorial: 主要演示cluster中负载均衡的使用

最简单的例子

通常来讲,使用分布式集群的应用,大概率是并发请求量大,单请求处理较为耗时,可改为并行处理提高响应速度的,而常见的就是master-slave模式,即master接受外部请求、分配任务及返回响应,而真正的处理过程是交给slave去异步做的。所以,在这种集群应用中,不同的节点会分饰不同的角色。

第一个例子:

  • 集群分为前端和后端两部分
  • frontend维护了n个backend,并定期向backend发送hello[n]的消息,比如hello1,hello2
  • backend将字母转换为大写,返回给frontend

由于比较简单,完整的代码我就不贴了,只看几个关键点

前端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TransformationFrontend extends AbstractActor {

List<ActorRef> backends = new ArrayList<>();
int jobCounter = 0;

@Override
public Receive createReceive() {
return receiveBuilder()
.match(TransformationJob.class, job -> backends.isEmpty(), job -> {
sender().tell(new JobFailed("Service unavailable, try again later", job),
sender());
})
.match(TransformationJob.class, job -> {
jobCounter++;
backends.get(jobCounter % backends.size())
.forward(job, getContext());
})
.matchEquals(BACKEND_REGISTRATION, message -> {
getContext().watch(sender());
backends.add(sender());
})
.match(Terminated.class, terminated -> {
backends.remove(terminated.getActor());
})
.build();
}
}

前端维护了一个backend的ActorRef的列表,在收到自定义的BACKEND_REGISTRATION事件后,将消息的发送者,即Backend的actor所对应的ActorRef放到该列表去。
TransformationJob是外部提交的任务,如果列表为空时,会返回JobFailed,否则用简单的负载均衡方法将Job转发给对应的后端(当前已收到的job数量对后端数取余)。
由于只有一个Frontend,并且Actor内部有mailbox队列,所以这里的jobCounter不会出现并发问题。

后端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TransformationBackend extends AbstractActor {

Cluster cluster = Cluster.get(getContext().system());
LoggingAdapter log = Logging.getLogger(getContext().system(), this);

//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(self(), MemberUp.class);
}

//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(self());
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(TransformationJob.class, job -> {
sender().tell(new TransformationResult(self().path().toSerializationFormat(), job.getText().toUpperCase()),
self());
})
.match(CurrentClusterState.class, state -> {
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
})
.match(MemberUp.class, mUp -> {
register(mUp.member());
})
.build();
}

void register(Member member) {
if (member.hasRole("frontend")) {
log.info("Trying to register myself: {}", self().path().toSerializationFormat());
getContext().actorSelection(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, self());
}
}
}

后端在prestart()时去监听了MemberUp的事件,当收到MemberUp时,通过简单的判断当前Member的角色是frontend就尝试给frontend发送注册消息,把自己的ActorRef加到frontend所维护的列表中。
这里为了体现是哪个后端所做的job,我加上了相关日志,在运行时可以仔细观察以下。
这里有个小问题不妨思考下:如果前端此时并未启动,这个BACKEND_REGISTRATION会怎么样呢?

启动

前端的启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "0";
final Config config =
ConfigFactory.parseString(
"akka.remote.netty.tcp.port=" + port + "\n" +
"akka.remote.artery.canonical.port=" + port)
.withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]"))
.withFallback(ConfigFactory.load());

ActorSystem system = ActorSystem.create("ClusterSystem", config);

final ActorRef frontend = system.actorOf(
Props.create(TransformationFrontend.class), "frontend");
final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
final AtomicInteger counter = new AtomicInteger();
system.scheduler().schedule(interval, interval, new Runnable() {
public void run() {
ask(frontend,
new TransformationJob("hello-" + counter.incrementAndGet()),
timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
}

}, ec);

}

  • 通过akka.cluster.roles来给当前所起的节点进行角色指定。指定的角色信息会带到系统的Member类中去
  • 通过定时器,定时给前端actor发送Job

后端的启动我就不贴了,简单的读取application.conf,覆盖端口配置和角色,然后启动ActorSystem去创建actor。

前面的那个问题:“如果前端此时并未启动,这个BACKEND_REGISTRATION会怎么样呢?”
在官方提供的代码中,TransformationApp这个类整合了前后端节点的启动,但是它把后端分配为2551和2552端口,即把两个后端作为了种子节点,而前端作为了普通节点。如果是像它这样放在一起启动倒也没什么,但是如果单独一个个去运行时,就可能会出现前端在等待集群创建,而后端在memberup时,并没有找到前端Actor,导致注册失败。因为除非Cluster发生变化导致重新Gossip改变节点状态,否则MemberUp事件不会再发,这时哪怕前端启动了,其维护的后端列表依然为空。所以,一般情况下,像这种master-slave的用法,最好master就作为种子节点。

可以看到这样去实现master-slave虽然可以,但是依然存在些问题。好在Akka已经提供好了解决方案,就是Router和Routee,我们下章继续。

注意!
如果在配置文件中是否启用artery,actor的地址会有不同,关闭artery时一定要指定协议为akk.tcp!
比如在配置文件中定义种子节点时:

  • artery enabled = on
    seed-nodes = [“akka://ClusterSystem@127.0.0.1:2551”]
  • artery enabled = off
    seed-nodes = [“akka.tcp://ClusterSystem@127.0.0.1:2551”]
    同时,启用artery在win10上调试还遇到一个问题,它会将logbuffer临时文件写到windows的\Users\<user_name>\AppData\Local\Temp下,正常我们在编辑器里运行调试完毕,会习惯性直接关闭,这时这些临时文件是不会被删除的。结果就是不断的启停后磁盘会被写满。
    解决的方案有两个:
    1. 关闭artery,使用netty tcp;
    2. 通过命令优雅的关闭ActorSystem

集群的监控

Akka对于集群的监控方式有两种:

  1. 内置的JMX
  2. 对外提供HTTP API

其中内置的JMX将会慢慢被取消掉,而且在使用时还要求每个JVM启动时加上JMX的接口参数,使用不太方便,同时所提供的jmx-client工具在windows下是无法正常使用的,所以不推荐使用。
官方推荐使用对外提供HTTP接口的Akka Management。它是所有其他管理模块核心,其他管理模块都是基于它做的插件。相关关系如下图:

我们要使用的是Cluster Http Management,开启比较简单:

  1. 添加依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<dependency>
<groupId>com.lightbend.akka.management</groupId>
<artifactId>akka-management-cluster-http_2.12</artifactId>
<version>0.19.0</version>
<exclusions>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-distributed-data_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-sharding_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>

这里之所以把其他的akka包都exclude掉,是因为我使用的akka包是2.5.17,而这个管理包依赖目前对应的这些包版本还是2.5.15,没有更新过来,不过不影响使用。

  1. 配置文件中配置监听地址和端口(可选)
    1
    2
    3
    4
    5
    6
    7
    8
    management {
    http {
    hostname = "127.0.0.1"
    port = "8558"
    bind-hostname = "0.0.0.0"
    bind-port = "8558"
    }
    }

这一步是可选的,如果不配置,默认监听的是调用InetAddress.getLocalHost.getHostAddress这段代码返回的IP,未必是127.0.0.1,一般是当前电脑的对外IP。如果使用docker,那么就必须要显式指定一下了。
默认端口是8558.

  1. 开启监控
    1
    2
    ClusterHttpManagement httpManagement = ClusterHttpManagement.get(system);
    AkkaManagement.get(system).start();

启动后,我们就可以从http://127.0.0.1:8558这个地址获取集群信息并发送集群指令了。

PathHTTP methodRequired form fieldsDescription
/cluster/members/GETNoneReturns the status of the Cluster in JSON format.
/cluster/members/POSTaddress: {address}Executes join operation in cluster for the provided {address}.
/cluster/members/{address}GETNoneReturns the status of {address} in the Cluster in JSON format.
/cluster/members/{address}DELETENoneExecutes leave operation in cluster for provided {address}.
/cluster/members/{address}PUToperation: DownExecutes down operation in cluster for provided {address}.
/cluster/members/{address}PUToperation: LeaveExecutes leave operation in cluster for provided {address}.
/cluster/shards/{name}GETNoneReturns shard info for the shard region with the provided {name}

更详细使用请参考官方文档

本文由 EdisonXu - 徐焱飞 创作,采用 CC BY 4.0 CN协议 进行许可。 可自由转载、引用,但需署名作者且注明文章出处。
本文链接为http://edisonxu.com/2018/11/13/akka-cluster-usage.html
如果您觉得文章不错,可以请我喝一杯咖啡!
actor, akka, 分布式, 并发