Messages, also called records, are the payload. They are sent as byte arrays and, under the hood, are typically grouped into batches before being sent.
Producers send messages to the leader of a partition. A partitioner in the producer selects the partition; by default, messages with the same key go to the same partition (as long as the partition count does not change).
Topics group messages that belong to the same business domain (for example, orders). They are comparable to tables in a database.
Partitions are the backbone of Kafka’s performance. Topics are divided into partitions to parallelize and scale processing. To ensure fault tolerance and high availability, partitions are replicated across brokers.
Consumers receive and process messages from Kafka and can read from multiple partitions as well as from multiple topics.
Consumer Groups allow parallel processing and scalable message consumption by dividing a topic's partitions among the consumers of the group; each partition is read by only one consumer of the group at a time. If one consumer fails, the others in the group take over its partitions, ensuring fault tolerance.
Brokers are Kafka servers. They share replicas and workload evenly among themselves, which improves performance. If one broker fails, another takes over its tasks, increasing reliability.
Leader is the broker that is responsible for read and write operations of a partition. Leaders are distributed as evenly as possible among all brokers.
Followers are the brokers that hold copies (replicas) of a partition and replicate its data from the leader to increase resilience.
Coordination Cluster (KRaft, previously a ZooKeeper ensemble) is used by Kafka to coordinate itself, for example to manage cluster metadata.
kafka-console-producer.sh # produce messages
--bootstrap-server localhost:9092
--topic mytopic # to which topic?
OPTIONAL
--property parse.key=true # produce with keys (key:value)
--property key.separator=: # character between key and value
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders
Hello World
Order #12345
Customer request
# Press Ctrl+D to stop the command
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic user-events \
--property parse.key=true \
--property key.separator=:
user123:{"action": "login", "timestamp": "2024-01-15T10:30:00Z"}
user456:{"action": "purchase", "amount": 99.99}
user789:{"action": "logout", "session_duration": 1800}
# Press Ctrl+D to stop the command
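Typing messages interactively is handy for quick checks; in scripts it is usually easier to pipe a file into the console producer, which reads one message per line from stdin until end-of-file. A minimal sketch, assuming the `user-events` topic from the example above; the input file name `user-events.txt` is hypothetical:

```shell
#!/usr/bin/env bash
# Produce keyed messages from a file instead of typing them (sketch).
# Assumption: user-events.txt is a hypothetical file with one key:value record per line.
set -eu

cat > user-events.txt <<'EOF'
user123:{"action": "login", "timestamp": "2024-01-15T10:30:00Z"}
user456:{"action": "purchase", "amount": 99.99}
user789:{"action": "logout", "session_duration": 1800}
EOF

# Only send if the Kafka CLI is installed. Because stdin comes from the file,
# the producer exits on its own at end-of-file (no Ctrl+D needed).
if command -v kafka-console-producer.sh >/dev/null 2>&1; then
  kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic user-events \
    --property parse.key=true \
    --property key.separator=: \
    < user-events.txt
else
  echo "kafka-console-producer.sh not on PATH; only wrote user-events.txt" >&2
fi
```

The console producer splits each line at the first occurrence of the key separator, so the colons inside the JSON values should stay part of the value.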
# For reliability
acks=all
enable.idempotence=true
# For throughput
batch.size=524288 # e.g. 512 KiB; Default: 16384 (16 KiB); higher values increase throughput
linger.ms=5 # Default: 0 (5 since Kafka 4.0); waits up to this many ms for more messages to fill a batch
compression.type=lz4 # Default: none; compresses whole batches to save network bandwidth and disk space
# For monitoring
client.id=my-producer-app
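To use these settings with the console producer, they can be collected in a properties file and passed with `--producer.config`. One pitfall: Java properties files treat `#` as a comment only at the start of a line, so the inline comments from the list above must not be copied into the file. A sketch, with `producer.properties` as an assumed file name:

```shell
#!/usr/bin/env bash
# Write the producer tuning settings into a properties file (sketch).
# Assumption: producer.properties is a hypothetical file name; any path works.
set -eu

# Comments only on their own lines: an inline "# ..." would become part of the value.
cat > producer.properties <<'EOF'
# For reliability
acks=all
enable.idempotence=true
# For throughput
batch.size=524288
linger.ms=5
compression.type=lz4
# For monitoring
client.id=my-producer-app
EOF

# Then pass the file to the console producer (requires a running broker):
#   kafka-console-producer.sh --bootstrap-server localhost:9092 \
#     --topic orders --producer.config producer.properties
```

Single settings can also be passed inline with `--producer-property key=value`.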
kafka-console-consumer.sh # consume messages
--bootstrap-server localhost:9092
--topic mytopic # From which topic?
OPTIONAL
--from-beginning # read from the beginning
--property print.key=true # display keys
--group my_group # assign to Consumer Group
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders
# Press Ctrl+C to stop the command
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
# Press Ctrl+C to stop the command
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic user-events \
--property print.key=true \
--from-beginning
# Press Ctrl+C to stop the command
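When checking how keys are spread over partitions, it helps to print each record's partition and offset as well. A sketch of a hypothetical helper `peek_topic`; `print.partition` and `print.offset` are formatter properties available in recent Kafka versions, and `--max-messages`/`--timeout-ms` let the consumer exit on its own instead of waiting for Ctrl+C. `BOOTSTRAP_SERVER` is an assumed environment variable:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print up to N records with partition, offset and key, then exit.
# BOOTSTRAP_SERVER is an assumed environment variable (defaults to localhost:9092).
peek_topic() {
  local topic=$1
  local count=${2:-10}
  kafka-console-consumer.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}" \
    --topic "$topic" \
    --from-beginning \
    --max-messages "$count" \
    --timeout-ms 10000 \
    --property print.partition=true \
    --property print.offset=true \
    --property print.key=true
}
```

For example, `peek_topic user-events 5`. Records with the same key should show the same partition every time, as long as the topic's partition count is unchanged. `--timeout-ms 10000` stops the consumer after 10 seconds without new records.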
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic user-events \
--group mygroup \
--from-beginning
# Note: with --group, --from-beginning only applies if the group has no committed offsets yet
# Press Ctrl+C to stop the command
kafka-topics.sh # topic operations
--bootstrap-server localhost:9092
--list # list all topics
--topic mytopic
--create # create topic
--partitions 6 # how many partitions?
--replication-factor 3 # how many replicas?
OPTIONAL
--config min.insync.replicas=2 # other settings
--describe # describe topic
--delete # delete it
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic new-topic \
--partitions 6 \
--replication-factor 1
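In provisioning scripts, `--if-not-exists` makes topic creation safe to re-run. A sketch with a hypothetical `ensure_topic` helper; the defaults of 6 partitions and replication factor 3 are assumptions (a single-broker development setup needs a replication factor of 1, as in the example above):

```shell
#!/usr/bin/env bash
# Hypothetical helper: create a topic only if it does not exist yet.
# Arguments: topic name, partitions (default 6), replication factor (default 3).
ensure_topic() {
  local topic=$1
  local partitions=${2:-6}
  local replication=${3:-3}
  kafka-topics.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}" \
    --create --if-not-exists \
    --topic "$topic" \
    --partitions "$partitions" \
    --replication-factor "$replication"
}
```

Usage on a single local broker: `ensure_topic new-topic 6 1`. Running it a second time leaves the existing topic untouched.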
# For reliability
replication-factor=3 # set with --replication-factor 3 when creating the topic
min.insync.replicas=2
# For throughput
partitions=12 # set with --partitions 12; broker default (num.partitions): 1
# For cleanup
cleanup.policy=delete
retention.ms=604800000 # Default: delete messages after 7 days (broker setting log.retention.hours=168)
# For compacted topics
cleanup.policy=compact
# For permanent retention
cleanup.policy=delete
retention.ms=-1
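Topic-level retention is configured in milliseconds via `retention.ms`. A small sketch that converts a number of days into milliseconds and shows how the value could be applied to an existing topic with `kafka-configs.sh`; `RETENTION_DAYS` is a hypothetical variable:

```shell
#!/usr/bin/env bash
# Convert a retention period in days into the retention.ms topic setting (sketch).
set -eu

retention_days=${RETENTION_DAYS:-3}   # hypothetical input; defaults to 3 days
retention_ms=$(( retention_days * 24 * 60 * 60 * 1000 ))
echo "retention.ms=${retention_ms}"   # prints retention.ms=259200000 for 3 days

# Apply the value to an existing topic (requires a running broker):
#   kafka-configs.sh --bootstrap-server localhost:9092 \
#     --alter --entity-type topics --entity-name new-topic \
#     --add-config retention.ms=259200000
```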
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic new-topic
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
kafka-topics.sh \
--bootstrap-server localhost:9092 \
--delete \
--topic new-topic
kafka-consumer-groups.sh # manage consumer groups
--bootstrap-server localhost:9092
--list # show all consumer groups
--describe # more info about the groups
--reset-offsets # reset offsets:
--execute # otherwise: dry run
--to-datetime 2024-01-01T00:00:00.000 # to a point in time (YYYY-MM-DDTHH:mm:SS.sss)
--to-earliest # to the oldest message
--to-latest # to the latest message
# more reset options: see the help page (--help)
--delete-offsets # delete only offsets?
--delete # or the whole group?
# what does the command refer to? (combinable)
--group mygroup # to a group?
--all-groups # to all groups?
--topic mytopic # to a topic?
--all-topics # to all topics?
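Offset resets are easiest to do in two steps: preview with `--dry-run`, then repeat with `--execute`. Kafka only resets the offsets of a group that has no active members, so its consumers need to be stopped first. A sketch with a hypothetical `reset_to_earliest` helper that defaults to the dry run:

```shell
#!/usr/bin/env bash
# Hypothetical helper: reset a group's offsets on one topic to the oldest message.
# Runs as a dry run unless the third argument is "execute".
reset_to_earliest() {
  local group=$1
  local topic=$2
  local mode="--dry-run"
  if [ "${3:-}" = "execute" ]; then
    mode="--execute"
  fi
  kafka-consumer-groups.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}" \
    --group "$group" \
    --topic "$topic" \
    --reset-offsets --to-earliest \
    "$mode"
}
```

Usage: `reset_to_earliest mygroup orders` prints the planned new offsets; `reset_to_earliest mygroup orders execute` applies them.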
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group mygroup
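`--describe` prints one row per partition, including a `LAG` column (log-end offset minus committed offset). To get a single number, for example for a monitoring script, the column can be summed. A sketch with a hypothetical `total_lag` function that finds the `LAG` column from the header row; partitions without a committed offset show `-` and are skipped:

```shell
#!/usr/bin/env bash
# Hypothetical helper: sum the LAG column of `kafka-consumer-groups.sh --describe` output read from stdin.
total_lag() {
  awk '
    $1 == "GROUP" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }  # header row
    col && $col ~ /^[0-9]+$/ { sum += $col }   # numeric lag values only; "-" is skipped
    END { print sum + 0 }
  '
}
```

Usage: `kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup | total_lag`.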
kafka-producer-perf-test.sh # Producer Performance Tests
--topic mytopic
--num-records 1000 # number of messages to produce
--throughput -1 # max messages/s; -1 disables throttling
--record-size 1000 # size in bytes
ALTERNATIVE
--payload-file myfile.json # one message per line
--producer-props bootstrap.servers=... # settings
OPTIONAL
--producer.config producer.properties # read from file
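Instead of fixed-size random records, `--payload-file` replays realistic messages: the tool picks lines from the file at random (newline is the default delimiter), and `--record-size` must then be left out. A sketch that generates a hypothetical `payload.json` with 1,000 small JSON orders:

```shell
#!/usr/bin/env bash
# Generate a payload file with one JSON message per line (hypothetical content).
set -eu

i=1
while [ "$i" -le 1000 ]; do
  printf '{"orderId": %d, "amount": %d}\n' "$i" $(( i % 100 + 1 ))
  i=$(( i + 1 ))
done > payload.json

# Then, with a running broker:
#   kafka-producer-perf-test.sh --topic mytopic --num-records 100000 \
#     --throughput -1 --payload-file payload.json \
#     --producer-props bootstrap.servers=localhost:9092
```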
kafka-producer-perf-test.sh \
--topic performance-test \
--num-records 10000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
kafka-producer-perf-test.sh \
--topic throughput-test \
--num-records 1000000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092 \
batch.size=943718 \
linger.ms=10 \
compression.type=lz4
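Before running large tests like the ones above, it is worth estimating how much raw data they write: records × record size, multiplied on the brokers by the replication factor, before compression and ignoring per-record overhead. A small sketch with a hypothetical `payload_gib` helper:

```shell
#!/usr/bin/env bash
# Hypothetical helper: raw payload volume of a perf-test run in GiB (records x bytes / 1024^3).
payload_gib() {
  awk -v n="$1" -v size="$2" 'BEGIN { printf "%.2f\n", n * size / (1024 ^ 3) }'
}

payload_gib 10000000 1024     # first example above: prints 9.54
payload_gib 1000000000 1024   # second example above: prints 953.67
```

The second run therefore writes close to 1 TiB of uncompressed payload per replica, which may exceed the disk space of a local test setup.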
kafka-consumer-perf-test.sh # consumer performance tests
--bootstrap-server localhost:9092
--topic mytopic
--consumer.config consumer.properties # settings
--messages 1000000 # how many messages to read?
kafka-consumer-perf-test.sh \
--bootstrap-server localhost:9092 \
--topic performance-test \
--messages 100000 \
--group performance-test
kafka-streams-application-reset.sh # reset Kafka Streams applications
kafka-leader-election.sh # elect new partition leader
kafka-reassign-partitions.sh # move partitions between brokers
kafka-configs.sh # manage settings
kafka-dump-log.sh # view segments
kafka-acls.sh # manage permissions
kcat # Alternative to kafka-console-producer and consumer
-b localhost:9092
-t mytopic
-C/-P # consumer or producer
kafka-streams-application-reset.sh \
--bootstrap-server localhost:9092 \
--application-id my-streams-app \
--input-topics input-topic \
--to-datetime 2024-01-01T00:00:00Z
kafka-leader-election.sh \
--bootstrap-server localhost:9092 \
--election-type PREFERRED \
--all-topic-partitions
# Produce messages
kcat \
-b localhost:9092 \
-t mytopic \
-P
Hello World
Another message
# Press Ctrl+D to stop
# Consume messages
kcat \
-b localhost:9092 \
-t mytopic \
-C
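kcat can also handle keys and exit once it has read everything, which is convenient in scripts: `-K` sets the key delimiter, `-e` exits after the last message, and `-f` formats the output (`%p` partition, `%o` offset, `%k` key, `%s` value). A sketch with two hypothetical helper functions; `BOOTSTRAP_SERVER` is an assumed environment variable:

```shell
#!/usr/bin/env bash
# Hypothetical helpers around kcat; BOOTSTRAP_SERVER defaults to localhost:9092.

# Produce "key:value" lines from stdin as keyed messages.
kcat_produce_keyed() {
  kcat -b "${BOOTSTRAP_SERVER:-localhost:9092}" -t "$1" -P -K :
}

# Print all existing messages with partition, offset and key, then exit.
kcat_dump() {
  kcat -b "${BOOTSTRAP_SERVER:-localhost:9092}" -t "$1" -C -e \
    -f 'partition=%p offset=%o key=%k value=%s\n'
}
```

Usage: `printf 'user123:login\n' | kcat_produce_keyed mytopic`, then `kcat_dump mytopic`.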