Skip to main content

Kafka

如何使用Docker Compose部署Kafka

info

借鉴github仓库
下面示例使用最简单的 zk-single-kafka-single.yml

以下为正确示例:

对仓库中的文件作了两处调整

  • 调整了连接地址
-  KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-192.168.3.163}:9092,DOCKER://host.docker.internal:29092
  • 在Zoo1和kafka1中都添加了时区,可以解决时间不对的问题
+      TZ: Asia/Shanghai
点击展开查看完整配置文件
version: '2.1'

services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
TZ: Asia/Shanghai

kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-192.168.3.163}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
TZ: Asia/Shanghai
depends_on:
- zoo1

报错 could not be established. Broker may not be available

调整外部连接地址即可解决问题

[| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] 
Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.

如何使用命令行操作集群

# 创建topic
docker run --rm --network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-topics --bootstrap-server=127.0.0.1:9092 \
--create \
--topic=my-topic \
--partitions=3 \
--replication-factor=1
# 生产消息
docker run --tty --interactive \
--rm \
--network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-console-producer --broker-list=127.0.0.1:9092 --topic=my-topic

# 消费消息
docker run --tty --interactive \
--rm \
--network=host \
confluentinc/cp-kafka:7.3.2 \
kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=my-topic

如何在Spring Boot中使用Kafka

参考资料

参考仓库代码

仓库代码简单介绍了 Kafka创建字符串消息和Pojo消息,发送消息的示例。

参考资料涉及多种类型消息混合生产和消费。值得学习和借鉴。

生产者使用注意点

  1. 一个JVM下使用一个生产者即可,多个生产者可能OOM,一个生产者buffer.memory=32MB
  2. Batch设计,首先看batch.size 是否满足,不满足再看linger.ms ,当缓存消息到达buffer.memory 会直接触发消息发送到服务器。
  3. acks, 0 -不确认,1-所有主节点写入成功,all-所有节点写入成功

触发Coordinator Rebalance

  • 组成员数量变化
  • 订阅主题数量变化
  • 订阅主题分区数变化

建议

  1. session.timeout.ms > 3 * hearbeat.interval.ms
  2. 控制消费者消费消息的时间,不要超过max.poll.interval.ms配置时间
Agreement
The code part of this work is licensed under Apache License 2.0 . You may freely modify and redistribute the code, and use it for commercial purposes, provided that you comply with the license. However, you are required to:
  • Attribution: Retain the original author's signature and code source information in the original and derivative code.
  • Preserve License: Retain the Apache 2.0 license file in the original and derivative code.
The documentation part of this work is licensed under Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . You may freely share, including copying and distributing this work in any medium or format, and freely adapt, remix, transform, and build upon the material. However, you are required to:
  • Attribution: Give appropriate credit, provide a link to the license, and indicate if changes were made.
  • NonCommercial: You may not use the material for commercial purposes. For commercial use, please contact the author.
  • ShareAlike: If you remix, transform, or build upon the material, you must distribute your contributions under the same license as the original.