Akka入门系列(八):akka kafka Consumer

核心API

在使用Akka kafka consumer前, 先了解下几个核心API:

  • ConsumerSetting Consumer的配置信息;
  • ConsumerRecord Kafka消息的封装类,包含消息的K、V,以及该消息所属的topic, partition, offset, timestamp等;
  • ConsumerMessageConsumerRecord的进一步充血模型,提供了自动commit以及修改offset信息的API;
  • Subscription 该Consumer的订阅信息,有AutoSubscriptionManualSubscription两个子接口,分别用于自动从Topic读取Partition以及手动绑定Partition;

Akka Kafka中,Consumer一般是作为流的Source,在akka.kafka.javadsl.Consumer中提供了常用的几种Source。主要包含两大类:

1. Offset存储及读取机制独立于Kafka以外,需自行实现commit逻辑,命名为plainxxxSource;
2. Offset存储及读取机制依赖于Kafka的所提供的API,通过调用Akka已封装的`ConsumerMessage`进行offset的commit,命名为committableXXXSource

详情可以参见最后的目录。

使用

依赖

1
2
3
4
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.12</artifactId>
</dependency>

配置

所有的Consumer都需要传入配置类ConsumerSetting,需要提供如下信息:

  • Kafka消息key和value的反序列化器
  • Kafka集群的地址信息
  • consumer的GroupId,注意:offset是按组进行commit的
  • Kafka Consumer的调优参数
