Data backlog when Spark Streaming consumes from Kafka
Published: 2019-05-26


The Kafka data backlog problem

1. Problem description

We run a Spark Streaming program in production that consumes data from Kafka and processes it. The program ran fine at first, but after someone made changes to the Kafka cluster, restarting the Spark job produced the following warnings:

18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-ymtopic
18/06/20 15:29:21 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
18/06/20 15:34:00 WARN internals.ConsumerCoordinator: Auto-commit of offsets {iptv_js-10=OffsetAndMetadata{offset=915889, metadata=''}, iptv_js-9=OffsetAndMetadata{offset=1018618, metadata=''}, iptv_js-11=OffsetAndMetadata{offset=1018619, metadata=''}, iptv_js-0=OffsetAndMetadata{offset=915887, metadata=''}, iptv_js-2=OffsetAndMetadata{offset=915888, metadata=''}, iptv_js-1=OffsetAndMetadata{offset=1018616, metadata=''}, iptv_js-4=OffsetAndMetadata{offset=915883, metadata=''}, iptv_js-3=OffsetAndMetadata{offset=1018619, metadata=''}, iptv_js-6=OffsetAndMetadata{offset=915887, metadata=''}, iptv_js-5=OffsetAndMetadata{offset=1018618, metadata=''}, iptv_js-8=OffsetAndMetadata{offset=915887, metadata=''}, iptv_js-7=OffsetAndMetadata{offset=1018621, metadata=''}} failed for group ymtopic: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

[warning message]

Auto-commit of offsets {…} failed for group xxxx: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

[in plain terms]
The offsets auto-committed for group xxxx could not be committed because the group had already rebalanced and reassigned the partitions to another member. This means the gap between successive poll() calls exceeded the configured max.poll.interval.ms, which typically happens because poll() spends too long processing each batch. You can fix it either by increasing the session time (max.poll.interval.ms) or by reducing the maximum number of records returned per poll() (max.poll.records).
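For reference, here is a minimal sketch of what those two knobs look like in a Kafka consumer configuration. This is illustrative only: the broker address, group id, and the concrete values are placeholder assumptions, not our production settings.

import org.apache.kafka.clients.consumer.ConsumerConfig

// Placeholder consumer params showing the two tuning knobs from the warning
val tuningParams = collection.Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",   // placeholder broker address
  "group.id"          -> "ymtopic",
  // Allow more time between poll() calls before the coordinator evicts the
  // consumer from the group (default is 300000 ms in recent clients)
  ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> (600000: java.lang.Integer),
  // Hand back fewer records per poll() so each batch finishes faster
  // (default is 500 in recent clients)
  ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> (100: java.lang.Integer)
)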



As the message suggests, we could resolve the failure by increasing max.poll.interval.ms or by reducing max.poll.records. That is logically sound, but it does not address the root of the problem. The immediate cause is that poll() spends too long processing each batch; the underlying cause is a data backlog in Kafka. And the reason the backlog hits us is that the Kafka offset settings specified by our program are overridden by Spark for the executors, as shown by the four "overriding" warnings above. The relevant source is:

/**
 * Tweak kafka params to prevent issues on executors
 */
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
  logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
  kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

  logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
  kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

  // driver and executor should be in different consumer groups
  val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
  if (null == originalGroupId) {
    logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
  }
  val groupId = "spark-executor-" + originalGroupId
  logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
  kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

  // possible workaround for KAFKA-3135
  val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
  if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
    logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
    kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
  }
}

Note in particular: overriding auto.offset.reset to none for executor.

none:
when every partition of the topic has a committed offset, consumption resumes from those offsets; if even one partition has no committed offset, an exception is thrown.
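To make the "none" semantics concrete, here is a small hedged sketch using the plain Kafka consumer API. The broker address, group id, and topic are placeholders, and poll(long) matches the 0.10-era client in play here: a group with no committed offset for some assigned partition fails fast instead of silently resetting.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{KafkaConsumer, NoOffsetForPartitionException}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")   // placeholder
props.put("group.id", "fresh-group")             // a group with no committed offsets
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "none")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("iptv_js"))
try {
  consumer.poll(1000L)  // the offset lookup happens on poll()
} catch {
  // raised because at least one assigned partition has no committed offset
  case e: NoOffsetForPartitionException => println(s"auto.offset.reset=none: ${e.getMessage}")
} finally {
  consumer.close()
}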

From the warning we can see that the offsets the program had previously auto-committed fall roughly between 915883 and 1018619. This tells us that after the Kafka cluster was changed, a large volume of new data was produced, and Spark Streaming could no longer keep up when it resumed consuming from the last stored offsets.

Looking at the Spark source, shown in the figure below:

[Figure: setting the Kafka offsets in ConsumerStrategies.Subscribe]

The source shows that if we supply our own offsets map to Subscribe, we can sidestep the consequences of the auto.offset.reset parameter being overridden.

2. Solution

The root reason the job cannot start is the Kafka data backlog. If we maintain an offsets map ourselves, we can simply skip over the backlog [this is only acceptable for businesses that can tolerate data loss]. One way to obtain fresh starting offsets is sketched below; the production code follows after it.
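One way to build that offsets map is to ask Kafka for the current end offset of every partition and start there, deliberately skipping everything that has piled up. The helper below is a hedged sketch under assumptions, not our production code: it requires a Kafka 0.10.1+ client for endOffsets, and the kafkaParams and topic name must match the streaming job that follows.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: jump every partition of `topic` to its current end
// offset, i.e. skip the entire backlog (only safe when data loss is allowed)
def latestOffsets(topic: String,
                  kafkaParams: collection.Map[String, Object]): Map[TopicPartition, Long] = {
  val probe = new KafkaConsumer[String, String](kafkaParams.asJava)
  try {
    val partitions = probe.partitionsFor(topic).asScala
      .map(info => new TopicPartition(topic, info.partition))
    // endOffsets returns the offset of the next record to be written, so a
    // consumer seeded with these values sees only data produced from now on
    probe.endOffsets(partitions.asJava).asScala
      .map { case (tp, off) => tp -> off.longValue() }
      .toMap
  } finally {
    probe.close()
  }
}

Its return value can be passed straight through as the offsets argument of ConsumerStrategies.Subscribe below.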

The production code is as follows:

/*
 * Consume Kafka data with Spark Streaming
 */
// Location strategy: use "PreferBrokers" only when executors are co-located
// with the Kafka brokers; here we use PreferConsistent
val strategies = LocationStrategies.PreferConsistent

// Subscription: the Kafka connection parameters and the topic(s) to consume
val topics = Array(KAFKA_Input_TOPIC)
val kafkaParams = collection.Map[String, Object](
  "bootstrap.servers" -> KAFKA_IPS,
  "key.serializer" -> classOf[org.apache.kafka.common.serialization.StringSerializer],
  "value.serializer" -> classOf[org.apache.kafka.common.serialization.StringSerializer],
  "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "group.id" -> KAFKA_Group_ID,
  "auto.offset.reset" -> "latest",
  "max.poll.interval.ms" -> KAFKA_MAX_POLL_INTERVAL_MS,
  "max.poll.records" -> KAFKA_MAX_POLL_RECORDS,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Manually specify the starting Kafka offsets
val offsets = collection.Map[TopicPartition, Long](
  new TopicPartition(KAFKA_Input_TOPIC, KAFKA_NUM_PARTITION.toInt) -> KAFKA_NOW_OFFSET
)

val subscribe = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)

// Create the direct stream
val stream = KafkaUtils.createDirectStream(ssc, strategies, subscribe)
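Since enable.auto.commit is false, nothing persists the job's progress anymore. A common follow-up, taken from the standard spark-streaming-kafka-0-10 pattern, is to commit each batch's offset ranges back to Kafka once the batch has been processed; a sketch follows, with the actual processing step left as a placeholder.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the offset ranges this batch covers before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // Commit the ranges back to Kafka under our group.id after processing
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}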

Reposted from:
