How to Send Large Messages with Kafka (Over 15 MB)

How can I send large messages with Kafka (over 15MB)?

You need to adjust three (or four) properties:

  • Consumer side: fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
  • Broker side: replica.fetch.max.bytes - this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
  • Broker side: message.max.bytes - this is the largest size of the message that can be received by the broker from a producer.
  • Broker side (per topic): max.message.bytes - this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker's message.max.bytes.)

I found out the hard way about the second item (replica.fetch.max.bytes) - you don't get ANY exceptions, messages, or warnings from Kafka, so be sure to consider this when you are sending large messages.
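
Since Kafka stays silent when replica.fetch.max.bytes is too small, one way to double-check what the brokers are actually running with is to read the values back with the Java AdminClient. A minimal sketch, assuming broker id 0 and a placeholder bootstrap address:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

// Inside a method that may throw Exception
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder address

AdminClient admin = AdminClient.create(adminProps);
ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // broker id as a string
Config brokerConfig = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
System.out.println("message.max.bytes       = " + brokerConfig.get("message.max.bytes").value());
System.out.println("replica.fetch.max.bytes = " + brokerConfig.get("replica.fetch.max.bytes").value());
admin.close();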

Kafka - Broker: Message size too large

You have the right configuration; however, you also need to set max.request.size on the producer side.

props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15728640);

max.request.size: The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size.

On the broker side, you have already configured the parameters below, which should work:

message.max.bytes: The largest record batch size allowed by Kafka.

replica.fetch.max.bytes: The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum; if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

On the topic side, max.message.bytes is not required if you have already set message.max.bytes on the broker side.

max.message.bytes - this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker's message.max.bytes.)

Reference:
https://kafka.apache.org/documentation/

Handling Large Messages with Kafka

We need to set the following configurations:

Broker

replica.fetch.max.bytes: Changes to this property will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).

message.max.bytes: This is the largest size of the message that can be received by the broker from a producer.

Broker (topic)

max.message.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that the they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case (Defaults to broker's message.max.bytes).
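
If the topic already exists, the same override can be applied without recreating it; a minimal sketch with the Java AdminClient (requires Kafka 2.3+ for incrementalAlterConfigs; the topic name and size are placeholder assumptions):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Inside a method that may throw Exception
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder address

AdminClient admin = AdminClient.create(adminProps);
ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my_topic"); // placeholder topic
AlterConfigOp raiseLimit = new AlterConfigOp(
        new ConfigEntry("max.message.bytes", "15728640"), AlterConfigOp.OpType.SET); // 15 MB
admin.incrementalAlterConfigs(Collections.singletonMap(topic, Collections.singletonList(raiseLimit)))
        .all().get(); // block until the change is applied
admin.close();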

Producer

max.request.size: The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.

compression.type: Set to snappy, this will increase the total amount of data which can be sent with a single request and should be paired with a larger batch.size.

buffer.memory: If compression is enabled, the buffer size should be raised as well.

batch.size: Batch sizes should be at least tens of KB; diminishing returns can be seen at around 300 KB (less for a remote client). Larger batches also result in a better compression ratio.

linger.ms: linger.ms preempts any bounds that were placed on batch size. Increase this value to ensure smaller batches are not sent during slower production times.
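
Putting the producer-side settings together, a minimal sketch; the sizes and the address below are illustrative assumptions for ~15 MB payloads, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15728640);  // max.request.size, 15 MB
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");  // compression.type
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L);    // buffer.memory, raised because compression is on
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);          // batch.size, ~256 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);              // linger.ms

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);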

Consumer

fetch.message.max.bytes: This will determine the largest size of a message that can be fetched by the consumer.

max.partition.fetch.bytes: The maximum amount of data per-partition the server will return.
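
For the newer Java consumer (fetch.message.max.bytes belongs to the old Scala consumer), a minimal sketch of the corresponding settings; the 15 MB value, address, and group id are placeholder assumptions:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "large-message-group");     // placeholder group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 15728640); // max.partition.fetch.bytes, 15 MB
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 15728640);           // fetch.max.bytes, 15 MB

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);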

How do I write a big message into kafka with kafka producer API?

You need to configure your topic appropriately when creating it:
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes

$ kafka-topics.sh --create --bootstrap-server ... --topic ... --config max.message.bytes=20971520
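
The same override can be set when creating the topic from Java; a minimal sketch with the AdminClient, assuming a topic called my_topic and the 20 MB limit from the command above (the address, partitions, and replication factor are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Inside a method that may throw Exception
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder address

AdminClient admin = AdminClient.create(adminProps);
NewTopic topic = new NewTopic("my_topic", 1, (short) 1) // placeholder partitions / replication factor
        .configs(Collections.singletonMap("max.message.bytes", "20971520")); // 20 MB
admin.createTopics(Collections.singleton(topic)).all().get(); // block until the topic exists
admin.close();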

UPDATE:

Maybe add some more properties; I've been pushing big base64 blobs with this:

// Only one in-flight message per Kafka broker connection
// - max.in.flight.requests.per.connection (default 5)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// Set the number of retries - retries
props.put(ProducerConfig.RETRIES_CONFIG, "3");

// Request timeout - request.timeout.ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");

// Only retry after one second.
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");

// set max block to one minute by default
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");

// set transaction timeout to one minute
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");

// set delivery timeout to two minutes
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

// time to wait before sending messages out to Kafka, should not be too high
props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
// maximum amount of data to be collected before sending the batch, you will always hit that
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");

// these are not necessary but useful for your use case
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myClient");
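
Note that the block above leaves out the mandatory connection and serializer settings as well as the size limit itself; a minimal completion, with placeholder values:

// bootstrap.servers, key/value serializers - placeholders, adjust to your cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
// max.request.size - raise the producer-side limit to match the topic's max.message.bytes (20 MB here)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");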

How to consume large messages from kafka topic

Maybe the problem is occurring because you are using the console producer and pasting the message into the terminal (Linux), but the terminal truncates long messages to a maximum fixed length.

You can try using echo | xargs --show-limits or other shell or terminal settings to find out.

It can also come from the operating system; for example, ARG_MAX:

getconf ARG_MAX

can be too small for your message.

The easiest way would be writing the file directly to kafka-console-producer, as in this example:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic --new-producer < my_file.txt

If it works correctly it means that this was indeed the issue.


For the record, the same settings listed at the top of this page should also be tested: fetch.message.max.bytes on the consumer, replica.fetch.max.bytes and message.max.bytes on the broker, and max.message.bytes on the topic.

Kafka message size with activated compression

I found the solution:

The problem is that kafka-console-producer.sh ignores compression.type in the producer config. If I explicitly call

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt

with --compression-codec=zstd, it works because the producer compresses the message.
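
The programmatic equivalent, when producing from your own code rather than the console producer, is to set compression.type on the producer; a minimal sketch (zstd needs Kafka 2.1+ on both client and broker):

// compression.type - make the producer compress batches before sending them
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");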