1
2
3
4
5
6
7
8
9
10
11
12
public static ConsumerSettings getConsumerSettings(
Deserializer keyDeserializer,
Deserializer valDeserializer,
Config config,
String groupId){
Deserializer<String> keySerializer = new StringDeserializer();
Deserializer<byte[]> valSerializer = new ByteArrayDeserializer();

return ConsumerSettings.create(config, keyDeserializer, valDeserializer)
.withGroupId(groupId) // if not defined here, config must contains "group.id"
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

数据处理服务定义

我们用相同的一段代码,来代表整个Flow中的数据转换过程。

1
2
3
4
5
6
7
8
static class DummyBusinessLogic {
public CompletionStage<Integer> work(ConsumerRecord<String, byte[]> record){
return CompletableFuture.supplyAsync(() -> {
System.out.println("Partition["+record.partition()+"] got:"+new String(record.value()));
return record.partition();
});
}
}

Offset管理独立于Kafka以外

此类API命名规则都是plainXXXSource,对外都emit出ConsumerRecord,Offset维护在外部的存储里,可以先读取再处理或提供读取的方法给API由其调用获得最新的Offset。Commit也是手动进行,但是可以通过修改auto-commit参数(该值默认是false),由Kafka自行进行Offset的Commit。Kafka的自动Commit是阈值和周期性的Commit,哪个先触发就直接commit,比较适合量大且允许消息重复递交的场景。
我们先实现一个简单的外部存储类,用以演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// A dummy storage to store offset externally
static class ExternalOffsetStorage {
private Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>();

public ExternalOffsetStorage(String topic, int partitonNum) {
for(int i=0;i<partitonNum;i++){
partitionOffsetMap.put(new TopicPartition(topic, i), new Long(0));
}
}

// User CompletionStage is to warn that read the offset may cost some time
/*public CompletionStage<Long> getLatestOffset(){
return CompletableFuture.completedFuture(offset.get());
}*/
public Long getLatestOffset(TopicPartition partition){
return partitionOffsetMap.get(partition);
}

public CompletionStage<Done> commitOffset(TopicPartition partition){
return CompletableFuture.supplyAsync(() -> {
partitionOffsetMap.put(partition, getLatestOffset(partition)+1);
return Done.done();
});
}

public CompletionStage<Done> commitOffset(int partition){
return CompletableFuture.supplyAsync(() -> {
for(TopicPartition p: partitionOffsetMap.keySet()){
if(p.partition() == partition)
partitionOffsetMap.put(p, partitionOffsetMap.get(p)+1);
}
return Done.done();
});
}

public Map<TopicPartition, Long> getPartitionOffsetMap() {
return partitionOffsetMap;
}

public CompletionStage<Map<TopicPartition, Object>> getOffsetsOnAssign(Set<TopicPartition> topicPartitions){
return CompletableFuture.supplyAsync(()->{
Map<TopicPartition, Object> result = new HashMap<>();
topicPartitions.forEach(partition -> result.put(partition, partitionOffsetMap.get(partition)));
return result;
});
}
}

不分Partition处理

不分Partition处理的API,是最简单的Consumer.plainSource,它接受两个参数:

  • ConsumerSetting 配置参数
  • Subscription Kafka的partition信息,可以是AutoSubscriptionManualSubscription
    • AutoSubscriptionSubscriptions.topics("topic")来指定
    • ManualSubscription则需要显式提供每一个TopicPartition及其对应的offset。如果只有一个partition,可以直接Subscriptions.assignmentWithOffset(new TopicPartition("topic", /*partition: */ 0), currentOffset)。如果是多个partition,则传入一个MapTopicPartition作为key,Offset的值为value。
      由于commit的时机和逻辑都是自己提供的,所以比较适合去实现exact-once-delivery
      1
      2
      3
      4
      5
      6
      7
      Consumer.plainSource(
      consumerSettings,
      Subscriptions.assignmentWithOffset(offsetStorage.getPartitionOffsetMap()))
      //Subscriptions.topics(topic))
      .mapAsync(partitionNum, record -> logic.work(record).thenApply(partition->offsetStorage.commitOffset(partition)))
      .to(Sink.ignore())
      .run(materializer);

分Partition处理

API中含Partitioned字样的,均是分Partition处理的API,即每一个Partition会对应一个新的子Source。这一类中,是Consumer.plainPartitionedSourceConsumer.plainPartitionedManualOffsetSource
与plainSource不同的点是:

  • 只接受AutoSubscription
  • 原Source并不直接emit ConsumerRecord,而是派生出三个子Source,从它获得的是一个Pair<TopicPartition, Source>封装类,包含了为每一个partition提供了一个Source对象。
  • Consumer.plainPartitionedManualOffsetSourceConsumer.plainPartitionedSource基础上,增加了一个函数参数,要求传入一个Function,根据提供的包含TopicPartition信息的Set,返回对应的每个Partition的Offset信息,封装在一个Map里,key是TopicPartition, value是offset值。

下面例子里,用flatMapMerge将原Source派生的Source合并(即Pair::second返回值)后,交由同一段Flow处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if(!manualAssignOffset)
Consumer.plainPartitionedSource(
consumerSettings,
Subscriptions.topics(topic))
// merge ConsumerRecord from different partition Source
.flatMapMerge(partitionNum, Pair::second)
// use same logic flow to handle ConsumerRecord
.mapAsync(partitionNum, record -> logic.work(record).thenApply(partition->offsetStorage.commitOffset(partition)))
.to(Sink.ignore())
.run(materializer);
else
Consumer.plainPartitionedManualOffsetSource(
consumerSettings,
Subscriptions.topics(topic),
offsetStorage::getOffsetsOnAssign)
//.mapAsync(partitionNum, logic::workWithPartitions)
.flatMapMerge(partitionNum, Pair::second)
.mapAsync(partitionNum, record -> logic.work(record).thenApply(partition->offsetStorage.commitOffset(partition)))
.to(Sink.ignore())
.run(materializer);

Offset管理依赖Kafka

Kafka的JAVA API,提供了将offset保存在zookeeper上的功能,在新版的Kafka,更是为了避免zookeeper的性能问题,在其内部创建一个叫__consumer_offsets的topic来存储offset。由offsets.storage参数定义。

基本使用

该API能够自由控制何时将offsetcommit到Kafka去。比较适合用于at-least-once递交的场景,即消息可能会被多次递交,以保证至少会有一次成功,但相应的,如果发生错误,该错误也会发生多次。

1
2
3
4
5
6
7
8
// single commit
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
// asynchronously finish logic work and fetch the offset to commit
.mapAsync(1, msg-> logic.work(msg.record()).thenApply(partition -> msg.committableOffset()))
// commit offset
.mapAsync(1, offset->offset.commitJavadsl())
.to(Sink.ignore())
.run(materializer);

这里用了mapAsync异步并行来处理消息,并行数位设为1,保证处理消息的顺序(Kafka单个partition是保序的,但是对于同一个topic的多个partition之间是无序的)。
运行一下Producer,然后可以用JMX查看该Topic的offset数在上升。

该API每处理一个消息就会commit一次,这种方式相当慢。推荐的方式是用batch批量commit,用牺牲发生错误时的重复投递来换取性能。

批量Commit

自动批处理

Akka提供了Committer.sink方法来实现自动批量Commit。在使用这个sink前,需要先在配置文件中定义或者代码里直接指定以下两个参数:

  • max-batch 每次commit的最大消息数,即超过该数即会触发一次commit
  • max-interval 两次commit之间的最大间隔

这两个参数调的越大,Kafka对于commit的load越小,消耗时间越少,但相应的,如果发生错误,重新处理的消息数肯定也是对应增加的。调的越小,则commit越频繁,会带来commit性能瓶颈。这个属于Kafka批量commit的老问题了,与Akka本身是无关的,应视不同场景进行相应参数优化。
修改application.conf,在akka.kafka.consumer区块中添加

1
2
3
4
5
6
7
akka.kafka.consumer {
...
# Maximum number of messages in a single commit batch
max-batch = 1000
# Maximum interval between commits in milliseconds
max-interval = 10000
}

批量commit代码如下:

1
2
3
4
5
6
7
// batch commit
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1, msg-> logic.work(msg.record())
.<ConsumerMessage.Committable>thenApply(partition -> msg.committableOffset())
)
.to(Committer.sink(CommitterSettings.create(config)))
.run(materializer);

PS: .<ConsumerMessage.Committable>这里是做类型强转,将msg.committableOffset返回的CommittableOffset转成其实现接口Committable

手动批处理

另一种方式,是手动的将消息用Akka StreambatchAPI聚合后做批量commit。

1
2
3
4
5
6
7
8
9
10
11
12
13
// manual batch commit
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1, msg-> logic.work(msg.record())
.thenApply(partition -> msg.committableOffset())
)
.batch(
20,
ConsumerMessage::createCommittableOffsetBatch,
ConsumerMessage.CommittableOffsetBatch::updated
)
.mapAsync(3, batch->batch.commitJavadsl())
.to(Sink.ignore())
.run(materializer);

注意:
用这种方式时,只有当下游consumer处理速度比上游的producer处理速度要慢时,batch才会触发(背压),否则会按正常commit处理。
测试时,需要把producer中的控制发送速率的.throttle()注掉,同时调高发送消息数,这样才能看到效果。

按时间聚合批处理

以上都是适合于消息速率比较高的场景,有些场景下,消息的速率非常低,可能24小时内没有任何消息抵达。此时,需考虑打开kafka的批量commit刷新参数(akka.kafka.consumer.commit-refresh-interval),否则在Kafka的存储中,offset会过期。同时,对于这种速率较低的topic,最好使用按时间进行聚合后进行批处理的groupWithinAPI。

1
2
3
4
5
6
7
8
9
10
// time-based aggregation
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1, msg-> logic.work(msg.record())
.thenApply(partition -> msg.committableOffset())
)
.groupedWithin(5, Duration.ofSeconds(60))
.map(ConsumerMessage::createCommittableOffsetBatch)
.mapAsync(3, batch->batch.commitJavadsl())
.to(Sink.ignore())
.run(materializer);

groupedWithinAPI,接受两个参数,第一个是控制多少个消息触发聚合,第二个是控制时间窗口。如果窗口内接受消息数超过第一个参数,则立刻聚合,如果未超过,到窗口时间到期时也会触发。

