Kafka Study Notes - 2 - Core APIs - 1 - Client

Posted by WZhong on Monday, August 29, 2022

Overview

  • Understand: the types of Kafka client APIs and how they differ
  • Master: building a Kafka client in Java
  • Master: the basic operations of the Kafka client

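Kafka Client API Types

  • Admin API: manage and inspect topics, brokers, and other Kafka objects
  • Producer API: publish messages to one or more topics
  • Consumer API: subscribe to one or more topics and process the messages produced to them
  • Streams API: transform input streams into output streams efficiently
  • Connect API: run connectors that pull data from source systems or applications into Kafka, or export it from Kafka to external systems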

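Building a Kafka Java Client

  • Create a Spring Boot project

    • Add the spring-boot-starter-web dependency
    • Add the kafka-clients dependency
    • Create the admin, consumer, and producer packages
  • Set up the AdminClient

    • AdminClient API

      AdminClient            the admin client object
      NewTopic               describes a topic to create
      CreateTopicsResult     result of creating topics
      ListTopicsResult       result of listing topics
      ListTopicsOptions      options for listing topics
      DescribeTopicsResult   result of describing topics
      DescribeConfigsResult  result of describing configuration entries
    • Create the AdminClient (see Kafka配置全解析.pdf for the configuration options)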

        // Named adminClient() so that it matches the calls in the operations below.
        public static AdminClient adminClient(){
            Properties properties = new Properties();
            // Address (host:port) of the Kafka broker to bootstrap from
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");

            return AdminClient.create(properties);
        }
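
As a rough sketch of how the configuration might grow beyond a single property, the helper below builds the `Properties` with plain string keys. `bootstrap.servers`, `client.id`, and `request.timeout.ms` are real Kafka client settings; the class name `AdminClientProps`, the client id `notes-admin`, and the 5000 ms timeout are assumptions made for illustration only.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class AdminClientProps {

    // Builds the configuration that would be handed to AdminClient.create(...).
    static Properties build(String bootstrapServers, String clientId, int requestTimeoutMs) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        Properties props = new Properties();
        // Same key as AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Identifies this client to the brokers; the value is an assumption.
        props.setProperty("client.id", clientId);
        // Maximum time the client waits for the response to a single request.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.220.128:9092", "notes-admin", 5000);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The returned object can be passed to `AdminClient.create(...)` in place of the hand-built `Properties`.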
      

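Basic Operations of the Kafka Client

  • Create a topic (createTopics)

      // TOPIC_NAME is assumed to be a String constant defined in the same class.
      public static void createTopic() throws Exception {
          AdminClient adminClient = adminClient();
          // Replication factor
          short rs = 1;
          // Arguments: topic name, partition count, replication factor
          NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, rs);
          CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
          // createTopics is asynchronous; block until the request has completed
          topics.all().get();
          System.out.println("CreateTopicsResult : " + topics);
      }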
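
Before sending a create request it can help to check the topic name locally. The sketch below encodes the naming rules as I understand them from the broker (letters, digits, `.`, `_`, and `-` only; at most 249 characters; not `.` or `..`). The class `TopicNames` is a hypothetical helper written for these notes, not part of kafka-clients.

```java
import java.util.regex.Pattern;

// Hypothetical pre-check; the broker remains the final authority on names.
final class TopicNames {
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    // Returns true if the name passes the local checks described above.
    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        if (name.length() > MAX_LENGTH) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("orders.v1"));  // prints true
        System.out.println(isValid("bad topic"));  // prints false (space is not allowed)
    }
}
```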
    
  • List topics (listTopics)

      public static void listTopics() throws Exception {
          AdminClient adminClient = adminClient();

          // Without options, internal topics are left out:
          // ListTopicsResult listTopicsResult = adminClient.listTopics();

          // With listInternal(true), internal topics are included as well
          ListTopicsOptions options = new ListTopicsOptions();
          options.listInternal(true);
          ListTopicsResult listTopicsResult = adminClient.listTopics(options);

          Set<String> names = listTopicsResult.names().get();
          Collection<TopicListing> topicListings = listTopicsResult.listings().get();
          // The same listings, keyed by topic name (not yet resolved here)
          KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();

          names.forEach(System.out::println);
          topicListings.forEach(System.out::println);
      }
    
  • Delete a topic (deleteTopics)

      public static void delTopics() throws Exception {
          AdminClient adminClient = adminClient();
          DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
          deleteTopicsResult.all().get();
      }
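
The admin calls in these notes are asynchronous and only report failures when `get()` is called, at which point the real error arrives wrapped in an `ExecutionException`. The sketch below shows the unwrapping pattern. It uses the JDK's `CompletableFuture` as a stand-in for `KafkaFuture` so that it runs without a broker; the class `FutureErrors` and the sample exception are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Stand-in demo: CompletableFuture plays the role of KafkaFuture here.
final class FutureErrors {

    // Waits for the future; returns null on success, otherwise the failure.
    static Throwable awaitFailure(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // The interesting error is the cause, not the wrapper.
            return e.getCause();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("topic does not exist"));
        System.out.println(awaitFailure(failed));
    }
}
```

In the real code the same `try`/`catch` would surround calls such as `deleteTopicsResult.all().get()`, so that a failed delete can be reported instead of surfacing as a generic `Exception`.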
    
  • Describe topics (describeTopics)

      public static void describeTopics() throws Exception {
          AdminClient adminClient = adminClient();
          DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
          Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
          Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
          entries.stream().forEach((entry)->{
              System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
          });
      }
    
  • Describe configuration (describeConfigs)

      public static void describeConfig() throws Exception{
          AdminClient adminClient = adminClient();
    
          ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
          DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
          Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
          configResourceConfigMap.entrySet().stream().forEach((entry)->{
              System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
          });
      }
    
  • Alter topic configuration (incrementalAlterConfigs)

      public static void alterConfig() throws Exception{
          AdminClient adminClient = adminClient();
    
          Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
          // Build the two arguments: the resource to change and the operation to apply
          ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
          AlterConfigOp alterConfigOp =
                  new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
          configMaps.put(configResource,Arrays.asList(alterConfigOp));
    
          AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
          alterConfigsResult.all().get();
      }
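
The "incremental" part means that only the keys named in the request change, while every other setting on the topic stays as it was. The simulation below illustrates that behavior on a plain map. It does not use the Kafka classes: the `Op` enum mirrors only two of the operation kinds in `AlterConfigOp.OpType`, and `IncrementalConfigDemo`, `Change`, and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local simulation of incremental config changes; not the Kafka API itself.
final class IncrementalConfigDemo {

    // Stand-in for two of the AlterConfigOp.OpType values.
    enum Op { SET, DELETE }

    static final class Change {
        final String key;
        final String value;
        final Op op;

        Change(String key, String value, Op op) {
            this.key = key;
            this.value = value;
            this.op = op;
        }
    }

    // Applies the changes to a copy; keys that are not mentioned stay untouched.
    static Map<String, String> apply(Map<String, String> current, List<Change> changes) {
        Map<String, String> result = new LinkedHashMap<>(current);
        for (Change change : changes) {
            if (change.op == Op.SET) {
                result.put(change.key, change.value);
            } else {
                // Dropping the override; in Kafka the default would apply again.
                result.remove(change.key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> current = new LinkedHashMap<>();
        current.put("preallocate", "true");
        current.put("retention.ms", "604800000");
        List<Change> changes = Arrays.asList(new Change("preallocate", "false", Op.SET));
        System.out.println(apply(current, changes));
    }
}
```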
    
  • Add partitions (createPartitions)

      public static void incrPartitions(int partitions) throws Exception{
          AdminClient adminClient = adminClient();
          Map<String, NewPartitions> partitionsMap = new HashMap<>();
          NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
          partitionsMap.put(TOPIC_NAME, newPartitions);
    
          CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
          createPartitionsResult.all().get();
      }
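
Note that `NewPartitions.increaseTo(...)` takes the new total number of partitions, not the number to add, and the partition count of a topic can only grow. A small helper along the following lines could compute and validate that total before the call. `PartitionTargets` and `targetTotal` are hypothetical names, and the current partition count is assumed to have been looked up beforehand (for example through describeTopics).

```java
// Hypothetical helper for computing the argument of NewPartitions.increaseTo(...).
final class PartitionTargets {

    // Returns the new total partition count for a topic that currently has
    // currentCount partitions and should gain `additional` more.
    static int targetTotal(int currentCount, int additional) {
        if (currentCount < 1) {
            throw new IllegalArgumentException("a topic has at least one partition");
        }
        if (additional < 1) {
            throw new IllegalArgumentException("partitions can only be added, never removed");
        }
        // addExact guards against silent integer overflow.
        return Math.addExact(currentCount, additional);
    }

    public static void main(String[] args) {
        System.out.println(targetTotal(1, 2)); // prints 3
    }
}
```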
    
