Improve Kafka throughput
Kafka is a high-performance, scalable messaging system, but when handling big data, the default configuration may limit throughput. In this article, I’ll explain how messages are generated and saved in Kafka, and how to improve performance by tuning the configuration.
Kafka Internals #
How Does the Producer Send Messages? #
In short, messages are assembled into batches (named `RecordBatch`) and sent to the broker.
The producer manages a set of internal queues, and each queue contains the `RecordBatch`es that will be sent to one broker. When the `send` method is called, the producer looks into the internal queue and tries to append the message to a `RecordBatch` that is smaller than `batch.size` (default value is 16KB), or creates a new `RecordBatch` otherwise.
There is also a sender thread in the producer, which is responsible for turning `RecordBatch`es into requests (`<broker node, List(ProducerBatch)>`) and sending them to the broker.
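To make this concrete, here is a minimal producer sketch using the Java client (the broker address and topic name are placeholders for illustration):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends the record to an in-memory RecordBatch;
            // the background sender thread ships full (or expired) batches to the broker.
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        } // close() flushes any batches still waiting in the internal queues
    }
}
```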
How Are Records Saved? #
The details can be found in these two articles: Apache Kafka - Message Format and A Guide To The Kafka Protocol - Apache Kafka - Apache Software Foundation.
Some important properties in a `RecordBatch` are `batch_length`, `compression_type`, `CRC`, `timestamp` and, of course, the `List(Record)`.
Each `Record` consists of `length`, `timestamp_delta`, `key(byte)`, `value(byte)`, etc.
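As a rough illustration, the layout described above can be modeled like this (a simplified sketch only, using Java 16+ records; the real on-disk format packs these fields as varints and carries more metadata, such as producer id and epoch):

```java
import java.util.List;

// Simplified model of the batch layout described above.
record RecordBatch(
        long baseOffset,
        int batchLength,          // batch_length
        byte compressionType,     // compression_type (stored in the attributes bits)
        int crc,                  // CRC over the batch payload
        long baseTimestamp,       // timestamp
        List<Record> records) {}  // List(Record)

record Record(
        int length,
        long timestampDelta,      // timestamp_delta, relative to baseTimestamp
        byte[] key,               // key(byte)
        byte[] value) {}          // value(byte)
```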
When you look into a Kafka topic data directory, you may find files like this:
```
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex
```
Kafka saves each partition as a series of segments. When a new record comes in, it is appended to the active segment. If the segment’s size limit is reached, a new segment is created and becomes the active segment. Segments are named by the offset of their first record, so the segment names are monotonically increasing.
Furthermore, each segment is divided into three kinds of files: a log file, an index file and a timeindex file.
- The `log` file contains the actual data.
- The `index` file contains each record’s relative offset and its physical position in the log file. This makes looking up a record at a specific offset fast: a binary search in the (memory-mapped) index instead of a scan through the log.
- The `timeindex` file contains each record’s relative offset and its timestamp.
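Because segments are named by their base offset, finding the segment that holds a given offset is just a floor lookup over the base offsets, followed by a search in that segment’s index. A hypothetical sketch:

```java
import java.util.TreeMap;

public class SegmentLookup {
    // Maps each segment's base offset to its log file name, mirroring
    // the directory listing above (contents are illustrative).
    private final TreeMap<Long, String> segments = new TreeMap<>();

    public SegmentLookup() {
        segments.put(0L, "00000000000000000000.log");
        segments.put(35L, "00000000000000000035.log");
    }

    // The segment containing `offset` is the one with the
    // largest base offset that is <= offset.
    public String segmentFor(long offset) {
        return segments.floorEntry(offset).getValue();
    }

    public static void main(String[] args) {
        SegmentLookup lookup = new SegmentLookup();
        System.out.println(lookup.segmentFor(10)); // 00000000000000000000.log
        System.out.println(lookup.segmentFor(40)); // 00000000000000000035.log
    }
}
```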
How Does the Consumer Pull Messages? #
The consumer keeps reading data from the broker, decompressing it if necessary. It puts the data into an internal queue and returns the target number of records to the client.
`max.poll.records` (default value is 500) is the maximum number of records returned in a single call to `poll()`.
`fetch.min.bytes` (default value is 1) is the minimum amount of data the broker should return for a fetch request. If insufficient data is available, the broker will wait up to `fetch.max.wait.ms` milliseconds to accumulate more data before answering the request.
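As an example, a consumer that trades a little latency for larger fetches might be configured like this (a sketch; the group id, topic and values are illustrative, not recommendations):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "1000"); // return up to 1000 records per poll()
        props.put("fetch.min.bytes", "65536"); // wait for at least 64KB per fetch...
        props.put("fetch.max.wait.ms", "500"); // ...but no longer than 500ms

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}
```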
How to Improve Performance #
Increase Socket Buffer #
The default socket buffer values in the Java client are too small for high-throughput environments.
`receive.buffer.bytes` (default value is 64KB) and `send.buffer.bytes` (default value is 128KB) are the `SO_RCVBUF` and `SO_SNDBUF` sizes for socket connections, respectively. I recommend setting them to a bigger value, or to `-1` to use the OS default.
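For example, the following lines extend the `Properties` from the producer sketch above (the 1MB value is an illustrative starting point, not a universal recommendation):

```java
// Larger client socket buffers for a high-latency / high-bandwidth link.
props.put("receive.buffer.bytes", String.valueOf(1024 * 1024)); // 1MB SO_RCVBUF
props.put("send.buffer.bytes", String.valueOf(1024 * 1024));    // 1MB SO_SNDBUF
// Or defer to the OS default instead:
// props.put("receive.buffer.bytes", "-1");
```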
batch.size, linger.ms and buffer.memory #
As mentioned before, the producer always sends messages as `RecordBatch`es. Each batch should be smaller than `batch.size` (default value is 16KB). Increasing `batch.size` will not only reduce the number of TCP requests to the broker, but also lead to a better compression ratio when compression is enabled.
`linger.ms` specifies how long the producer waits before sending a `RecordBatch`, so it indirectly affects the real size of each `RecordBatch`. The producer groups together any records that arrive between request transmissions into a single batched request. If the system load is low and the `RecordBatch` is not full, the producer’s sender will still send the batch once it has waited for `linger.ms`. The default value of `linger.ms` is 0, which means the producer sends messages as quickly as possible (but records that arrive between two send requests are still batched into one `RecordBatch`). Increasing this value makes the real batch size closer to `batch.size` and reduces the number of requests sent, but it also increases message latency.
`buffer.memory` (default value is 32MB) controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server, this buffer space will be exhausted, and additional `send` calls will block.
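Putting the three settings together, a throughput-oriented configuration might look like this (again extending the earlier `Properties`; the numbers are illustrative starting points to tune for your workload):

```java
props.put("batch.size", String.valueOf(64 * 1024)); // allow 64KB batches instead of 16KB
props.put("linger.ms", "10");                       // wait up to 10ms for a batch to fill (adds latency)
props.put("buffer.memory", String.valueOf(64L * 1024 * 1024)); // 64MB of buffering before send() blocks
```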
compression.type #
As throughput keeps growing, bandwidth may become the bottleneck. It’s easy to tackle this by adding the `compression.type` parameter to the producer. Once it is configured, the producer compresses each `RecordBatch` before sending it to the broker. If the records are text, the compression ratio should be high and bandwidth usage will decrease significantly.
There are two kinds of `compression.type`: topic level and producer level.
If you set `compression.type` on the producer, the producer compresses the records and sends them to the broker.
There is also a topic-level `compression.type` configuration. When it is set, the producer’s compression type is not constrained; the broker converts data sent from the producer to the target `compression.type`. It can be set to `gzip`, `snappy`, `lz4`, `zstd`, `uncompressed` or `producer`. The default value is `producer`, which means the broker keeps the original data as compressed by the producer.
How do you choose a compression type? According to Cloudflare’s test results in Squeezing the firehose: getting the most from Kafka compression:
| Type | CPU ratio | Compression ratio |
|---|---|---|
| None | 1x | 1x |
| Gzip | 10.14x | 3.58x |
| Snappy | 1.61x | 2.35x |
| LZ4 | 2.51x | 1.81x |
`gzip` has the best compression ratio but takes a lot of CPU time. `snappy` keeps a balance between CPU time and space. The newer compression type `zstd`, added in Kafka 2.1, produces a higher compression ratio than `snappy` at the cost of a little more CPU time.
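For example, enabling `zstd` on the producer (again extending the earlier `Properties`; note that `zstd` requires Kafka 2.1+ on both the client and the broker):

```java
props.put("compression.type", "zstd"); // compress each RecordBatch before sending
// Leaving the topic-level compression.type at its default ("producer")
// means the broker stores batches exactly as the producer compressed them.
```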
These are the most common configurations; you can find more, such as `max.in.flight.requests.per.connection`, in the official documentation.
Ref #
- Kafka message format
- Kafka高性能探秘 (Exploring Kafka’s High Performance)
- Exploit Apache Kafka’s Message Format to Save Storage and Bandwidth
- Consuming big messages from Kafka
- How does max.poll.records affect the consumer poll
- A Practical Introduction to Kafka Storage Internals
- Kafka message codec - compress and decompress
- 20 Best Practices for Working With Apache Kafka at Scale
- Kafka Documentation
- kafka-python KafkaProducer
- Deep Dive Into Apache Kafka | Storage Internals