Akka入门系列(六):akka cluster中的路由和负载均衡

在使用路由功能之前,我们需要先了解下常规概念:

  • Router 路由器,消息由外部发送到路由器,再由路由器通过路由算法转发给具体的执行者,相当于消息的中转站。
  • Routee 路由目标,最早处理消息的地方。

在Akka中,提供了两种做路由的方式:

  • 直接使用akka.routing.Router
  • 使用内置的Router Actor

直接使用Router类

直接使用akka.routing.Router类的原理其实与上一章的最简单的例子是一样的,只不过akka的Router类比我们实现的更复杂、更强大。创建Router类时需提供两个参数:

  • 路由规则
    akka为Router类提供了以下几种内置的路由算法类:

    • akka.routing.RoundRobinRoutingLogic
    • akka.routing.RandomRoutingLogic
    • akka.routing.SmallestMailboxRoutingLogic
    • akka.routing.BroadcastRoutingLogic
    • akka.routing.ScatterGatherFirstCompletedRoutingLogic
    • akka.routing.TailChoppingRoutingLogic
    • akka.routing.ConsistentHashingRoutingLogic
      具体算法介绍请参见文章最后的表格
  • 路由目标的序列
    该序列支持通过调用router.addRouteerouter.removeRoutee进行动态变化,但需要注意的是,akka.routing.Router类时一个immutable的线程安全类,即不可改变,这里的改变其实是将原来的router内的的routee队列增加/去掉指定routee后copy一份生成一个新的Router

    1
    def removeRoutee(routee: Routee): Router = copy(routees = routees.filterNot(_ == routee))

依赖

1
2
3
4
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.12</artifactId>
</dependency>

配置文件application.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
akka {
actor {
provider = "cluster"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
artery {
enabled = off
canonical.hostname = "127.0.0.1"
canonical.port = 0
}
}

cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"
]
}
}

实际做事的SlaveActor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SlaveActor extends AbstractActor {

LoggingAdapter log = Logging.getLogger(getContext().system(), this);

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, word-> log.info("Node {} receives: {}", getSelf().path().toSerializationFormat(), word))
.build();
}

public static void main(String[] args) {
Config config =
ConfigFactory.parseString("akka.cluster.roles = [slave]")
.withFallback(ConfigFactory.load());

ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.actorOf(Props.create(SlaveActor.class), "slaveActor");
}
}

包含路由的MasterActor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class MasterActor extends AbstractActor {

LoggingAdapter log = Logging.getLogger(getContext().system(), this);

private Router router = new Router(new RoundRobinRoutingLogic(), new ArrayList<>());
private Cluster cluster = Cluster.get(getContext().system());
boolean isReady = false;
private static final String SLAVE_PATH = "/user/slaveActor";

@Override
public void preStart() throws Exception {
cluster.subscribe(self(), ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg->{
log.info("Master got: {}", msg);
if(!isReady)
log.warning("Is not ready yet!");
else {
log.info("Routee size: {}", router.routees().length());
router.route(msg, getSender());
}
})
.match(ClusterEvent.MemberUp.class, mUp->{
if(mUp.member().hasRole("slave")) {
Address address = mUp.member().address();
String path = address + SLAVE_PATH;
ActorSelection selection = getContext().actorSelection(path);
router = router.addRoutee(selection);
isReady=true;
log.info("New routee is added!");
}
})
.match(ClusterEvent.MemberRemoved.class, mRemoved->{
router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address()+SLAVE_PATH));
log.info("Routee is removed");
})
.match(ClusterEvent.UnreachableMember.class, mRemoved-> {
router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address() + SLAVE_PATH));
log.info("Routee is removed");
})
.build();
}

public static void main(String[] args) {
int port = 2551;

// Override the configuration of the port
Config config =
ConfigFactory.parseString(
"akka.remote.netty.tcp.port=" + port + "\n" +
"akka.remote.artery.canonical.port=" + port)
.withFallback(
ConfigFactory.parseString("akka.cluster.roles = [master]"))
.withFallback(ConfigFactory.load());

ActorSystem system = ActorSystem.create("ClusterSystem", config);
ClusterHttpManagement.get(system);
AkkaManagement.get(system).start();
system.actorOf(Props.create(MasterActor.class), "masterActor");
}
}

