Skip to content

Kafka

Kafka 是由 LinkedIn 开发,并于 2011 年作为 Apache 项目的一部分开源发布的一个分布式流处理平台。 它最初设计用于日志聚合系统,但如今已经成为一个广泛使用的分布式消息队列系统,能够处理实时数据流,提供高吞吐量、低延迟的消息发布与订阅功能。Kafka 由 Apache Software Foundation 维护和进一步开发。

Install

Install via docker

shell
docker pull apache/kafka:4.0.0
docker run -p 9092:9092 apache/kafka:4.0.0

自定义数据目录、网络监听地址,如在 Linux docker 环境上创建容器实例并共享宿主机网络使用监听地址 1.2.3.4 :

shell
export PATH_KAFKA_LOG=/v/data/kafka/kafka-logs
mkdir -p $PATH_KAFKA_LOG

chmod -R 777 $PATH_KAFKA_LOG

export IP_ADDR=1.2.3.4

docker run \
-d \
--restart unless-stopped \
--name kafka \
--network=host \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_NODE_ID=1 \
-e KAFKA_LISTENERS=PLAINTEXT://$IP_ADDR:9092,CONTROLLER://$IP_ADDR:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$IP_ADDR:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@$IP_ADDR:9093 \
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_LOG_DIRS=/tmp/kafka-logs \
-v $PATH_KAFKA_LOG:/tmp/kafka-logs \
apache/kafka:4.0.0

创建必要 topic __consumer_offsets

shell
docker exec -it kafka \
/opt/kafka/bin/kafka-topics.sh --create \
--bootstrap-server 1.2.3.4:9092 \
--topic __consumer_offsets \
--partitions 50 \
--replication-factor 1 \
--config cleanup.policy=compact

Quickstart

start a Kafka instance (the KRaft mode is default since 4.0.0)

shell
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

bin/kafka-server-start.sh config/server.properties

create a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic applog

produce messages
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic applog

consume messages
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic applog

ref: https://kafka.apache.org/quickstart

常用命令

Broker

list all brokers
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

Topic

create a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic applog

list topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

inspect a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic applog

delete a topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic applog

Consumer group

list consumers
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

consume a topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic applog --group applog

inspect a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group applog

delete a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group applog

Configuration

increase partition number
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic applog --partitions 3

Configure time-based data retention, change it from 7 days to 30 days
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name applog --add-config retention.ms=$((30 * 24 * 60 * 60 * 1000))

Configure size-based data retention, change it from -1(no limit) to 100 GB
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name applog --add-config retention.bytes=$((100 * 1024 * 1024 * 1024))

Verifying the Retention Configuration
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name applog

Manage Clusters

refs

Start first node

Start the instance ka01

shell
export PATH_DATA_PREFIX=$HOME/v/data

mkdir -p $PATH_DATA_PREFIX/ka0{1,2,3,4}

cp -a config config01

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config01/server.properties

bin/kafka-server-start.sh config01/server.properties

Create a topic then produce and consume messages

  • bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic applog
  • bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic applog
  • bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic applog

Custom file config01/server.properties

properties
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# List of controller endpoints used connect to the controller cluster
controller.quorum.bootstrap.servers=localhost:9093

# The address the socket server listens on.
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093

# A comma-separated list of the names of the listeners used by the controller.
controller.listener.names=CONTROLLER

# ...

# A comma separated list of directories under which to store log files
log.dirs=/path/to/PATH_DATA_PREFIX/kaf01/kraft-combined-logs

# ...

Scale up form one node to two nodes cluster

Stop the kaf01 , update config01/server.properties , add or update
controller.quorum.voters=...
to
controller.quorum.voters=1@localhost:9093,2@localhost:29093

, then restart the kaf01.

Create the kaf02 instance configuration base on kaf01's:

shell
cp -a config01 config02

Custom file config02/server.properties

properties
process.roles=broker,controller

node.id=2

controller.quorum.bootstrap.servers=localhost:29093

controller.quorum.voters=1@localhost:9093,2@localhost:29093

listeners=PLAINTEXT://:29092,CONTROLLER://:29093

inter.broker.listener.name=PLAINTEXT

advertised.listeners=PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093

controller.listener.names=CONTROLLER

log.dirs=/path/to/PATH_DATA_PREFIX/kaf02/kraft-combined-logs

Start the instance ka02

shell
export KAFKA_CLUSTER_ID=`grep 'cluster.id' $PATH_DATA_PREFIX/kaf01/kraft-combined-logs/meta.properties | awk -F'=' '{print $2}'`

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config02/server.properties

bin/kafka-server-start.sh config02/server.properties

Describe runtime status

bin/kafka-metadata-quorum.sh --bootstrap-server localhost:29092 describe --status

Debug log segments

shell
find $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-*/*.log

bin/kafka-dump-log.sh --cluster-metadata-decoder --files $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log

Inspect the state of the cluster metadata partition(stop the instance before issue this)

shell
find $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-*/*.snapshot

bin/kafka-metadata-shell.sh --snapshot $PATH_DATA_PREFIX/kaf02/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log

CRUD a topic on ka01,ka02

shell
bin/kafka-topics.sh --delete --topic test02 --bootstrap-server localhost:29092

bin/kafka-topics.sh --create --topic test02 --bootstrap-server localhost:29092 --partitions 2 --replication-factor 2

bin/kafka-topics.sh --describe --topic test02 --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic test02 --bootstrap-server localhost:29092

Test produce messages on ka01

bin/kafka-console-producer.sh --topic test02 --bootstrap-server localhost:9092

Test consume messages on ka02

bin/kafka-console-consumer.sh --topic test02 --from-beginning --bootstrap-server localhost:29092

Scale up to 2n+1 nodes cluster

Create the kaf03 instance base on kaf02's:

  • create the kaf03 configuration from kaf02 cp -a config02 config03
  • update config03/server.properties
  • format the kaf03 data folder
  • start the kaf03 instance

Key points

  • controller.quorum.voters 如果不是 2n+1 时,producer 写入 topic 会在其中一个 节点 commit 后不会丢,但是 consumer 会被堵塞,数据同步没有确认前不允许消费?
  • 扩容时修改 controller.quorum.voters 需要逐个修改已有节点的配置该值,并挨个重启?

Rebalance by manual

Generate the file topics.json

json
{
  "topics": [{ "topic": "topic1" }, { "topic": "topic2" }, { "topic": "topic3" }]
}

Create the file reassignment.json from output bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --broker-list "1,2,3" --topics-to-move-json-file topics.json --generate

Execute the Reassignment: bin/kafka-reassign-partitions.sh --bootstrap-server your_broker:9092 --reassignment-json-file reassignment.json --execute

UI for Kafka

NOTICE: The network mode --network=host does not work on macOS, you have to start kafka and kafka-ui container in the same network:

Create a network share in containers:
docker network create kaf_cluster

Start a kafka instance

shell
export PATH_KAFKA_LOG=$HOME/v/data/kafka/kafka-logs
mkdir -p $PATH_KAFKA_LOG

docker run \
-d \
--restart unless-stopped \
--name kaf00 \
--network kaf_cluster \
-p 9092:9092 \
-p 9093:9093 \
-p 19092:19092 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_NODE_ID=1 \
-e KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS=kaf00:9093 \
-e KAFKA_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093,PLAINTEXT_HOST://:19092" \
-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kaf00:9092,CONTROLLER://kaf00:9093,PLAINTEXT_HOST://localhost:19092" \
-e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kaf00:9093 \
-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
-e KAFKA_LOG_DIRS=/tmp/kafka-logs \
-v $PATH_KAFKA_LOG:/tmp/kafka-logs \
apache/kafka:4.0.0

Start a redpanda console instance

shell
docker run \
-d \
--restart unless-stopped \
--network=kaf_cluster \
--name kaf_console \
-p 8080:8080 \
-e KAFKA_BROKERS=kaf00:9092 \
docker.redpanda.com/redpandadata/console:latest

You can access kafka via host IP or docker internal IP now.

On host via host IP: kcat -b <yourHostIp>:19092 -L

On host via docker internal IP:

shell
docker exec -it  kaf00 /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
docker exec -it  kaf00 /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server kaf00:9092

see also

kcat CLI for Kafka

list topics and metadata kcat -b localhost:9092 -L

list topics and metadata in JSON kcat -b localhost:9092 -L -J

consume a topic kcat -b localhost:9092 -t my-topic -C

produce a message echo '{"name":"foo","count":123}' | kcat -P -b localhost:9092 -t my-topic

Kafka Go

Released under the CC-BY-NC-4.0