Kafka学习笔记 - 2 - 核心API - 3 - Consumer API

Posted by WZhong on Thursday, September 1, 2022

TOC

目标

  • 了解:Consumer 高级特性
  • 熟练掌握:Consumer API
  • 熟练掌握:Consumer 各项重点配置

Demo

  • 自动提交 offset

      private static void consumerPoll(){
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
        KafkaConsumer<String,String> consumer = new KafkaConsumer(props);
        // 消费订阅哪一个Topic或者几个Topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
          for (ConsumerRecord<String, String> record : records)
            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
        }
      }
    
  • 手动提交 offset

      ...
      props.setProperty("enable.auto.commit", "false");
      ...
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            // 想把数据保存到数据库,成功就成功,不成功...
            // TODO record 2 db
            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            // 如果失败,则回滚, 不要提交offset
        }
        // 如果成功,手动通知offset提交
        consumer.commitAsync();
      }
    
  • 手动提交 offset,并且手动控制 partition(每个partition单独处理)

      // 每个partition单独处理
      for(TopicPartition partition : records.partitions()){
          List<ConsumerRecord<String, String>> pRecord = records.records(partition);
          for (ConsumerRecord<String, String> record : pRecord) {
              System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
          }
          long lastOffset = pRecord.get(pRecord.size() -1).offset();
          // 单个partition中的offset,并且进行提交
          Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
          offset.put(partition, new OffsetAndMetadata(lastOffset+1));
          // 提交offset
          consumer.commitSync(offset);
          System.out.println("=============partition - "+ partition +" end================");
      }
    
  • 手动提交 offset,并且手动控制 partition(消费订阅某个Topic的某个分区)

      // jiangzh-topic - 0,1两个partition
      TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
      TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    
      // 消费订阅某个Topic的某个分区
      consumer.assign(Arrays.asList(p0));
    
  • 多线程并发处理

    • 经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全(ConsumerThreadSample.java)

    • 一个 consumer 多个事件处理器(ConsumerRecordThreadSample.java)

  • 手动指定offset的起始位置,及手动提交offset

      // 手动指定offset起始位置
      /*
        1、人为控制offset起始位置
        2、如果出现程序错误,重复消费一次
      */
      /*
        1、第一次从0消费【一般情况】
        2、比如一次消费了100条, offset置为101并且存入Redis
        3、每次poll之前,从redis中获取最新的offset位置
        4、每次从这个位置开始消费
        */
      consumer.seek(p0, 700);
    
  • 限流

      /*
        1、接收到record信息以后,去令牌桶中拿取令牌
        2、如果获取到令牌,则继续业务处理
        3、如果获取不到令牌, 则pause等待令牌
        4、当令牌桶中的令牌足够, 则将consumer置为resume状态
      */
      consumer.pause(Arrays.asList(p0));
      consumer.resume(Arrays.asList(p0));
    
  • rebalance解析

    • 新成员加入组(member join)

    • 组成员崩溃(member failure)

    • 提交位移(member commit offset)

Consumer 基本概念

  • Consumer Group (props.setProperty(“group.id”, “test”);)
    • 多个 consumer 同一个事件处理器 alt

      • 单个分区的消息只能由 ConsumerGroup 中某个 Consumer 消费
      • Consumer 从 Partition 中消费消息是顺序,默认从头开始消费
      • 单个 ConsumerGroup 会消费所有 Partition 中的消息
    • 一个 consumer 多个事件处理器 alt

      • consumer 过多,对象过重

「真诚赞赏,手留余香」

WZhong

真诚赞赏,手留余香

使用微信扫描二维码完成支付