- 深入理解Kafka:核心设计与实践原理
- 朱忠华
- 1077字
- 2020-08-27 23:31:17
1.3 生产与消费
由 1.1 节的内容可知,生产者将消息发送至 Kafka 的主题中,或者更加确切地说应该是主题的分区中,而消费者也是通过订阅主题从而消费消息的。在演示生产与消费消息之前,需要创建一个主题作为消息的载体。
Kafka提供了许多实用的脚本工具,存放在$KAFKA_HOME的bin目录下,其中与主题有关的就是 kafka-topics.sh 脚本,下面我们用它演示创建一个分区数为 4、副本因子为 3 的主题topic-demo,示例如下:
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/25_3.jpg?sign=1739561450-n71piwzCh007lQowNqNEpmwAzOrv227A-0-2e1687ad743b4bde12372ec3589c6de4)
其中--zookeeper指定了Kafka所连接的ZooKeeper服务地址,--topic指定了所要创建主题的名称,--replication-factor 指定了副本因子,--partitions 指定了分区个数,--create是创建主题的动作指令。
还可以通过--describe展示主题的更多具体信息,示例如下:
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/25_4.jpg?sign=1739561450-Fy0K0Ko3UdciYoHMI7UFaCjCSNZTAdEa-0-1dc91e5ed8c167338548d2befd74ae44)
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/26_1.jpg?sign=1739561450-B32TZPFev8oZWJo3xzK3cOBLFXOxRYfk-0-a0efe4c12707d794cd622427d74c3db9)
创建主题topic-demo之后我们再来检测一下Kafka集群是否可以正常地发送和消费消息。$KAFKA_HOME/bin 目录下还提供了两个脚本 kafka-console-producer.sh 和 kafka-console-consumer.sh,通过控制台收发消息。首先我们打开一个shell终端,通过kafka-console-consumer.sh脚本来订阅主题topic-demo,示例如下:
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/26_2.jpg?sign=1739561450-b7Etyy6UroiJmO8ASci7zTy3T5x8GQIZ-0-db68eef42f1a9073860cb36a26b9563d)
其中--bootstrap-server指定了连接的Kafka集群地址,--topic指定了消费者订阅的主题。目前主题topic-demo尚未有任何消息存入,所以此脚本还不能消费任何消息。
我们再打开一个shell终端,然后使用kafka-console-producer.sh脚本发送一条消息“Hello,Kafka!”至主题topic-demo,示例如下:
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/26_3.jpg?sign=1739561450-Gs0rYwsRHjFiWXhw7AUgZ7Yqx1BUL42o-0-b9f6b445e10b34868632cd3b40b091e0)
其中--broker-list指定了连接的Kafka集群地址,--topic指定了发送消息时的主题。示例中的第二行是通过人工键入的方式输入的,按下回车键后会跳到第三行,即“>”字符处。此时原先执行 kafka-console-consumer.sh脚本的 shell终端中出现了刚刚输入的消息“Hello,Kafka!”,示例如下:
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/26_4.jpg?sign=1739561450-hqiRXq5qJJeUqBJ0618OHEsUc5KoV7ae-0-b910a91efe44cacc5419982870180b08)
读者也可以通过输入一些其他自定义的消息来熟悉消息的收发及这两个脚本的用法。不过这两个脚本一般用来做一些测试类的工作,在实际应用中,不会只是简单地使用这两个脚本来做复杂的与业务逻辑相关的消息生产与消费的工作,具体的工作还需要通过编程的手段来实施。下面就以Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下:
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/27_1.jpg?sign=1739561450-rmoEGw23y4uJwKjvnYZy2B5pCh6aKBQ3-0-8a78f89e3caa8b2911014f6c950e9014)
要往Kafka中写入消息,首先要创建一个生产者客户端实例并设置一些配置参数,然后构建消息的ProducerRecord对象,其中必须包含所要发往的主题及消息的消息体,进而再通过生产者客户端实例将消息发出,最后可以通过 close()方法来关闭生产者客户端实例并回收相应的资源。具体的示例如代码清单1-1所示,与脚本演示时一样,示例中仅发送一条内容为“Hello,Kafka!”的消息到主题topic-demo。
代码清单1-1 生产者客户端示例代码
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/27_2.jpg?sign=1739561450-MMu6209AzPK7OBpfXfooGbGUTOO5KaIi-0-a7fd6cdc9cd1a3a3262f3ccbee947f74)
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/28_1.jpg?sign=1739561450-nriDI6n426O5TdOSd3Z5yoxetRrjik9n-0-0e2de60f40bfb0fd216f9d05a82a70c4)
对应的消费消息也比较简单,首先创建一个消费者客户端实例并配置相应的参数,然后订阅主题并消费即可,具体的示例代码如代码清单1-2所示。
代码清单1-2 消费者客户端示例代码
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/28_2.jpg?sign=1739561450-kQ7LmuWCSTM5olQZ6pUZBGRJ0CdCZVkF-0-6b35c680dd5767a36dbada6b0e7599f6)
![](https://epubservercos.yuewen.com/C6A8CD/13916125605940406/epubprivate/OEBPS/Images/29_1.jpg?sign=1739561450-wvJxotcXXXCxJzcEsU1xFojS1T1Nh11l-0-b180492fd5f9190eae61d8d46fa3baac)
通过这些示例,相信各位读者对Kafka应该有了初步的认识。这仅仅是一个开始,要正确、灵活地运用好Kafka还需要对它进行深入探索,包括生产者和消费者客户端的使用细节及原理、服务端的使用细节及原理、运维、监控等,每一个方面都等着读者去一一攻破。