TOC
目标
- 了解:Consumer 高级特性
- 熟练掌握:Consumer API
- 熟练掌握:Consumer 各项重点配置
Demo
-
自动提交 offset
private static void consumerPoll(){ Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.220.128:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String,String> consumer = new KafkaConsumer(props); // 消费订阅哪一个Topic或者几个Topic consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord<String, String> record : records) System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } }
-
手动提交 offset
... props.setProperty("enable.auto.commit", "false"); ... while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord<String, String> record : records) { // 想把数据保存到数据库,成功就成功,不成功... // TODO record 2 db System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); // 如果失败,则回滚, 不要提交offset } // 如果成功,手动通知offset提交 consumer.commitAsync(); }
-
手动提交 offset,并且手动控制 partition(每个partition单独处理)
// 每个partition单独处理 for(TopicPartition partition : records.partitions()){ List<ConsumerRecord<String, String>> pRecord = records.records(partition); for (ConsumerRecord<String, String> record : pRecord) { System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } long lastOffset = pRecord.get(pRecord.size() -1).offset(); // 单个partition中的offset,并且进行提交 Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>(); offset.put(partition, new OffsetAndMetadata(lastOffset+1)); // 提交offset consumer.commitSync(offset); System.out.println("=============partition - "+ partition +" end================"); }
-
手动提交 offset,并且手动控制 partition(消费订阅某个Topic的某个分区)
// jiangzh-topic - 0,1两个partition TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0); TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1); // 消费订阅某个Topic的某个分区 consumer.assign(Arrays.asList(p0));
-
多线程并发处理
-
经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全(ConsumerThreadSample.java)
-
一个 consumer 多个事件处理器(ConsumerRecordThreadSample.java)
-
-
手动指定offset的起始位置,及手动提交offset
// 手动指定offset起始位置 /* 1、人为控制offset起始位置 2、如果出现程序错误,重复消费一次 */ /* 1、第一次从0消费【一般情况】 2、比如一次消费了100条, offset置为101并且存入Redis 3、每次poll之前,从redis中获取最新的offset位置 4、每次从这个位置开始消费 */ consumer.seek(p0, 700);
-
限流
/* 1、接收到record信息以后,去令牌桶中拿取令牌 2、如果获取到令牌,则继续业务处理 3、如果获取不到令牌, 则pause等待令牌 4、当令牌桶中的令牌足够, 则将consumer置为resume状态 */ consumer.pause(Arrays.asList(p0)); consumer.resume(Arrays.asList(p0));
-
rebalance解析
-
新成员加入组(member join)
-
组成员崩溃(member failure)
-
提交位移(member commit offset)
-
Consumer 基本概念
- Consumer Group (props.setProperty(“group.id”, “test”);)
-
多个 consumer 同一个事件处理器
- 单个分区的消息只能由 ConsumerGroup 中某个 Consumer 消费
- Consumer 从 Partition 中消费消息是顺序,默认从头开始消费
- 单个 ConsumerGroup 会消费所有 Partition 中的消息
-
一个 consumer 多个事件处理器
- consumer 过多,对象过重
-
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付
