Kafka学习笔记 - 2 - 核心API - 2 - Producer API

Posted by WZhong on Tuesday, August 30, 2022

TOC

基本介绍

  • 了解:Producer各项重点配置
  • 熟练掌握:Producer API
  • 熟练掌握:Producer负载均衡等高级特性

Producer各项重点配置

Producer API

  • Producer发送模式

    • 异步发送

        public static void producerSend() {
          Properties properties = new Properties();
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");
          properties.put(ProducerConfig.ACKS_CONFIG,"all");
          properties.put(ProducerConfig.RETRIES_CONFIG,"0");
          properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
          properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
          properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
      
          // Producer的主对象
          Producer<String,String> producer = new KafkaProducer<>(properties);
      
          // 消息对象 - ProducerRecoder
          for(int i=0;i<10;i++){
            ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
            producer.send(record);
          }
          // 所有的通道打开都需要关闭
          producer.close();
        }
      
    • 同步发送(异步阻塞发送)

        public static void producerSyncSend() {
            //...
            Future<RecordMetadata> send = producer.send(record);
            RecordMetadata recordMetadata = send.get();
            System.out.println("key-"+i + "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
            //...
        }
      
    • 异步发送带回调

        public static void producerSendWithCallback() {
            //...
            producer.send(record, new Callback() {
              @Override
              public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                  System.out.println(
                          "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
              }
            });
            //...
        }
      
  • 源码解析

    • new KafkaProducer<>(properties);

      • MetricConfig
      • 加载负载均衡器
      • 初始化 Serializer
      • 初始化 RecordAccumulator (计数器)
      • 启动 newSender(守护线程)
    • 源码通知

      • Producer 线程安全
      • Producer 批量发送(减少IO;日志追加)
    • producer.send(record)

      • 计算分区(消息进入哪一个partition)
      • 计算批次(accumulator.append)
      • 主要内容
        • 创建批次
        • 向批次中追加消息
    • 时序图 alt

  • Producer发送原理解析

    • 直接发送
    • 负载均衡
    • 异步发送
    • 流程图 alt

自定义 Partition 负载均衡

  • DemoPartition.java 实现 Partitioner 接口,重写 partition() 方法

      @Override
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*
            key-1
            key-2
            key-3
          */
        String keyStr = key + "";
        String keyInt = keyStr.substring(4);
        System.out.println("keyStr : "+keyStr + "keyInt : "+keyInt);
    
        int i = Integer.parseInt(keyInt);
    
        return i%2;
      }
    
  • 添加 partition 配置

      properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xxx.producer.SamplePartition");
    

消息传递保障

  • 三种传递保障,依赖于Producer和Consumer共同实现,主要依赖于Producer
    • 最多一次:收到 0~1次(发出即可)
    • 至少一次:收到 1~多次
    • 正好一次:有且仅有 1次(acks=all)

「真诚赞赏,手留余香」

WZhong

真诚赞赏,手留余香

使用微信扫描二维码完成支付