kafka Stream

kafka Streams

image-20241011132134511

Kafka Streams:作为 Kafka 的内置流处理库,专注于处理 Kafka 中的数据流,适合需要流式处理 Kafka 消息的应用,具有简单的 API 和高效的处理能力。

1.了解kafka Streams

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

image-20241011132426558

按照时间轴堆积处理

2.Kafka Streams的关键概念
  • 源处理器(Source Processor)源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。-交换机
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。-队列-与此kafka主题为消费者

image-20241011132529993

3.KStream数据结构

结构类似于map,如下图,key-value键值对

image-20241011132829446

4.KStream

image-20241011132931855

KStream 是 Kafka Streams API 中用于处理数据流的一个核心概念,代表着一个顺序且不断更新的数据集.

数据流中常记录事件,每个事件都类似于向日志中插入的新数据,而不是对之前数据的更新。

因此,每一条数据都被看作是对前一条数据的累加或增量更新。

image-20241011133220139

每一条新数据都是对流中已有数据的增量,而不是对某个值的替换或更新。即使新数据和已有数据的 key 相同(如上例中的 “alice”),它们也不会覆盖,而是作为新数据插入流中。->我们可以汇总行为-算出结果集

原生代码

需求分析,求单词个数(word count)

image-20241011133318225

依赖导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
  1. 生产者发送消息

    • Kafka 生产者应用程序负责发送消息到 Kafka 集群中的主题(Topic)。在你的示例中,生产者发送消息到 itcast-topic-input 主题。
  2. Kafka Streams 应用程序进行流处理

    • Kafka Streams 应用程序使用 StreamsBuilder 来构建流处理拓扑。
    • 在你的代码中,streamProcessor 方法定义了流处理逻辑,它从 itcast-topic-input 主题接收消息,进行处理(例如分割、聚合等),并将处理结果发送到另一个主题 itcast-topic-out
  3. 消费者获取消息

    • Kafka 消费者应用程序连接到 Kafka 集群,并从特定主题中拉取(consume)消息。
    • 在你的示例中,消费者应该订阅 itcast-topic-out 主题来获取 Kafka Streams 应用程序处理后的消息。

    image-20241011144449509

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

