我有一个Databricks Kafka生产者,需要写6200万条记录到一个Kafka主题。如果我同时写6200万条记录会不会有问题?或者我需要迭代20次,每次迭代要写3M条记录。
下面是代码。
Cmd1 val srcDf = spark.read.format("delta").load("/mnt/data-lake/data/silver/geocodes").filter($"LastUpdateDt"===lastUpdateDt)
Cmd2 val strDf = srcDf
.withColumn("key",...
.withColumn("topLevelRecord",...
Cmd3 strDf
.select(
to_avro($"key", lit("topic-AVRO-key"), schemaRegistryAddr).as("key"),
to_avro($"topLevelRecord", lit("topic-AVRO-value"), schemaRegistryAddr, avroSchema).as("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.keystore.location", kafkaKeystoreLocation)
.option("kafka.ssl.keystore.password", keystorePassword)
.option("kafka.ssl.truststore.location", kafkaTruststoreLocation)
.option("topic",topic)
.save()我的问题是-如果strDf.count是62M,我能直接把它写到Kafka吗,或者我需要迭代cmd# 3。
发布于 2020-08-31 04:43:09
使用Spark structured streaming for Kafka将数据存储到Kafka中没有限制。您将在下面看到,您的流查询将创建一个(池) KafkaProducer,它用于迭代Dataframe中的行。Kafka可以处理这样的消息量,并且没有限制。
有趣的是,Kafka会在这批消息实际写入代理之前将一些消息缓冲到批处理中。这将指导KafkaProducer Configs linger.ms、batch.size和max.request.size的配置,因此根据您的整体设置调整这些设置可能会很有用。
下面是spark-kafka-sql库的代码:
在内部,Spark将在InternalKafkaProducerPool.scala中创建一个KafkaProducers池
private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
if (log.isDebugEnabled()) {
val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
}
kafkaProducer
}然后将查询转换为RDD,并针对每个分区遍历KafkaWriter.scala中的元素
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
}数据的实际生成将在KafkaWriteTask中进行
def execute(iterator: Iterator[InternalRow]): Unit = {
producer = Some(InternalKafkaProducerPool.acquire(producerConfiguration))
val internalProducer = producer.get.producer
while (iterator.hasNext && failedWrite == null) {
val currentRow = iterator.next()
sendRow(currentRow, internalProducer)
}
}https://stackoverflow.com/questions/63655682
复制相似问题