How can I send large messages with Kafka (over 15MB)?
You need to adjust three (or four) properties:
- Consumer side: fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
- Broker side: replica.fetch.max.bytes - this will allow the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore the consumer will never see the message, because the message will never be committed (fully replicated).
- Broker side: message.max.bytes - this is the largest size of a message that can be received by the broker from a producer.
- Broker side (per topic): max.message.bytes - this is the largest size of a message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to the broker's message.max.bytes.)
I found out the hard way about replica.fetch.max.bytes - you don't get ANY exceptions, messages, or warnings from Kafka when it is too small, so be sure to consider this when you are sending large messages.
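As a minimal sketch of the consumer side (the 20 MB value is only an example, not a recommendation): `fetch.message.max.bytes` belongs to the old Scala consumer, while the modern Java consumer reads `max.partition.fetch.bytes` and `fetch.max.bytes` instead, so both keys are shown here.

```java
import java.util.Properties;

public class LargeMessageConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Largest message the consumer will fetch; must be at least as large
        // as the biggest message the brokers accept (20 MB here, example value).
        props.put("fetch.message.max.bytes", "20971520");
        // The modern Java consumer uses this key for the same purpose:
        props.put("max.partition.fetch.bytes", "20971520");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("fetch.message.max.bytes"));
    }
}
```

The broker-side settings (replica.fetch.max.bytes, message.max.bytes) go in server.properties, not in client code.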
Kafka - Broker: Message size too large
You have the right configuration, but you also need to set max.request.size on the producer side.
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15728640);
max.request.size: The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size.
On the broker side, you have already configured the parameters below, which should work:
message.max.bytes: The largest record batch size allowed by Kafka.
replica.fetch.max.bytes: The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum: if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).
On the topic side, max.message.bytes is not required if you have already set message.max.bytes on the broker side:
max.message.bytes: This is the largest size of a message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to the broker's message.max.bytes.)
Reference
https://kafka.apache.org/documentation/
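Putting the producer side of this answer together, a minimal sketch (plain string keys stand in for the ProducerConfig constants; 15728640 is the 15 MB example value used above):

```java
import java.util.Properties;

public class LargeMessageProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        // String-key equivalent of ProducerConfig.MAX_REQUEST_SIZE_CONFIG; caps
        // the request size and, effectively, the record batch size (15 MB example).
        props.put("max.request.size", "15728640");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.request.size"));
    }
}
```

Remember that the broker's message.max.bytes (or the topic's max.message.bytes) must also allow batches this large, or the broker will reject them.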
Handling Large Messages with Kafka
We need to set the following configurations:
Broker
replica.fetch.max.bytes: Changes to this property will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
message.max.bytes: This is the largest size of the message that can be received by the broker from a producer.
Broker (topic)
max.message.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. (Defaults to broker's message.max.bytes.)
Producer
max.request.size: The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.
compression.type: Set to snappy; this will increase the total amount of data which can be sent in a single request and should be paired with a larger batch.size.
buffer.memory: If compression is enabled the buffer size should be raised as well.
batch.size: Batch size should be at least tens of KB; diminishing returns can be seen at around 300 KB (less for remote clients). Larger batches also result in a better compression ratio.
linger.ms: linger.ms can preempt the bound placed by batch.size: once it expires, a batch is sent even if it is not full. Increase this value to ensure smaller batches are not sent during slower production periods.
Consumer
fetch.message.max.bytes: This will determine the largest size of a message that can be fetched by the consumer.
max.partition.fetch.bytes: The maximum amount of data per-partition the server will return.
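The producer-side knobs above can be sketched as one Properties object (all values are illustrative, not recommendations; plain string keys are used in place of the ProducerConfig constants):

```java
import java.util.Properties;

public class LargeMessageProducerTuning {
    public static Properties build() {
        Properties props = new Properties();
        props.put("max.request.size", "20971520"); // allow requests up to ~20 MB
        props.put("compression.type", "snappy");   // compress large payloads on the wire
        props.put("batch.size", "307200");         // ~300 KB, near the diminishing-returns point noted above
        props.put("linger.ms", "100");             // wait briefly so batches can fill
        props.put("buffer.memory", "67108864");    // 64 MB; raised because compression is enabled
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("compression.type"));
    }
}
```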
How do I write a big message into kafka with kafka producer API?
You need to configure your topic appropriately when creating it:
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes
$ kafka-topics.sh --create --bootstrap-server ... --topic my_topic --config max.message.bytes=20971520
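As a quick sanity check on the number passed to --config: 20971520 bytes is exactly 20 MiB.

```java
public class MaxMessageBytesCheck {
    public static void main(String[] args) {
        // Matches the max.message.bytes value in the command above
        long twentyMiB = 20L * 1024 * 1024;
        System.out.println(twentyMiB); // 20971520
    }
}
```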
UPDATE:
Maybe add some more properties; I've been pushing big base64 blobs with these:
// Only one in-flight messages per Kafka broker connection
// - max.in.flight.requests.per.connection (default 5)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Set the number of retries - retries
props.put(ProducerConfig.RETRIES_CONFIG, "3");
// Request timeout - request.timeout.ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
// Only retry after one second.
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
// set max block to one minute by default
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
// set transaction timeout to one minute
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
// set delivery timeout to two minutes
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
// time to wait before sending messages out to Kafka; should not be too high
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// maximum amount of data to be collected before sending the batch; you will always hit that
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
// these are not necessary but useful for this use case
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
How to consume large messages from kafka topic
Maybe the problem is occurring because you are using the console producer and pasting the message into the terminal (Linux), but the terminal truncates long messages to a maximum fixed length.
You can try using echo | xargs --show-limits or other shell or terminal settings to find out.
It can also come from the operating system; for example, ARG_MAX:
getconf ARG_MAX
can be too small for your message.
The easiest way would be writing the file directly to kafka-console-producer, as in this example:
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic --new-producer < my_file.txt
If it works correctly, it means that this was indeed the issue.
For the record, these settings should also be tested:
- Consumer side: fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
- Broker side: replica.fetch.max.bytes - this will allow the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore the consumer will never see the message, because the message will never be committed (fully replicated).
- Broker side: message.max.bytes - this is the largest size of a message that can be received by the broker from a producer.
- Broker side (per topic): max.message.bytes - this is the largest size of a message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to the broker's message.max.bytes.)
Kafka message size with activated compression
I found the solution:
The problem is that kafka-console-producer.sh ignores compression.type in the producer config. If I explicitly call
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt
with --compression-codec=zstd, it works, because then the producer compresses the message.