TOC
基本介绍
- 了解:Kafka客户端API类型及区别
- 熟练掌握:构建Kafka之Java客户端
- 熟练掌握:Kafka客户端的基本操作
Kafka客户端API类型
- Admin API: 允许管理和检测Topic、broker以及其他Kafka对象
- Producer API: 发布消息到一个或多个Topic
- Consumer API: 订阅一个或多个Topic,并处理产生的消息
- Streams API: 高效地将输入流转换到输出流
- Connect API: 从一些源系统或应用程序中拉取数据到Kafka
构建Kafka之Java客户端
-
创建SpringBoot工程
- 添加 spring-boot-starter-web 依赖
- 添加 kafka-clients 依赖
- 创建 admin、consumer、producer 包
-
AdminClient客户端建立
-
AdminClient API
AdminClient AdminClient客户端对象 NewTopic 创建Topic CreateTopicsResult 创建Topic的返回结果 ListTopicsResult 查询Topic列表 ListTopicsOptions 查询Topic选项 DescribeTopicsResult 查询Topics DescribeConfigsResult 查询配置项 -
创建AdminClient(Kafka配置全解析.pdf)
public static AdminClient createAdminClient(){ Properties properties = new Properties(); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092"); AdminClient adminClient = AdminClient.create(properties); return adminClient; }
-
Kafka客户端的基本操作
-
创建Topic(createTopics)
public static void createTopic() { AdminClient adminClient = adminClient(); // 副本因子 Short rs = 1; NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs); CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic)); System.out.println("CreateTopicsResult : "+ topics); }
-
查询Topic列表(listTopics)
public static void listTopics() throws Exception { AdminClient adminClient = adminClient(); ListTopicsResult listTopicsResult = adminClient.listTopics(); ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopicsResult = adminClient.listTopics(options); Set<String> names = listTopicsResult.names().get(); Collection<TopicListing> topicListings = listTopicsResult.listings().get(); KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings(); names.stream().forEach(System.out::println); topicListings.stream().forEach((topicList)->{ System.out.println(topicList); }); }
-
删除Topic(deleteTopics)
public static void delTopics() throws Exception { AdminClient adminClient = adminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME)); deleteTopicsResult.all().get(); }
-
查询描述Topic信息(describeTopics)
public static void describeTopics() throws Exception { AdminClient adminClient = adminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get(); Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet(); entries.stream().forEach((entry)->{ System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()); }); }
-
查询描述配置信息(describeConfigs)
public static void describeConfig() throws Exception{ AdminClient adminClient = adminClient(); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource)); Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get(); configResourceConfigMap.entrySet().stream().forEach((entry)->{ System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue()); }); }
-
修改Topic配置信息(incrementalAlterConfigs)
public static void alterConfig() throws Exception{ AdminClient adminClient = adminClient(); Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>(); // 组织两个参数 ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET); configMaps.put(configResource,Arrays.asList(alterConfigOp)); AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps); alterConfigsResult.all().get(); }
-
增加Partition(createPartitions)
public static void incrPartitions(int partitions) throws Exception{ AdminClient adminClient = adminClient(); Map<String, NewPartitions> partitionsMap = new HashMap<>(); NewPartitions newPartitions = NewPartitions.increaseTo(partitions); partitionsMap.put(TOPIC_NAME, newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap); createPartitionsResult.all().get(); }
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付