public class kafka_test {

public static void main(String[] args) {
// 配置 Apache Kafka Streams 应用程序的属性
Properties prop = new Properties();
///class Properties extends Hashtable<Object,Object>
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
//stream 构建器
StreamsBuilder streamsBuilder = new StreamsBuilder();

// 定义流处理逻辑
streamProcessor(streamsBuilder);


// 创建 Kafka Streams 实例
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
//开启流式计算
kafkaStreams.start();

}
/**
* 流式计算
* 消息的内容:hello kafka hello itcast
* @param streamsBuilder
*/
private static void streamProcessor(StreamsBuilder streamsBuilder) {
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
/**
* 处理消息的value
*/
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//按照value进行聚合处理
.groupBy((key,value)->value)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//统计单词的个数
.count()
//转换为kStream
.toStream()
.map((key,value)->{
System.out.println("key:"+key+",vlaue:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 生产者
*/
public class ProducerDemo {
public static void main(String[] args) {
//1.kafka的配置信息
Properties pro = new Properties();
//Kafka的连接地址
pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
//发送失败,失败重连次数
pro.put(ProducerConfig.RETRIES_CONFIG,5);
//消息key的序列化器
pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//数据压缩
pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
//2.生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(pro);

String[] messages = {
"hello kafka hello itcast",
"welcome to kafka streams",
"kafka is a distributed streaming platform"
};
for (String message : messages) {
ProducerRecord<String, String> record = new ProducerRecord<>("itcast-topic-input", message);
producer.send(record);
System.out.println("Sent: " + message);
}


//5.关闭消息通道(必选)
producer.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 消费者
*/
public class ConsumerDemo {
public static void main(String[] args) {
//1.添加Kafka配置信息
Properties pro = new Properties();
//Kafka的连接地址
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
//消费者组
pro.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");
//消息key的反序列化器
pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//消息value的反序列化器
pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//手动提交偏移量
pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//2.消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(pro);
//3.订阅主题
consumer.subscribe(Collections.singletonList("tbug-topic-out"));
//4.设置线程一种处于监听状态
//同步提交和异步提交偏移量
try {
while(true) {
//5.获取消息
ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(1000)); //设置每秒钟拉取一次
for (ConsumerRecord<String, String> message : messages) {
System.out.print(message.key() + ":");
System.out.println(message.value());
}
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("记录错误的信息:"+e);
} finally {
//同步
consumer.commitSync();
}
}
}
spring 集结
  • JDK 8或更高版本
  • Apache Kafka
  • Spring Boot
  • Maven

1.导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
</dependencies>

2.在application.properties文件中添加Kafka相关的配置:

1
2
3
4
5
6
7
8
9
10
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest///指定了消费者在读取特定主题时,如果未找到有效的 offset(比如在消费者开始读取之前主题刚刚被创建),消费者应该从哪个 offset 开始读取数据。
earliest:消费者将从每个分区的最开始处开始读取消息。
latest:消费者将从每个分区的最新记录开始读取消息。
none:消费者将拒绝读取消息,如果它找不到有效的 offset


spring.kafka.consumer.group-id=my-group
Kafka 使用消费者组来管理一组消费者,它们共同消费一个主题的数据。每个消费者组内的消费者实例会协调彼此,以平衡负载并确保每个消息只被组内的一个消费者处理。
group-id 是用来标识一个消费者组的唯一标识符。同一个消费者组中的所有消费者将共享相同的 group-id。

3.创建Kafka Streams处理器

4.我们需要创建一个Kafka Streams处理器来定义我们的数据处理逻辑。可以创建一个新的类,实现Spring的KafkaStreamsDSL接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableKafkaStreams
public class KafkaStreamsProcessor implements KafkaStreamsDSL {

private static final String INPUT_TOPIC = "my-input-topic";
private static final String OUTPUT_TOPIC = "my-output-topic";

@Override
public void buildStreams(StreamsBuilder builder) {
//本身作为输入流
KStream<String, String> inputTopic = builder.stream(INPUT_TOPIC);

// 在这里添加数据处理逻辑
KStream<String, String> outputTopic = inputTopic
.mapValues(value -> value.toUpperCase())
.filter((key, value) -> value.length() > 5);

outputTopic.to(OUTPUT_TOPIC);//充当数据数据的作用-发送给output
}
}

在上面的代码中,我们创建了一个输入主题my-input-topic和一个输出主题my-output-topic。然后,我们使用mapValues方法将输入流中的值转换为大写,并使用filter方法过滤长度大于5的记录。最后,我们使用to方法将输出流写入输出主题。

5.我们可以在Spring Boot应用程序的主类中启动Kafka Streams处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
//启用处理器

KafkaStreamsProcessor kafkaStreamsProcessor =
new KafkaStreamsProcessor();

kafkaStreamsProcessor.start();
}
}

6.生产和消费消息

我们可以使用Kafka生产者向输入主题发送消息,并使用Kafka消费者从输出主题接收处理后的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
public class MessageController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody String message) {
kafkaTemplate.send("my-input-topic", message);
return ResponseEntity.ok("Message sent successfully");
}

@GetMapping("/receive")
public ResponseEntity<List<String>> receiveMessages() {
List<String> messages = // 从输出主题读取消息
return ResponseEntity.ok(messages);
}
}

使用

计算音乐热点值
1.导出依赖

2.编写输出类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Data
@Api(value = "消息发送增量")
public class UpdateArticleMess {
private String musicid;
/**
* 修改数据的增量,可为正负
*/
private Integer add;

private UpdateArticleType type;

public enum UpdateArticleType {
COMMENT, // 评论
LIKE, // 点赞
VIEWS; // 浏览量(假设您想保留原有的VIEWS,表示浏览次数)
}

}

3.编写发送消息
1
2
3
4
5
6
7
8
9
10
11
12
public class HotArticleConstants {

// 消费者队列
public static final String HOT_ARTICLE_CONSUMER_QUEUE = "hot_article_consumer_queue";

// 生产者队列
public static final String HOT_ARTICLE_PRODUCER_QUEUE = "hot_article_producer_queue";

// 数据聚合处理队列
public static final String HOT_ARTICLE_AGGREGATION_QUEUE = "hot_article_aggregation_queue";
}

发送消息到聚合处理

1
2
3
4
5
6
7
8
String mucisId = String.valueOf(redisTemplate.opsForHash().increment("onlineMusic:" + MusicId, OnlineMusicField.PLAY_COUNT.getFieldCode(), 1));
UpdateArticleMess mess=new UpdateArticleMess();
mess.setMusicid(MusicId);
mess.setAdd(1);
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);


kmsTemplate.send( HotArticleConstants.HOT_ARTICLE_AGGREGATION_QUEUE,mess);

4..定义消息封装类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Data
public class ArticleVisitStreamMess {
/**
* 文章id
*/
private String muicid ;
/**
* 阅读
*/
private int view;

/**
* 评论
*/
private int comment;
/**
* 点赞
*/
private int like;
}

5.编写数据聚合处理 **

ps:感谢ai大老人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@Configuration
@Slf4j
public class HotArticleStreamHandler {
///从某个 Kafka 主题中读取数据并进行处理
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder) {
//读取数据聚合队列消息-数据消息miss类型
KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_AGGREGATION_QUEUE);
//聚合流式处理
stream.map((key,value)->
{
UpdateArticleMess mess= JSON.parseObject(value,UpdateArticleMess.class);
//重置消息的key:1234343434(musicid) 和 value: likes:1(typeAndAdd)
String musicid = mess.getMusicid();
String typeAndAdd =mess.getType().name()+mess.getAdd();
return new KeyValue<>(musicid, typeAndAdd);
}///第一层数据转换stream
).groupBy((key,value)->key)//根据id分组
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//时间窗口堆积处理
.aggregate(new Initializer<String>()
{/// // 执行聚合操作,初始化为 "0"
@Override
public String apply() {
return "view:0,comment:0,like:0";
}
},new Aggregator<String,String,String>()
{
/**
* 真正的聚合操作,返回值是消息的value
*/
@Override
public String apply(String key, String value, String aggValue) {
//无值直接返回
if(StringUtils.isBlank(value))
{
return aggValue;
}
///"view:0,comment:0,like:0"
/// likes:1(typeAndAdd)
String[] aggAry = aggValue.split(",");
int com=0,lik=0,vie=0;//评论 点赞 浏览
for (String item : aggAry) {
//拿出对应的点赞 浏览 评论次数
if(item.startsWith("comment"))
{
com=Integer.parseInt(item.split(":")[1]);
}else if(item.startsWith("like"))
{
lik=Integer.parseInt(item.split(":")[1]);
}else if(item.startsWith("view"))
{
vie=Integer.parseInt(item.split(":")[1]);
}

}
///累加操作
String[] valAry = value.split(":");
if(valAry[0].equals("view"))
{
vie+=Integer.parseInt(valAry[1]);
}else if(valAry[0].equals("comment"))
{
com+=Integer.parseInt(valAry[1]);
}else if(valAry[0].equals("like"))
{
lik+=Integer.parseInt(valAry[1]);
}
// 进行统计
String formatStr = String.format(",COMMENT:%d,LIKES:%d,VIEWS:%d", com, lik, vie);
System.out.println("文章的id:"+key);
System.out.println("当前时间窗口内的消息处理结果:"+formatStr);

return formatStr;
}
}, Materialized.as("hot-article-stream-aggregation-store"))// 指定状态存储
///Kafka Streams 会使用这个名称为你的聚合操作创建一个状态存储,并将结果存储在本地(或远程)供后续查询和使用。
//使用 Materialized.as() 可以创建命名的存储。如果你想在流处理应用程序中查询聚合结果,可以通过状态存储的名称找到对应的结果。
.toStream().map((key,value)->{

return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
}).to(HotArticleConstants.HOT_ARTICLE_PRODUCER_QUEUE);// // 生产者队列

return stream;

}

/**
* 将传入的文章统计数据(作为字符串形式)解析为一个 ArticleVisitStreamMess 对象
* @param
* @param value
* @return
*/
private Object formatObj(String mucid, String value) {
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setMuicid(mucid);
//COMMENT:0,LIKES:0,VIEWS:0
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
/*
COMMENT, // 评论
LIKE, // 点赞
VIEWS; // 浏览量(假设您想保留原有的VIEWS,表示浏览次数)
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKE:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);
}
}

分层聚合图片

该代码的核心是流处理的三个阶段:

  1. 读取来自 Kafka 队列的数据流。
  2. 对数据进行映射、分组和聚合处理。
  3. 将聚合后的结果输出到另一个 Kafka 队列。

image-20241012112753671

6.定义监听者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
@Slf4j
public class ArticleIncrHandleListener {

@Autowired
RedisTemplate redisTemplate;
@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_CONSUMER_QUEUE)
public void handle(String message) {
ArticleVisitStreamMess mess= JSON.parseObject(message, ArticleVisitStreamMess.class);
//对分值进行处理
savemess(mess);

log.info("收到消息:{}", message);
}

private void savemess(ArticleVisitStreamMess mess) {
//拿出点赞 评论 浏览-id的权重
//计算总评分-存入redis-id+总积分-叠加//总积分表 -mucisid id->根据音乐id总权拿id
//根据数量-评论 浏览 id 修改es对应的最新数量
//目前能像到的就这样
}
}

ps:集体逻辑完了


kafka Stream
http://example.com/2024/10/11/Middleware/kafka/kafkaStream/Kafka Stream/
作者
John Doe
发布于
2024年10月11日
许可协议