文章目录
在Kafka中,默认消息限制为1M,超过1M后Kafka会提示The request included a message larger than the max message size the server will accept.此时需要修改Kafka Server配置文件,或者手动调整Topic 消息接收大小限制。请根据业务大小及时调整,不易限制过大
环境演示
创建测试topic
./kafka-topics.sh --create --topic kafka-message-test --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --partitions 3 --replication-factor 3
生产测试数据,验证topic是否正常
./kafka-console-producer.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --topic kafka-message-test
消费测试数据
./kafka-console-consumer.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --topic kafka-message-test --from-beginning
测试发送 100 条大小为 2MB 的消息,其中CTL来模拟生产者,生产者配置文件设置10M
这里我们模拟生产者,需要提前修改/opt/kafka/config/producer.properties
文件
[root@zook01 ~]# cat /opt/kafka/config/producer.properties |grep max
max.request.size=10485880
这里可以看到,实际上大于1M的消息已经无法进行写入了,超过Broker Topic限制
[root@zook01 bin]# ./kafka-producer-perf-test.sh --topic kafka_test --record-size 1200000 --throughput -1 --producer-props bootstrap.servers=zook01:9092,zook02:9092,zook03:9092 --num-records 1 --producer.config /opt/kafka/config/producer.properties
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
0 records sent, 0.000000 records/sec (0.00 MB/sec), NaN ms avg latency, 0.00 ms max latency, 0 ms 50th, 0 ms 95th, 0 ms 99th, 0 ms 99.9th.
修改Kafka Server Topic消息大小
在 broker 端有两种修改最大消息大小的方式:
-
1.message.max.bytes
静态参数在 broker 级别生效,影响所有的 topic,需要修改 server.properties 文件,并重启 Kafka 集群。 -
2.max.message.bytes
动态参数在 topic 级别生效,只影响指定的 topic,修改后立即生效,无需重启 Kafka 集群。
静态参数
静态修改为全局topic生效,动态修改为单独某个 topic生效,并且无需重启kafka
$ vim server.properties
replica.fetch.max.bytes=10485880
message.max.bytes=10485880
#修改为10m大小限制,并且重启Kafka集群
- replica.fetch.max.bytes 默认为1MB,Broker可复制的消息最大字节数,这个值最好比message.max.bytes大,否则Broker会接收此消息,但无法将消息复制出去,从而导致数据丢失
- message.max.bytes 允许Broker上topic接收最大消息,默认为1Mb
动态参数
创建Topic中修改
在创建 topic 的时候指定动态配置参数,例如创建一个名叫 kafka-message-01的 topic,指定 max.message.bytes
为 10MB。
./kafka-topics.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 \
--create --topic kafka-message-01 \
--config max.message.bytes=10485880
消费测试数据测试
发送1条2M的文件,进行测试
[root@zook01 bin]# ./kafka-producer-perf-test.sh --topic kafka-message-01 --record-size 2000000 --throughput -1 --producer-props bootstrap.servers=zook01:9092,zook02:9092,zook03:9092 --num-records 1 --producer.config /opt/kafka/config/producer.properties
1 records sent, 1.394700 records/sec (2.66 MB/sec), 677.00 ms avg latency, 677.00 ms max latency, 677 ms 50th, 677 ms 95th, 677 ms 99th, 677 ms 99.9th.
[root@zook01 bin]#
可以看到消息已经正常发送了,大小为2M,并且我们Kafka配置文件还是设置的默认1M
[root@zook01 bin]# cat /opt/app/conf/kafka/server.properties |grep max
replica.fetch.max.bytes=1000000
message.max.bytes=1000000
#默认1M
查看topic 的动态发送消息配置参数
#默认限制大小,就没有max相关信息
[root@zook01 bin]# ./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --entity-type topics --entity-name kafka_test --describe
Dynamic configs for topic kafka_test are:
#创建topic修改 max.message.bytes
[root@zook01 bin]# ./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --entity-type topics --entity-name kafka-message-01 --describe
Dynamic configs for topic kafka-message-01 are:
max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, STATIC_BROKER_CONFIG:message.max.bytes=1000000, DEFAULT_CONFIG:message.max.bytes=1048588}
DYNAMIC_TOPIC_CONFIG 是此时生效的配置 10MB,DEFAULT_CONFIG 是默认配置 1M。
max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, DEFAULT_CONFIG:message.max.bytes=10485880}
也可以在已创建的 topic 上修改该配置参数。
这里我们使用kafka_test进行修改,这个topic刚刚发送消息是失败的情况。同样将max.message.bytes
修改为10M
./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --alter --entity-type topics --entity-name kafka_test --add-config max.message.bytes=10485880
在次查看已经生效
[root@zook01 bin]# ./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --entity-type topics --entity-name kafka_test --describe
Dynamic configs for topic kafka_test are:
max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, STATIC_BROKER_CONFIG:message.max.bytes=1000000, DEFAULT_CONFIG:message.max.bytes=1048588}
[root@zook01 bin]#
发送消息测试,验证成功
[root@zook01 bin]# ./kafka-producer-perf-test.sh --topic kafka_test --record-size 1200000 --throughput -1 --producer-props bootstrap.servers=zook01:9092,zook02:9092,zook03:9092 --num-records 1 --producer.config /opt/kafka/config/producer.properties
1 records sent, 1.474926 records/sec (1.69 MB/sec), 641.00 ms avg latency, 641.00 ms max latency, 641 ms 50th, 641 ms 95th, 641 ms 99th, 641 ms 99.9th.
[root@zook01 bin]#