测试时,第一个先把Producer的最大消息数改为4,然后不要启用速率控制。看到offset并不是立刻就commit掉。然后把Producer的最大消息数改为10,每1秒发送一个,可以看到当超过5时,立刻触发commit。

分Partition处理

TODO: 流的聚合

其他

有时,我们想在offset里添加自定义的metadata,此时,可以调用Consumer.commitWithMetadataSourceAPI,用法还是比较简单,具体请参考官方文档。但需要注意的是,由于kafka可以周期性commit(akka.kafka.consumer.commit-refresh-interval参数),第一个offset可能并不会包含新的metadata信息。

每个Partition一个独立的Source

at-least-once投递

at-most-once投递

单独commit

Consumer.atMostOnceSource

批量commit

附录

Consumer API

API使用场景参数发射类
plainSource将Offset存到外部,不支持存到Kafka本身(除非开启auto-commit,用kafka自己的自动commit功能)ConsumerSettings<K,V> consumerSettings, Subscription subscriptionConsumerRecord
plainExternalSource将Offset存到外部,可以使用外部KafkaAsyncConsumer的特殊Source,一般用于预先定义好一个Consumer Actor,然后用该API去手动绑定许多topic-partitionsActorRef consumer, ManualSubscription subscriptionConsumerRecord
plainPartitionedSource将Offset存到外部,从topic自动获取partition,每个partition分别对应一个source,放到一个Pair中ConsumerSettings<K,V> consumerSettings, AutoSubscription subscriptionPair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]
plainPartitionedManualOffsetSourceplainPartitionedSource基本一致,只是允许将partition的offset存储到外部去,使用时调用传入的getOffsetsOnAssign方法去从外部读取offsetConsumerSettings<K,V> consumerSettings, AutoSubscription subscription, Function[Set[TopicPartition], CompletionStage[Map[TopicPartition, Long]] getOffsetsOnAssignPair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]
plainPartitionedManualOffsetSource多了一个onRevoke方法,用于在关闭时去处理(存储)还未commit的offset,以及做一些清扫任务ConsumerSettings<K,V> consumerSettings, AutoSubscription subscription, Function[Set[TopicPartition], CompletionStage[Map[TopicPartition, Long]] getOffsetsOnAssign, Consumer[Set[TopicPartition]] onRevokePair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]
committableSource提供API将Offset存到kafka内部,使用时自由控制何时commitConsumerSettings<K,V> consumerSettings, Subscription subscriptionCommittableMessage
committableExternalSourceplainExternalSource一样,只是提供了可以commit到Kafka内部的APIActorRef consumer, ManualSubscription subscription, String groupId, FiniteDuration commitTimeoutCommittableMessage
commitWithMetadataSource提供API将Offset存到kafka内部,并可以将额外的信息放入offset的元数据里,比如什么时间或哪个节点commit的等ConsumerSettings<K,V> consumerSettings, Subscription subscription, Function[ConsumerRecord[K, V], String] metadataFromRecordCommittableMessage
committablePartitionedSourceplainPartitionedSource一样,只是提供了可以commit到Kafka内部的APIConsumerSettings<K,V> consumerSettings, AutoSubscription subscriptionPair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]
commitWithMetadataPartitionedSourceplainPartitionedSource一样,只是提供了可以commit到Kafka内部的API,同时允许添加额外信息到offset的元数据里ConsumerSettings<K,V> consumerSettings, AutoSubscription subscription, Function[ConsumerRecord[K, V], String] metadataFromRecordPair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]
atMostOnceSource消息在发给下游逻辑处理前,先自动将offset更新commit掉,以保证至多一次投递ConsumerSettings<K,V> consumerSettings, Subscription subscriptionConsumerRecord
本文由 EdisonXu - 徐焱飞 创作,采用 CC BY 4.0 CN协议 进行许可。 可自由转载、引用,但需署名作者且注明文章出处。
本文链接为http://edisonxu.com/2018/12/04/akka-kafka-consumer.html
如果您觉得文章不错,可以请我喝一杯咖啡!
actor, akka, kafka, 分布式, 并发