这里将MasterActor监听了集群的MemberUp事件,通过判断事件中包含的role判断是否是SlaveActor加入集群。如果是,则将该SlaveActor加到Router中。同时,如果SlaveActor退出或变成Unreachable状态,则从Router中删除。

向MasterActor请求的客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Client
{
public static void main( String[] args ) throws InterruptedException {
Config config = ConfigFactory.load();
ActorSystem system = ActorSystem.create("ClusterSystem", config);
ActorSelection toFind = system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor");
int counter = 0;
while(true){
toFind.tell("hello "+counter++, ActorRef.noSender());
System.out.println("Finish telling");
Thread.sleep(2000);
}
}
}

分别启动四个窗口: 一个masterActor节点,两个slaveActor节点,一个Client,可以看到两个slaveActor轮流打印Client传递进去的消息。这时,把其中一个slaveActor关闭,可以看到Client发送的所有消息将被剩下那个slaveActor打印出来。

使用Router Actor

除了我们自己在Actor里调用akka.routing.Router类外,Akka还提供了根据配置直接生成一个内置的RouterActor。路由逻辑在remoting和cluster两个模块中都有,如果要启用remoting中的路由,则需要引入remoting的依赖,在cluster环境下并不推荐直接去用remoting中的路由,而是用cluster模块中的cluster aware router。

RouterActor有两种类型:

  • Pool
    Router自动创建Routee作为自己的子Actor,然后部署到远程节点上。Routee被终止时,会自动从Router的路由表中删除,除非使用动态路由(指定resizer),否则Router不会重新创建新的Routee,当所有的Routee都停止时,Router也自动停止。
  • Group
    Routee actor是在Router actor以外单独创建好了,RouterActoSelection向指定的Actor Path发送消息,但默认并不监控Routee

Router actor可以通过程序配置或文件配置。如果是通过文件配置时,必须要在代码中使用FromConfigRemoteRouterConfig(将Routee部署到远程节点去)去显式的读取相关配置,否则即便在配置文件中定义了路由相关配置,akka也不会去使用。
Router actor在转发消息时不会更改消息的sender,而routee actor在回复消息时,消息直接返回到原始的发送者,不再经过router actor。

无论哪种类型,有一块是相同配置:

1
2
3
4
5
cluster {
enabled = on
allow-local-routees = off
use-roles = [slave]
}

enabled 是否启用cluster aware router
allow-local-routees 能否在本地,即router所在的节点创建和查找routee
use-roles 使用指定的角色来缩小routee的查找范围,如果routee的配置与这里的不同,则router是找不到该routee的。

Pool

我们在上面例子的基础上,把自己new的Router换成akka内置的RouterActor。改动主要有以下几个:

  • 在配置文件中指定路由相关信息
  • MasterActor中,读取路由配置,创建router及相关的routees
  1. 配置文件中actor部分增加:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    actor {
    provider = "cluster"
    deployment {
    /masterActor/poolRouter {
    router = round-robin-pool
    nr-of-instance = 5
    cluster {
    enabled = on
    allow-local-routees = on
    use-roles = [master]
    }
    }
    default {
    cluster {
    max-nr-of-instances-per-node = 5
    }
    }
    }
    }

由于我们的Router是在masterActor下创建的RouterActor,取名为poolRouter,所以其路径显然是akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor/poolRouter,masterActor启动时读取的是这个配置文件,所以deployment部分对应的就是masterActor及其子Actor,所以这里只需要填入相对路径就好了。注意,由于Routee是由masterActor创建出来的,所以use-role必须是与masterActor保持一致,否则会找不到Routee!
- router 指定预设的路由器
- nr-of-instance routee的个数

