Kafka
Kafka is a distributed streaming platform developed at LinkedIn and open-sourced in 2011 as part of the Apache project. It was originally designed as a log aggregation system, but has since become a widely used distributed message queue that handles real-time data streams and provides high-throughput, low-latency message publish/subscribe. Kafka is maintained and further developed by the Apache Software Foundation.
Install
Install via docker
docker pull apache/kafka:4.0.0
docker run -p 9092:9092 apache/kafka:4.0.0

To customize the data directory and the network listen address, for example when creating a container on a Linux Docker host that shares the host network and listens on 1.2.3.4:
export PATH_KAFKA_LOG=/v/data/kafka/kafka-logs
mkdir -p $PATH_KAFKA_LOG
chmod -R 777 $PATH_KAFKA_LOG
export IP_ADDR=1.2.3.4
docker run \
-d \
--restart unless-stopped \
--name kafka \
--network=host \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_NODE_ID=1 \
-e KAFKA_LISTENERS=PLAINTEXT://$IP_ADDR:9092,CONTROLLER://$IP_ADDR:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$IP_ADDR:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@$IP_ADDR:9093 \
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_LOG_DIRS=/tmp/kafka-logs \
-v $PATH_KAFKA_LOG:/tmp/kafka-logs \
  apache/kafka:4.0.0

Create the required topic __consumer_offsets:
docker exec -it kafka \
/opt/kafka/bin/kafka-topics.sh --create \
--bootstrap-server 1.2.3.4:9092 \
--topic __consumer_offsets \
--partitions 50 \
--replication-factor 1 \
  --config cleanup.policy=compact
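To confirm the broker is reachable on the advertised address and the topic exists, you can describe it (a quick check, assuming the same container name and bootstrap address as above):
docker exec -it kafka \
  /opt/kafka/bin/kafka-topics.sh --describe \
  --bootstrap-server 1.2.3.4:9092 \
  --topic __consumer_offsets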
Quickstart
Start a Kafka instance (KRaft is the default mode since 4.0.0)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
bin/kafka-server-start.sh config/server.properties

Create a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic applog
Produce messages
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic applog
Consume messages
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic applog
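The console tools can also handle message keys, which is useful for checking how records are distributed across partitions; a small sketch using the standard parse.key / print.key properties:
# produce keyed messages, key and value separated by ':'
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic applog \
  --property parse.key=true --property key.separator=:
# consume and print each record's key alongside its value
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic applog \
  --property print.key=true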
ref: https://kafka.apache.org/quickstart
Common commands
Broker
List all brokers
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
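Related: kafka-log-dirs.sh (shipped in the same bin directory) reports the size and partitions of each broker's log directories; a sketch, assuming broker id 1:
bin/kafka-log-dirs.sh --describe --bootstrap-server localhost:9092
# restrict the report to one broker and one topic
bin/kafka-log-dirs.sh --describe --bootstrap-server localhost:9092 --broker-list 1 --topic-list applog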
Topic
Create a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic applog
List topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Inspect a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic applog
Delete a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic applog
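To check per-partition earliest and latest offsets (handy before tightening retention or deleting a topic), recent distributions ship kafka-get-offsets.sh; a sketch:
# latest offset of each partition (-1); use -2 for the earliest
bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic applog --time -1
bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic applog --time -2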
Consumer group
List consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Consume a topic as a group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic applog --group applog
Inspect a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group applog
Delete a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group applog
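A common follow-up is replaying a topic for an existing group by resetting its committed offsets; a sketch with kafka-consumer-groups.sh (the group must have no active members, and --dry-run previews the change):
# preview the reset without applying it
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group applog --topic applog --reset-offsets --to-earliest --dry-run
# apply the reset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group applog --topic applog --reset-offsets --to-earliest --execute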
Configuration
Increase the number of partitions
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic applog --partitions 3
Configure time-based data retention, e.g. change it from the default 7 days to 30 days
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name applog --add-config retention.ms=$((30 * 24 * 60 * 60 * 1000))
Configure size-based data retention, e.g. change it from -1 (no limit) to 100 GB
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name applog --add-config retention.bytes=$((100 * 1024 * 1024 * 1024))
Verify the retention configuration
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name applog
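To drop a topic-level override and fall back to the broker default, the same tool supports --delete-config:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name applog --delete-config retention.ms,retention.bytes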
Manage Clusters
refs
- https://kafka.apache.org/documentation/#kraft
- https://docs.confluent.io/platform/current/clusters/overview.html
Start the first node
Start the instance kaf01
export PATH_DATA_PREFIX=$HOME/v/data
mkdir -p $PATH_DATA_PREFIX/kaf0{1,2,3,4}
cp -a config config01
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config01/server.properties
bin/kafka-server-start.sh config01/server.properties

Create a topic, then produce and consume messages
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic applog
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic applog
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic applog
Customize the file config01/server.properties:
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# List of controller endpoints used to connect to the controller cluster
controller.quorum.bootstrap.servers=localhost:9093
# The address the socket server listens on.
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker or the controller will advertise to clients.
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
# A comma-separated list of the names of the listeners used by the controller.
controller.listener.names=CONTROLLER
# ...
# A comma separated list of directories under which to store log files
log.dirs=/path/to/PATH_DATA_PREFIX/kaf01/kraft-combined-logs
# ...

Scale up from a one-node to a two-node cluster
Stop kaf01, then in config01/server.properties add or update controller.quorum.voters to
controller.quorum.voters=1@localhost:9093,2@localhost:29093
and restart kaf01.
Create the kaf02 instance configuration based on kaf01's:
cp -a config01 config02

Customize the file config02/server.properties:
process.roles=broker,controller
node.id=2
controller.quorum.bootstrap.servers=localhost:29093
controller.quorum.voters=1@localhost:9093,2@localhost:29093
listeners=PLAINTEXT://:29092,CONTROLLER://:29093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093
controller.listener.names=CONTROLLER
log.dirs=/path/to/PATH_DATA_PREFIX/kaf02/kraft-combined-logs

Start the instance kaf02
export KAFKA_CLUSTER_ID=`grep 'cluster.id' $PATH_DATA_PREFIX/kaf01/kraft-combined-logs/meta.properties | awk -F'=' '{print $2}'`
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config02/server.properties
bin/kafka-server-start.sh config02/server.properties

Describe runtime status
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:29092 describe --status
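The same tool can also report per-replica progress of the metadata log, which helps verify that the new node has caught up:
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:29092 describe --replication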
Debug log segments
find $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-*/*.log
bin/kafka-dump-log.sh --cluster-metadata-decoder --files $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log

Inspect the state of the cluster metadata partition (stop the instance before issuing this)
find $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-*/*.snapshot
bin/kafka-metadata-shell.sh --snapshot $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log

CRUD a topic on kaf01 and kaf02
bin/kafka-topics.sh --delete --topic test02 --bootstrap-server localhost:29092
bin/kafka-topics.sh --create --topic test02 --bootstrap-server localhost:29092 --partitions 2 --replication-factor 2
bin/kafka-topics.sh --describe --topic test02 --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic test02 --bootstrap-server localhost:29092

Test producing messages on kaf01
bin/kafka-console-producer.sh --topic test02 --bootstrap-server localhost:9092
Test consuming messages on kaf02
bin/kafka-console-consumer.sh --topic test02 --from-beginning --bootstrap-server localhost:29092
Scale up to a 2n+1-node cluster
Create the kaf03 instance based on kaf02's (a sketch of the full sequence follows the list):
- create the kaf03 configuration from kaf02
cp -a config02 config03
- update config03/server.properties
- format the kaf03 data folder
- start the kaf03 instance
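A sketch of those four steps, assuming kaf03 listens on ports 39092/39093 (any free ports work) and joins the existing cluster id:
cp -a config02 config03
# edit config03/server.properties, following the kaf02 pattern:
#   node.id=3
#   controller.quorum.bootstrap.servers=localhost:39093
#   controller.quorum.voters=1@localhost:9093,2@localhost:29093,3@localhost:39093
#   listeners=PLAINTEXT://:39092,CONTROLLER://:39093
#   advertised.listeners=PLAINTEXT://localhost:39092,CONTROLLER://localhost:39093
#   log.dirs=/path/to/PATH_DATA_PREFIX/kaf03/kraft-combined-logs
# (also add 3@localhost:39093 to controller.quorum.voters on kaf01 and kaf02 and restart them, as in the two-node step)
export KAFKA_CLUSTER_ID=`grep 'cluster.id' $PATH_DATA_PREFIX/kaf01/kraft-combined-logs/meta.properties | awk -F'=' '{print $2}'`
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config03/server.properties
bin/kafka-server-start.sh config03/server.properties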
Key points
- If controller.quorum.voters is not 2n+1, will a message a producer writes to a topic be kept once it is committed on one of the nodes, while consumers are blocked and not allowed to consume until replication has been acknowledged?
- When scaling up, does changing controller.quorum.voters require updating the value on each existing node and restarting them one by one?
Rebalance manually
Generate the file topics.json
{
"topics": [{ "topic": "topic1" }, { "topic": "topic2" }, { "topic": "topic3" }]
}

Create the file reassignment.json from the output of:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --broker-list "1,2,3" --topics-to-move-json-file topics.json --generate
Execute the reassignment:
bin/kafka-reassign-partitions.sh --bootstrap-server your_broker:9092 --reassignment-json-file reassignment.json --execute
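The reassignment runs asynchronously; its progress can be checked with the --verify option of the same tool:
bin/kafka-reassign-partitions.sh --bootstrap-server your_broker:9092 --reassignment-json-file reassignment.json --verify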
UI for Kafka
NOTICE: The network mode --network=host does not work on macOS; you have to start the Kafka and UI containers on the same Docker network:
Create a network shared by the containers:
docker network create kaf_cluster
Start a Kafka instance
export PATH_KAFKA_LOG=$HOME/v/data/kafka/kafka-logs
mkdir -p $PATH_KAFKA_LOG
docker run \
-d \
--restart unless-stopped \
--name kaf00 \
--network kaf_cluster \
-p 9092:9092 \
-p 9093:9093 \
-p 19092:19092 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS=kaf00:9093 \
-e KAFKA_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093,PLAINTEXT_HOST://:19092" \
-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kaf00:9092,CONTROLLER://kaf00:9093,PLAINTEXT_HOST://localhost:19092" \
-e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kaf00:9093 \
-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
-e KAFKA_LOG_DIRS=/tmp/kafka-logs \
-v $PATH_KAFKA_LOG:/tmp/kafka-logs \
  apache/kafka:4.0.0

Start a Redpanda Console instance
docker run \
-d \
--restart unless-stopped \
--network=kaf_cluster \
--name kaf_console \
-p 8080:8080 \
-e KAFKA_BROKERS=kaf00:9092 \
  docker.redpanda.com/redpandadata/console:latest

You can now access Kafka via the host IP or the Docker-internal network.
On the host via the host IP:
kcat -b <yourHostIp>:19092 -L
From inside the container, via the Docker-internal address:
docker exec -it kaf00 /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
docker exec -it kaf00 /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server kaf00:9092

See also
kcat CLI for Kafka
List topics and metadata
kcat -b localhost:9092 -L
List topics and metadata in JSON
kcat -b localhost:9092 -L -J
Consume a topic
kcat -b localhost:9092 -t my-topic -C
Produce a message
echo '{"name":"foo","count":123}' | kcat -P -b localhost:9092 -t my-topic
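kcat can also produce keyed messages and read from a relative offset; a small sketch using its -K (key delimiter), -o (offset), and -e (exit at end) options:
# produce a keyed message, key and value separated by ':'
echo 'user42:{"name":"foo","count":123}' | kcat -P -b localhost:9092 -t my-topic -K:
# consume the last 10 messages of partition 0, then exit
kcat -C -b localhost:9092 -t my-topic -p 0 -o -10 -e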
Kafka Go
- confluentinc/confluent-kafka-go, Confluent's Apache Kafka Golang client, built on the Apache Kafka C/C++ library librdkafka
- segmentio/kafka-go Kafka library in Go
- lovoo/goka Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
- IBM/sarama Sarama is a Go library for Apache Kafka.
