首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有没有一种方法可以根据大小来冲洗卡夫卡流WindowStore?

有没有一种方法可以根据大小来冲洗卡夫卡流WindowStore?
EN

Stack Overflow用户
提问于 2019-01-07 18:40:50
回答 1查看 922关注 0票数 0

我正在使用DSL编写Kafka应用程序,它将从kafka主题中读取元组。在拓扑结构中,我想对元组进行批处理。然后,如果(1) 30秒过去了,或者(2)批处理的大小超过1GB,我想将浴缸写入磁盘上的文件。

我编写的拓扑使用TimeWindowedKStream对元组进行分组。然后调用聚合并传递一个窗口存储。

我的问题是,当州立商店试图写信给卡夫卡ChangeLog时,我得到了一个

org.apache.kafka.common.errors.RecordTooLargeException

异常。

特别是:

由: org.apache.kafka.streams.errors.StreamsException:任务ibv2-capt-consumer-group-3-record-store-changelog中止发送,因为以前记录捕获的错误(键\x00\x01$\xE7\x88\x00\x00值[B@419761c时间戳1546807396524)由于org.apache.kafka.common.errors.RecordTooLargeException:导致的错误包含了一条大于服务器将接受的最大消息大小的消息。

我已经尝试将CACHE_MAX_BYTES_BUFFERING_CONFIG设置为1MB,但是正如文档所述,对于整个拓扑,如果是这个配置。

这是我的拓扑

这是我一直在使用的Scala代码。注意,我这里使用的是kafka-streams-scala。

代码语言:javascript
复制
val builder = new StreamsBuilderS()

import com.lightbend.kafka.scala.streams.DefaultSerdes._

implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]

val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)

val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10)) 

val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))

import org.apache.kafka.common.utils.Bytes

val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
  .as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava)  // increased max.request.size to 10 x default

val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
  () => RecordSeq(Seq()),
  (randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
  windowedStore
)

val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")

recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))

注:案例类RecordSeq(记录: SeqRecord)

EN

回答 1

Stack Overflow用户

发布于 2019-01-07 19:33:28

主题可以具有message.max.bytes属性中定义的最大大小的记录。这是代理可以在主题中接收和追加的最大规模的消息。你的记录可能会超过这个限制。因此,您需要更改此属性的配置,以允许更大的记录大小。

它可以设置在代理级别,也可以设置为主题级别。您可以在这里参考更多详细信息:

http://kafka.apache.org/documentation/#brokerconfigs

http://kafka.apache.org/documentation/#topicconfigs

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54079998

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档