注意,有两个参数非常关键:

  • actor.deployment.default.cluster.max-nr-of-instances-per-node 它是配置Router在每个节点上部署的最大Actor数,默认是1。虽然上面我们指定了routee数目为5,但是如果只起一个节点,你会发现永远是
    1个routee在打印结果。
  • max-total-nr-of-instances 定义router所能创建的routee的总数,默认是10000。通常来说足够用了。
  1. 修改MasterActor。注释掉的部分是直接使用代码而不用配置文件手动创建Router的,有兴趣的可以自己试下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class MasterActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private ActorRef router;

    @Override
    public void preStart() throws Exception {
    router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "poolRouter");
    /*int totalInstances = 1000;
    int maxInstancePerNode = 5, routeeNumbers=5;
    boolean allowLocalRoutees = true;
    String role = "master";
    ClusterRouterPoolSettings settings = new ClusterRouterPoolSettings(totalInstances, maxInstancePerNode, allowLocalRoutees, role);
    ClusterRouterPool routerPool = new ClusterRouterPool(new RoundRobinPool(routeeNumbers), settings);
    router = getContext().actorOf(routerPool.props(Props.create(SlaveActor.class)), "poolRouter");*/
    }

    @Override
    public Receive createReceive() {
    return receiveBuilder()
    .match(String.class, msg->{
    log.info("Master got: {}", msg);
    router.tell(msg, getSender());
    })
    .build();
    }
    }
  2. 运行
    其他不变,这次只需要启动ClientMasterActorSlaveActorMasterActor中会自动创建出来。看到日志

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor/poolRouter/c1] Node akka://ClusterSystem/user/masterActor/poolRouter/c1#-1154482163 receives: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor/poolRouter/c2] Node akka://ClusterSystem/user/masterActor/poolRouter/c2#-50692619 receives: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor/poolRouter/c3] Node akka://ClusterSystem/user/masterActor/poolRouter/c3#1415650532 receives: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor/poolRouter/c4] Node akka://ClusterSystem/user/masterActor/poolRouter/c4#1345851811 receives: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor/poolRouter/c5] Node akka://ClusterSystem/user/masterActor/poolRouter/c5#-1384624865 receives: hello

从c1到c5轮流打印,round-robin负载均衡起作用了。

Group

这种方式下,Routee是在Router外被创建的,一般要求尽量在Router启动前启动好Routee,因为Router在启动过程中会尝试去联络Routee。使用时与Pool型的很像,区别是

  • 需要指定routees.path (remote方式下支持完整协议路径,比如akka.tcp://ClusterSystem:2551/user/testActor但是Cluster模式下不支持,只支持相对路径)
  • 不需要指定也没有nr-of-instance参数

GroupActor是根据routees.path所配置的相对路径,去当前cluster的每一个节点上用ActorSelection去查找指定role的Routee(所以use-roles中的配置一定要和slave启动时的role一致),然后直接tell消息过去。由于整个过程是异步的,就意味着GroupActor的消息发送其实根本不关心节点上对应的Routee是否包含Routee或者是否正常启动,只是简单的根据配置去转发而已。
不去检测是否包含Routee,是因为Akka是Peer-to-Peer的设计,天生就要求所有节点对等,在这个约定下,它会认为cluster中所有节点的代码相同,一定会包含Routee。
不去检测是否正常启动,这个则是由于整个通讯都是异步的。
但我个人认为这里还是使用熔断机制来加强的,使用起来会更加方便。

  1. 修改配置文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    actor {
    provider = "cluster"
    deployment {
    /masterActor/groupRouter {
    router = round-robin-group
    cluster {
    enabled = on
    allow-local-routees = on
    use-roles = [slave]
    }
    }
    }
    }

use-roles中role加不加引号都可以。

  1. 修改MasterActor中Router的名字,与配置文件中保持一致。注释掉的部分是直接使用代码而不用配置文件手动创建Router的,有兴趣的可以自己试下。

    1
    2
    3
    4
    5
    6
    @Override
    public void preStart() throws Exception {
    router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "groupRouter");
    /*List<String> routeesPaths = Arrays.asList("akka/user/slaveActor");
    router = getContext().actorOf(new RoundRobinGroup(routeesPaths).props(), "groupRouter");*/
    }
  2. 运行
    分别在几个不同窗口启动MasterActor、多个SlaveActor后,检查集群是否稳定后,即所有节点均是UP,如果启用了akka-management-cluster-http,向监控地址发送查询请求,如
    127.0.0.1:8558/cluster/members

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    {
    "selfNode": "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "oldestPerRole": {
    "master": "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "dc-default": "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "slave": "akka.tcp://ClusterSystem@127.0.0.1:4914"
    },
    "leader": "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "oldest": "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "unreachable": [],
    "members": [
    {
    "node": "akka.tcp://ClusterSystem@127.0.0.1:2551",
    "nodeUid": "-1141014070",
    "status": "Up",
    "roles": [
    "master",
    "dc-default"
    ]
    },
    {
    "node": "akka.tcp://ClusterSystem@127.0.0.1:4914",
    "nodeUid": "344021242",
    "status": "Up",
    "roles": [
    "slave",
    "dc-default"
    ]
    },
    {
    "node": "akka.tcp://ClusterSystem@127.0.0.1:4936",
    "nodeUid": "678163307",
    "status": "Up",
    "roles": [
    "slave",
    "dc-default"
    ]
    },
    {
    "node": "akka.tcp://ClusterSystem@127.0.0.1:4957",
    "nodeUid": "-573369962",
    "status": "Up",
    "roles": [
    "slave",
    "dc-default"
    ]
    }
    ]
    }

