TOC
基本介绍
- 了解:Producer各项重点配置
- 熟练掌握:Producer API
- 熟练掌握:Producer负载均衡等高级特性
Producer各项重点配置
Producer API
-
Producer发送模式
-
异步发送
public static void producerSend() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092"); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.RETRIES_CONFIG,"0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG,"1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // Producer的主对象 Producer<String,String> producer = new KafkaProducer<>(properties); // 消息对象 - ProducerRecoder for(int i=0;i<10;i++){ ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i); producer.send(record); } // 所有的通道打开都需要关闭 producer.close(); }
-
同步发送(异步阻塞发送)
public static void producerSyncSend() { //... Future<RecordMetadata> send = producer.send(record); RecordMetadata recordMetadata = send.get(); System.out.println("key-"+i + "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset()); //... }
-
异步发送带回调
public static void producerSendWithCallback() { //... producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println( "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset()); } }); //... }
-
-
源码解析
-
new KafkaProducer<>(properties);
- MetricConfig
- 加载负载均衡器
- 初始化 Serializer
- 初始化 RecordAccumulator (计数器)
- 启动 newSender(守护线程)
-
源码通知
- Producer 线程安全
- Producer 批量发送(减少IO;日志追加)
-
producer.send(record)
- 计算分区(消息进入哪一个partition)
- 计算批次(accumulator.append)
- 主要内容
- 创建批次
- 向批次中追加消息
-
时序图
-
-
Producer发送原理解析
- 直接发送
- 负载均衡
- 异步发送
- 流程图
自定义 Partition 负载均衡
-
DemoPartition.java 实现 Partitioner 接口,重写 partition() 方法
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { /* key-1 key-2 key-3 */ String keyStr = key + ""; String keyInt = keyStr.substring(4); System.out.println("keyStr : "+keyStr + "keyInt : "+keyInt); int i = Integer.parseInt(keyInt); return i%2; }
-
添加 partition 配置
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xxx.producer.SamplePartition");
消息传递保障
- 三种传递保障,依赖于Producer和Consumer共同实现,主要依赖于Producer
- 最多一次:收到 0~1次(发出即可)
- 至少一次:收到 1~多次
- 正好一次:有且仅有 1次(acks=all)
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付
