我正在使用DSL编写Kafka应用程序,它将从kafka主题中读取元组。在拓扑结构中,我想对元组进行批处理。然后,如果(1) 30秒过去了,或者(2)批处理的大小超过1GB,我想将浴缸写入磁盘上的文件。
我编写的拓扑使用TimeWindowedKStream对元组进行分组。然后调用聚合并传递一个窗口存储。
我的问题是,当州立商店试图写信给卡夫卡ChangeLog时,我得到了一个
org.apache.kafka.common.errors.RecordTooLargeException
异常。
特别是:
由: org.apache.kafka.streams.errors.StreamsException:任务ibv2-capt-consumer-group-3-record-store-changelog中止发送,因为以前记录捕获的错误(键\x00\x01$\xE7\x88\x00\x00值[B@419761c时间戳1546807396524)由于org.apache.kafka.common.errors.RecordTooLargeException:导致的错误包含了一条大于服务器将接受的最大消息大小的消息。
我已经尝试将CACHE_MAX_BYTES_BUFFERING_CONFIG设置为1MB,但是正如文档所述,对于整个拓扑,如果是这个配置。
这是我的拓扑
这是我一直在使用的Scala代码。注意,我这里使用的是kafka-streams-scala。
val builder = new StreamsBuilderS()
import com.lightbend.kafka.scala.streams.DefaultSerdes._
implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]
val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)
val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10))
val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))
import org.apache.kafka.common.utils.Bytes
val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
.as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
.withKeySerde(integerSerde)
.withValueSerde(recordSeqSerde)
.withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava) // increased max.request.size to 10 x default
val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
() => RecordSeq(Seq()),
(randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
windowedStore
)
val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")
recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))注:案例类RecordSeq(记录: SeqRecord)
发布于 2019-01-07 19:33:28
主题可以具有message.max.bytes属性中定义的最大大小的记录。这是代理可以在主题中接收和追加的最大规模的消息。你的记录可能会超过这个限制。因此,您需要更改此属性的配置,以允许更大的记录大小。
它可以设置在代理级别,也可以设置为主题级别。您可以在这里参考更多详细信息:
http://kafka.apache.org/documentation/#brokerconfigs
http://kafka.apache.org/documentation/#topicconfigs
https://stackoverflow.com/questions/54079998
复制相似问题