最近在做Flink Streaming读取Kafka的数据,包括使用Flink Jar和Flink SQL去读取Kafka中的数据,熟悉碰到的第一个较为迷惑的地方就是offset的问题。
因为需要演示,确保业务的流程完整,所以在这样的背景下要求Flink程序每次启动的时候都是从Kafka最开始进行读取,在Kafka中可以通过auto.offset.reset
进行offset的rese。
一共有三种模式earliest
,latest
和none
,我设置的是earliest
,按照我的理解,既然是earliest,那么每次启动的时候都应该是从头开始读取,但是实际却是只有第一次运行jar的时候会消费到数据。
后续再启动如果kafka没有新数据流入,那么便不会再有消费行为,这个跟我对earliest的理解不一致,于是我又去看了下官方的描述:
earliest: automatically reset the offset to the earliest offset
|
这个描述就比较宽泛了,不过也算能表达出意思,那就是earliest是把offset给重置到earliest offset而并不是头,所以这三种模式实际的意思是:
- earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none :topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
这里所有的offset都是基于某个group id的,kafka是基于了某个group id,如果每次创建全新的group id,那肯定没问题。
不过我又看到另外一个参数scan.startup.mode
,我发现在使用flink sql的时候可以通过scan.startup.mode
=earliest-offset
来设置offset,可以有如下几种设置方式:
- group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.
- earliest-offset: start from the earliest offset possible.
- latest-offset: start from the latest offset.
- timestamp: start from user-supplied timestamp for each partition.
- specific-offsets: start from user-supplied specific offsets for each partition.
默认是group-offsets
,这是在Flink SQL模式下的用法,效果与代码类似。
于是要达到我的目的,只有2种方式:每次切换group id,确保每次运行的group id是全新的,这样自然每次提交程序都是从头开始,这种方式简单粗暴,还有一种方式我想的就是既然某个group id已经有了offset,那么我能不能把这个offset删掉。
这里引申出的问题就是kafka对于某个group id的offset是怎么存储和处理的,在kafka0.8.1.1以前,offset保存在zk中,存放在/consumers节点下,由于频繁访问zk,zk需要一个一个节点更新offset,不能批量或分组更新,导致offset更新成了瓶颈。后续两个过渡版本增加了参数“offsets.storage”,该参数可配置为“zookeeper”或“kafka”分别表示offset的保持位置在zk或是broker,默认保留在zk。
在ZK中保存的结构为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
|
从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上。
Kafka版本0.10.1.1,已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。
其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。
broker消息保存目录在配置文件server.properties中:
# A comma separated list of directories under which to store log files log.dirs=/usr/local/var/lib/kafka-logs
|
该目录下默认包含50个以__consumer_offsets开头的目录,用于存放offset:
__consumer_offsets-0 __consumer_offsets-22 __consumer_offsets-36 __consumer_offsets-5 __consumer_offsets-1 __consumer_offsets-23 __consumer_offsets-37 __consumer_offsets-6
|
offset的存放位置决定于groupid的hash值,其获取方式:
Utils.abs(groupId.hashCode) % numPartitions
|
其中numPartitions由offsets.topic.num.partitions参数决定,默认值即50,以groupid “test-group”为例,计数其存储位置为:__consumer_offsets-12,当其消费全部10条数据后,使用命令查看该目录下消息记录:
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --partition 12 --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
|
输出:
[test-group,test1,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[0], metadata=, commitTimestamp=1605601180391, expireTimestamp=None) [test-group,test1,2]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1605601180391, expireTimestamp=None) [test-group,test1,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1605601180391, expireTimestamp=None)
|
该数据结构为以groupid-topic-partition作为key,value为OffsetAndMetadata,其中包含了offset信息。可以看到group“test-group”在topic“test1”的三个partition下offset值分别为6,2,2。同保存在zk数据一样,offset只记录groupid的消费情况,对于具体consumer是透明的。
那么offset具体被发送给哪个broker保存呢?offset的存储分区是通过groupid的hash值取得的,那么offset发送的broker就是该分区的leader broker,这也符合kafka普通消息的发生逻辑。
所以,每个group的offset都将发生到一个broker,broker中存在一个offset manager 实例负责接收处理offset提交请求,并返回提交操作结果。
自定义Kafka的Offset
在Kafka中,offset默认存储在broker的Topic,我们也可以自定义存储位置,为了保证消费和提交偏移量同时成功或失败,我们可以利用数据库事务来实现,下面是把offset存储在Mysql里的一个例子。
在重新均衡分组之前保存数,在重新均衡后读取数据。
在提交偏移量时保存数据。
注意 在重置偏移量时候,要比提交的大1,因此读取时候对偏移量值加1。
读取条件 分组+Topic+Partition。
建表语句:
CREATE TABLE `consumer_offset` ( `id` int(11) NOT NULL AUTO_INCREMENT, `consumer_group` varchar(255) COLLATE utf8_bin DEFAULT NULL, `consumer_topic` varchar(255) COLLATE utf8_bin DEFAULT NULL, `consumer_partition` int(255) DEFAULT NULL, `consumer_offset` bigint(255) DEFAULT NULL, `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
|
一个Spring集成Kafka使用定义的存储的例子:
import cn.spring.tech.dao.ConsumerOffsetDao; import cn.spring.tech.model.ConsumerOffsetEntity; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.*;
@Slf4j @RestController @RequestMapping("/kafka") public class KafkaController {
@Autowired private ConsumerOffsetDao consumerOffsetDao;
@PostMapping("/s/{msg}") public String send(@PathVariable("msg") String msg){ String topic="t3";
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.211.134:9091"); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.RETRIES_CONFIG,1); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); properties.put(ProducerConfig.LINGER_MS_CONFIG,1); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Object, Object> producer = new KafkaProducer<>(properties); producer.send(new ProducerRecord(topic,"key-"+msg.hashCode(),"val-"+msg)); producer.close(); log.info("oks"); return "oks"; }
@PostMapping("/c") public String consumer(){ String group="f6"; String topic="t3";
Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.134:9091"); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { for(TopicPartition topicPartition:collection){ int partition = topicPartition.partition(); long offset = consumer.position(topicPartition); commitOffset(group,topic,partition,offset); } }
@Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { for(TopicPartition topicPartition:collection){ int partition = topicPartition.partition(); long offset = getOffset(group, topic, partition); consumer.seek(topicPartition,offset); } } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); List<ConsumerOffsetEntity> list=new ArrayList<>(); for(ConsumerRecord<String,String> consumerRecord:records){ String topics=consumerRecord.topic(); int p=consumerRecord.partition(); log.info("{}消费到消息:partition="+topics+p+",key="+consumerRecord.key()+",val="+consumerRecord.value()+",offset="+consumerRecord.offset(),getThread()); ConsumerOffsetEntity offsetEntity = new ConsumerOffsetEntity(); offsetEntity.setConsumerGroup(group); offsetEntity.setConsumerTopic(consumerRecord.topic()); offsetEntity.setConsumerPartition(consumerRecord.partition()+""); offsetEntity.setConsumerOffset(consumerRecord.offset()+""); offsetEntity.setCreateTime(new Date()); list.add(offsetEntity); } if(!CollectionUtils.isEmpty(list)){ ConsumerOffsetEntity consumerOffsetEntity = list.get(list.size() - 1); consumerOffsetDao.insert(consumerOffsetEntity); log.info("{}===偏移量提交成功==="+consumerOffsetEntity,getThread()); }
} }
private long getOffset(String group, String topic, int partition) { QueryWrapper<ConsumerOffsetEntity> wrapper = new QueryWrapper<>(); wrapper.eq("consumer_group",group); wrapper.eq("consumer_topic",topic); wrapper.eq("consumer_partition",partition); wrapper.orderByDesc("create_time"); List<ConsumerOffsetEntity> list = consumerOffsetDao.selectList(wrapper); if(CollectionUtils.isEmpty(list)){ log.info("{}>>>>>>>>>重新均衡分组<<<<<<<<<读取偏移量"+0,getThread()); return 0; } ConsumerOffsetEntity consumerOffsetEntity = list.stream().max((p1, p2) -> Integer.valueOf(p1.getConsumerOffset()) - Integer.valueOf(p2.getConsumerOffset())).get(); String offset = consumerOffsetEntity.getConsumerOffset(); log.info("{}>>>>>>>>>重新均衡分组<<<<<<<<<读取偏移量"+consumerOffsetEntity,getThread()); return Long.valueOf(offset)+1; }
private void commitOffset(String group, String topic, int partition, long offset) { ConsumerOffsetEntity consumerOffsetEntity = new ConsumerOffsetEntity(); consumerOffsetEntity.setConsumerGroup(group); consumerOffsetEntity.setConsumerTopic(topic); consumerOffsetEntity.setConsumerPartition(String.valueOf(partition)); consumerOffsetEntity.setConsumerOffset(String.valueOf(offset)); consumerOffsetEntity.setCreateTime(new Date()); consumerOffsetDao.insert(consumerOffsetEntity); log.info("{}>>>>>>>>>重新均衡分组>>>>>>>>>提交偏移量",getThread());
}
private String getThread(){ return Thread.currentThread().getName(); } }
|
扫码手机观看或分享: