首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在单个产生命令中,可以产生到Kafka主题的记录的数量是否有限制

在单个产生命令中,可以产生到Kafka主题的记录的数量是否有限制
EN

Stack Overflow用户
提问于 2020-08-30 16:54:29
回答 1查看 355关注 0票数 1

我有一个Databricks Kafka生产者,需要写6200万条记录到一个Kafka主题。如果我同时写6200万条记录会不会有问题?或者我需要迭代20次,每次迭代要写3M条记录。

下面是代码。

代码语言:javascript
复制
Cmd1 val srcDf = spark.read.format("delta").load("/mnt/data-lake/data/silver/geocodes").filter($"LastUpdateDt"===lastUpdateDt)

Cmd2 val strDf = srcDf
        .withColumn("key",...
        .withColumn("topLevelRecord",...

Cmd3 strDf
 .select(
 to_avro($"key", lit("topic-AVRO-key"), schemaRegistryAddr).as("key"),
 to_avro($"topLevelRecord", lit("topic-AVRO-value"), schemaRegistryAddr, avroSchema).as("value"))
 .write
 .format("kafka")
 .option("kafka.bootstrap.servers", bootstrapServers)
 .option("kafka.security.protocol", "SSL")
 .option("kafka.ssl.keystore.location", kafkaKeystoreLocation)
 .option("kafka.ssl.keystore.password", keystorePassword)
 .option("kafka.ssl.truststore.location", kafkaTruststoreLocation)
 .option("topic",topic)
 .save()

我的问题是-如果strDf.count是62M,我能直接把它写到Kafka吗,或者我需要迭代cmd# 3。

EN

回答 1

Stack Overflow用户

发布于 2020-08-31 04:43:09

使用Spark structured streaming for Kafka将数据存储到Kafka中没有限制。您将在下面看到,您的流查询将创建一个(池) KafkaProducer,它用于迭代Dataframe中的行。Kafka可以处理这样的消息量,并且没有限制。

有趣的是,Kafka会在这批消息实际写入代理之前将一些消息缓冲到批处理中。这将指导KafkaProducer Configs linger.msbatch.sizemax.request.size的配置,因此根据您的整体设置调整这些设置可能会很有用。

下面是spark-kafka-sql库的代码:

在内部,Spark将在InternalKafkaProducerPool.scala中创建一个KafkaProducers池

代码语言:javascript
复制
  private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
    val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
    if (log.isDebugEnabled()) {
      val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
      logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
    }
    kafkaProducer
  }

然后将查询转换为RDD,并针对每个分区遍历KafkaWriter.scala中的元素

代码语言:javascript
复制
  queryExecution.toRdd.foreachPartition { iter =>
      val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
      Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
        finallyBlock = writeTask.close())
    }
  }

数据的实际生成将在KafkaWriteTask中进行

代码语言:javascript
复制
  def execute(iterator: Iterator[InternalRow]): Unit = {
    producer = Some(InternalKafkaProducerPool.acquire(producerConfiguration))
    val internalProducer = producer.get.producer
    while (iterator.hasNext && failedWrite == null) {
      val currentRow = iterator.next()
      sendRow(currentRow, internalProducer)
    }
  }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63655682

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档