Kafka发送超过1M消息

释放双眼,带上耳机,听听看~!
🤖 由 ChatGPT 生成的文章摘要

在Kafka中,默认消息限制为1M,超过1M后Kafka会提示The request included a message larger than the max message size the server will accept.此时需要修改Kafka Server配置文件,或者手动调整Topic 消息接收大小限制。请根据业务大小及时调整,不易限制过大

环境演示

创建测试topic

./kafka-topics.sh --create --topic kafka-message-test --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --partitions 3 --replication-factor 3 

生产测试数据,验证topic是否正常

./kafka-console-producer.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --topic kafka-message-test 

消费测试数据

./kafka-console-consumer.sh --bootstrap-server  zook01:9092,zook02:9092,zook03:9092 --topic kafka-message-test --from-beginning 

测试发送 100 条大小为 2MB 的消息,其中CTL来模拟生产者,生产者配置文件设置10M

这里我们模拟生产者,需要提前修改/opt/kafka/config/producer.properties文件

[root@zook01 ~]# cat /opt/kafka/config/producer.properties |grep max
max.request.size=10485880

这里可以看到,实际上大于1M的消息已经无法进行写入了,超过Broker Topic限制

[root@zook01 bin]# ./kafka-producer-perf-test.sh --topic kafka_test  --record-size 1200000  --throughput -1 --producer-props bootstrap.servers=zook01:9092,zook02:9092,zook03:9092   --num-records 1 --producer.config /opt/kafka/config/producer.properties

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
0 records sent, 0.000000 records/sec (0.00 MB/sec), NaN ms avg latency, 0.00 ms max latency, 0 ms 50th, 0 ms 95th, 0 ms 99th, 0 ms 99.9th.

修改Kafka Server Topic消息大小

在 broker 端有两种修改最大消息大小的方式:

  • 1.message.max.bytes
    静态参数在 broker 级别生效,影响所有的 topic,需要修改 server.properties 文件,并重启 Kafka 集群。

  • 2.max.message.bytes
    动态参数在 topic 级别生效,只影响指定的 topic,修改后立即生效,无需重启 Kafka 集群。

静态参数

静态修改为全局topic生效,动态修改为单独某个 topic生效,并且无需重启kafka

$  vim server.properties

replica.fetch.max.bytes=10485880
message.max.bytes=10485880

#修改为10m大小限制,并且重启Kafka集群
  • replica.fetch.max.bytes 默认为1MB,Broker可复制的消息最大字节数,这个值最好比message.max.bytes大,否则Broker会接收此消息,但无法将消息复制出去,从而导致数据丢失
  • message.max.bytes 允许Broker上topic接收最大消息,默认为1Mb

动态参数

创建Topic中修改
在创建 topic 的时候指定动态配置参数,例如创建一个名叫 kafka-message-01的 topic,指定 max.message.bytes为 10MB。

./kafka-topics.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 \
--create --topic  kafka-message-01 \
--config max.message.bytes=10485880

消费测试数据测试

发送1条2M的文件,进行测试

[root@zook01 bin]# ./kafka-producer-perf-test.sh --topic kafka-message-01  --record-size 2000000  --throughput -1 --producer-props bootstrap.servers=zook01:9092,zook02:9092,zook03:9092   --num-records 1 --producer.config /opt/kafka/config/producer.properties
1 records sent, 1.394700 records/sec (2.66 MB/sec), 677.00 ms avg latency, 677.00 ms max latency, 677 ms 50th, 677 ms 95th, 677 ms 99th, 677 ms 99.9th.
[root@zook01 bin]#

可以看到消息已经正常发送了,大小为2M,并且我们Kafka配置文件还是设置的默认1M

[root@zook01 bin]# cat /opt/app/conf/kafka/server.properties |grep max
replica.fetch.max.bytes=1000000
message.max.bytes=1000000

#默认1M

查看topic 的动态发送消息配置参数

#默认限制大小,就没有max相关信息
[root@zook01 bin]# ./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --entity-type topics --entity-name kafka_test --describe
Dynamic configs for topic kafka_test are:

#创建topic修改  max.message.bytes
[root@zook01 bin]# ./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --entity-type topics --entity-name kafka-message-01 --describe
Dynamic configs for topic kafka-message-01 are:
  max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, STATIC_BROKER_CONFIG:message.max.bytes=1000000, DEFAULT_CONFIG:message.max.bytes=1048588}

DYNAMIC_TOPIC_CONFIG 是此时生效的配置 10MB,DEFAULT_CONFIG 是默认配置 1M。
max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, DEFAULT_CONFIG:message.max.bytes=10485880}

也可以在已创建的 topic 上修改该配置参数。

这里我们使用kafka_test进行修改,这个topic刚刚发送消息是失败的情况。同样将max.message.bytes修改为10M

./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092  --alter --entity-type topics  --entity-name kafka_test --add-config max.message.bytes=10485880

在次查看已经生效

[root@zook01 bin]# ./kafka-configs.sh --bootstrap-server zook01:9092,zook02:9092,zook03:9092 --entity-type topics --entity-name kafka_test --describe
Dynamic configs for topic kafka_test are:
  max.message.bytes=10485880 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10485880, STATIC_BROKER_CONFIG:message.max.bytes=1000000, DEFAULT_CONFIG:message.max.bytes=1048588}
[root@zook01 bin]#

发送消息测试,验证成功

[root@zook01 bin]# ./kafka-producer-perf-test.sh --topic kafka_test  --record-size 1200000  --throughput -1 --producer-props bootstrap.servers=zook01:9092,zook02:9092,zook03:9092   --num-records 1 --producer.config /opt/kafka/config/producer.properties
1 records sent, 1.474926 records/sec (1.69 MB/sec), 641.00 ms avg latency, 641.00 ms max latency, 641 ms 50th, 641 ms 95th, 641 ms 99th, 641 ms 99.9th.
[root@zook01 bin]#

给TA打赏
共{{data.count}}人
人已打赏
Kafka

Kafka 自带脚本压力测试工具

2024-12-25 17:55:53

CephGrafanaKubernetesprometheus

Prometheus Grafana使用Ceph持久化并监控k8s集群

2022-6-26 0:22:02

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索