My current working setup
NiFi streams Avro messages (with Confluent Schema Registry references) into Kafka (v2.0.0, 20 partitions, Confluent v5.0.0), and a Kafka Connect worker (HDFS sink) streams those messages to HDFS with flush.size=70000.
My problem
This configuration works fine, but when I change it to flush.size=1000000 (since 70k messages add up to at most 5-7 MB, while the Parquet block size is 256 MB; see the sizing estimate after the log below), the Connect worker returns an Error sending fetch request error:
...
[2019-05-24 14:00:21,784] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1661483807, epoch=374) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-05-24 14:00:21,784] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={mytopic-10=(offset=27647797, logStartOffset=24913298, maxBytes=1048576), mytopic-16=(offset=27647472, logStartOffset=24913295, maxBytes=1048576), mytopic-7=(offset=27647429, logStartOffset=24913298, maxBytes=1048576), mytopic-4=(offset=27646967, logStartOffset=24913296, maxBytes=1048576), mytopic-13=(offset=27646404, logStartOffset=24913298, maxBytes=1048576), mytopic-19=(offset=27648276, logStartOffset=24913300, maxBytes=1048576), mytopic-1=(offset=27647036, logStartOffset=24913307, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1661483807, epoch=374)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
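A quick back-of-the-envelope estimate of the sizing behind the flush.size change (derived from the figures above, not measured directly):

# 70,000 records ≈ 5-7 MB  =>  roughly 75-105 bytes per Avro record
# 256 MB Parquet block / ~75-105 B per record ≈ 2.6-3.6 million records per block
# so even flush.size=1000000 yields files of only ~75-105 MB, well under one block

This is consistent with the ~3.5 million records per ~256 MB file reported in the answer below.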
...
My configs:
HDFS connector config:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=mytopic
hdfs.url=hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/
flush.size=1000000
Kafka Connect worker config:
bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/confluent/current/share/java/
My question:
How can I stream larger messages from Kafka to HDFS using the Kafka Connect worker?
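One knob that may be relevant when moving larger batches through a sink connector: the Connect worker forwards any property prefixed with consumer. to the consumers it creates for sink tasks. A sketch with illustrative values (these overrides are my assumption, not part of the setup above):

# Worker-level consumer overrides (illustrative values, not from the original setup)
consumer.max.partition.fetch.bytes=10485760
consumer.fetch.max.bytes=104857600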
Posted on 2019-06-04 12:25:07
I solved this by running Connect in distributed mode (instead of standalone). Now I can write up to 3.5 million records (~256 MB) to HDFS. But there are new issues: 1) processing speed is very low (35 million records per hour); 2) it cannot write Parquet files larger than 256 MB. I will post a new question.
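For context, a minimal sketch of a distributed-mode worker config (the group.id and storage topic names are illustrative assumptions, not taken from the original setup):

# connect-distributed.properties -- minimal sketch; group.id and topic names are assumptions
bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
# distributed mode keeps offsets/configs/status in Kafka topics
# instead of a local file like /tmp/connect.offsets
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
plugin.path=/opt/confluent/current/share/java/

In distributed mode the connector itself is no longer read from a properties file; it is submitted as JSON to the worker's REST interface (default port 8083), for example with the connector settings shown above:

curl -X POST -H "Content-Type: application/json" http://confleuntnode1:8083/connectors \
  --data '{"name": "hdfs-sink", "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "tasks.max": "1",
    "topics": "mytopic",
    "hdfs.url": "hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/",
    "flush.size": "1000000"}}'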
https://stackoverflow.com/questions/56321170