Kafka学习笔记 - 2 - 核心API - 4 - Streams API

Posted by WZhong on Thursday, September 1, 2022

TOC

目录

  • 介绍
  • 概念 及 初识高层架构图
  • 核心概念
  • 演示准备
  • 使用
  • 程序解析
  • 算子演示

目标

  • 了解:Kafka 的流处理流程
  • 掌握:Kafka Stream 高层架构
  • 掌握:Kafka Stream 开发

基本概念

  • Kafka Stream 是处理分析存储在 Kafka 数据的客户端程序库
  • Kafka Stream 通过 state store 可以实现高效状态操作
  • 支持原语 Processor 和高层抽象 DSL
  • 关键词:
    • 流 及 流处理器
    • 流处理拓扑
    • 源处理器 及 Sink 处理器

编码

  • 准备

    • 添加 kafka-streams 依赖
    • 创建 input_topic 和 output_topic
  • demo

      // 如何定义流计算过程
      static void wordcountStream(final StreamsBuilder builder){
        // 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
        KStream<String,String> source = builder.stream(INPUT_TOPIC);
        // Hello World imooc
        // KTable是数据集合的抽象对象
        // 算子
        final KTable<String, Long> count =
              // flatMapValues -> 将一行数据拆分为多行数据  key 1 , value Hello World
              // flatMapValues -> 将一行数据拆分为多行数据  key 1 , value Hello key xx , value World
              /*
                key 1 , value Hello   -> Hello 1  World 2
                key 2 , value World
                key 3 , value World
              */
          source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, value) -> value) // 合并 -> 按value值合并
                .count(); // 统计出现的总数
    
        // 将结果输入到OUT_TOPIC中
        count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(),Serdes.Long()));
      }
    
      static void foreachStream(final StreamsBuilder builder){
        KStream<String,String> source = builder.stream(INPUT_TOPIC);
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
              .foreach((key,value)-> System.out.println(key + " : " + value));
      }
    

「真诚赞赏,手留余香」

WZhong

真诚赞赏,手留余香

使用微信扫描二维码完成支付