然后,启动Client向masterActor发送消息,可以看到均匀的打印出接受的日志,round-robin负载均衡起作用了。再多起几个SlaveActor,会将消息转发到新的actor中去,这就是GroupPool方式好的地方,可以动态变化。

此时,你可以尝试修改下配置,将slaveActor变成和masterActor一样的role,再运行后,你会发现有消息丢失,以及转发失败的日志出来。

这是因为在上面所有的例子中,为了方便理解,都是使用一个master+若干slave的方式来演示。
然而Akka的设计是Peer-to-Peer的,即所有节点对等,那么,RouterActor就会理所应当地认为在相同role的节点上都存在Routee,由于并没有去检查Routee是否能工作,直接进行了消息转发,而按照上面的写法masterAcotr所在的节点上压根就没起过slaveActor,所以就造成了消息丢失。
将配置中allow-local-routees改为off,这时它就不会把masterActor所在节点加到负载列表中去了。但同样的,你可以去起一个空的ActorSystem,看看有什么后果。

附录:

Akka提供的路由算法:

算法说明配置算法类
RoundRobin轮询的给路由列表中每个Routee发送消息round-robin-pool 或 round-robin-groupakka.routing.RoundRobin
Random从路由列表中随机抽取一个Routee发送消息random-pool 或 random-groupakka.routing.Random
SmallestMailbox优先选取路由表中mailbox内消息数最少的Routee发送消息smallest-mailbox-poolakka.routing.SmallestMailbox
Broadcast以广播的形式将消息同时转发给所有的Routeebroadcast-pool 或 broadcast-groupakka.routing.Broadcast
ScatterGatherFirstCompleted将消息发送给所有的Routee,并等待第一个返回的结果,将该结果返回给发送者,其他结果被忽略掉scatter-gather-pool 或 scatter-gather-groupakka.routing.ScatterGatherFirstCompleted
TailChopping先随机选一个Routee发送消息,等待一个短时间的延迟后,再随机选一个Routee发送消息,等待第一个返回的结果并将该结果发送回发送者,其他结果被忽略掉tail-chopping-pool 或 tail-chopping-groupakka.routing.TailChopping
ConsistentHashing使用一致性Hash算法选取Routee转发消息consistent-hashing-pool 或 consistent-hashing-groupakka.routing.ConsistentHashing
Balancing所有的Routee共享同一个mailbox,它会将繁忙的Routee中的任务重新分配给空闲的Routee,不支持group和广播balancing-poolakka.routing.Balancing

本章代码地址:https://github.com/EdisonXu/akka-start-demo/tree/master/cluster

本文由 EdisonXu - 徐焱飞 创作,采用 CC BY 4.0 CN协议 进行许可。 可自由转载、引用,但需署名作者且注明文章出处。
本文链接为http://edisonxu.com/2018/11/14/akka-cluster-router.html
如果您觉得文章不错,可以请我喝一杯咖啡!
actor, akka, 分布式, 并发