首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在卡夫卡星火流的情况下,spark.streaming.kafka.maxRatePerPartition和spark.streaming.backpressure.enabled有什么关系?

在卡夫卡星火流的情况下,spark.streaming.kafka.maxRatePerPartition和spark.streaming.backpressure.enabled有什么关系?
EN

Stack Overflow用户
提问于 2021-09-13 12:18:55
回答 1查看 972关注 0票数 4

我正在尝试将数据写入一个卡夫卡主题后,阅读了一个蜂巢表如下。

代码语言:javascript
复制
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))

final_df.write.format("kafka")\
        .option("kafka.bootstrap.servers", kafka_broker)\
        .option("kafka.batch.size", 51200)\
        .option("retries", 3)\
        .option("kafka.max.request.size", 500000)\
        .option("kafka.max.block.ms", 120000)\
        .option("kafka.metadata.max.age.ms", 120000)\
        .option("kafka.request.timeout.ms", 120000)\
        .option("kafka.linger.ms", 0)\
        .option("kafka.delivery.timeout.ms", 130000)\
        .option("acks", "1")\
        .option("kafka.compression.type", "snappy")\
        .option("kafka.security.protocol", "SASL_SSL")\
        .option("kafka.sasl.jaas.config", oauth_config)\
        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
        .option("kafka.sasl.mechanism", "OAUTHBEARER")\
        .option("topic", 'topic_name')\
        .save()

在成功写入(记录数为29000)之后,我将从另一个文件(read_kafka_data.py)中读取来自以下主题的数据:

代码语言:javascript
复制
    # SCHEMA
    schema = StructType([StructField("col1", StringType()),
            StructField("col2", IntegerType())
    ])

    # READ FROM TOPIC
    jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
                          + " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
                          + " oauth.client.id=" + '"' + "client_id" + '"' \
                          + " oauth.client.secret=" + '"' + "secret_key" + '" ;'

    stream_df = spark.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', kafka_broker) \
            .option('subscribe', 'topic_name') \
            .option('kafka.security.protocol', 'SASL_SSL') \
            .option('kafka.sasl.mechanism', 'OAUTHBEARER') \
            .option('kafka.sasl.jaas.config', jass_config) \
            .option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
            .option('startingOffsets', 'latest') \
            .option('group.id', 'group_id') \
            .option('maxOffsetsPerTrigger', 200) \
            .option('fetchOffset.retryIntervalMs', 200) \
            .option('fetchOffset.numRetries', 3) \
            .load()\
            .select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')

    stream_df.writeStream.outputMode('append')
    .format(HiveWarehouseSession.STREAM_TO_STREAM)
      .option("database", "database_name")
      .option("table", "table_name")
      .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
      .option("checkpointLocation", "/path/to/checkpoint/dir")
      .start().awaitTermination()

我是一个初学者卡夫卡,并一直阅读卡夫卡性能优化技术,并遇到这两个。

spark.streaming.backpressure.enabledspark.streaming.kafka.maxRatePerPartition

要启用第一个参数:

代码语言:javascript
复制
sparkConf.set("spark.streaming.backpressure.enabled",”true”)

正式文件对上述参数的解释如下:

启用或禁用火花流的内部背压机制(自1.5起)。这使得火花流能够根据当前批处理调度延迟和处理时间来控制接收速率,以便系统接收到的速度仅限于系统所能处理的速度。在内部,这动态地设置接收器的最大接收速率。此速率是由spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition值所限定的上限。

既然我是第一次运行应用程序,并且没有以前的微批处理,那么我应该为:spark.streaming.backpressure.initialRate指定一些值吗?

如果是这样,我应该如何确定spark.streaming.backpressure.initialRate的值。文档还指出,如果将spark.streaming.backpressure.enabled设置为true,则动态设置最大接收速率。如果是这样的话,如果spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition被设置为true,我们还需要配置:spark.streaming.backpressure.enabledtrue吗?

这个链接说,当施加背压时,使用spark.streaming.backpressure.initialRate没有任何影响。

任何帮助清理混乱将是非常感谢的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-22 20:54:15

您所指的配置spark.streaming.[...]属于直接流(也称为火花流),而不是属于结构化流。

如果您不知道这种差异,我建议您查看一下单独的编程指南:

结构化流不提供背压机制。当您从Kafka消费时,您可以使用(就像您已经在做的那样)选项maxOffsetsPerTrigger来对每个触发器上的读消息设置一个限制。此选项在结构化流与Kafka集成指南中记录为:

“每个触发间隔处理的最大偏移数的利率限制。指定的偏移总数将按比例在不同容量的topicPartitions之间分割。”

如果你还对标题问题感兴趣

在卡夫卡火花缭乱的情况下,spark.streaming.kafka.maxRatePerPartitionspark.streaming.backpressure.enabled有什么关系?

这种关系在关于火花结构的文档中得到了解释。

“启用或禁用火花流的内部背压机制(自1.5起)。这使火花流能够根据当前批处理调度延迟和处理时间来控制接收速率,以便系统只在系统能够处理的速度范围内接收。在内部,这会动态地设置接收者的最大接收速率。如果设置了spark.streaming.receiver.maxRate spark.streaming.kafka.maxRatePerPartition ,则该速率的上限是和(见下文)。”

关于火花流(DStream,而不是结构化流)中的背压机制的所有细节都在您已经链接过启用背压,使您的星火流应用程序生产准备就绪的博客中进行了解释。

通常,如果您启用背压,您将设置spark.streaming.kafka.maxRatePerPartition为最优估计速率的150% ~ 200%。

PID控制器的精确计算可以在类PIDRateEstimator中的代码中找到。

火花流背压实例

正如您要求的一个例子,下面是我在我的一个高效应用程序中所做的一个例子:

架设

  • Kafka主题有16个分区
  • 星火运行16个工作核心,因此每个分区可以并行使用。
  • 使用火花流(非结构化流)
  • 批处理间隔为10秒。
  • spark.streaming.backpressure.enabled设置为真
  • spark.streaming.kafka.maxRatePerPartition设置为10000
  • spark.streaming.backpressure.pid.minRate保持默认值为100
  • 该作业可以处理每个分区每秒大约5000条消息。
  • 在启动流作业之前,Kafka主题在每个分区中包含数百万条消息。

观察

  • 在第一批处理中,流作业获取16000条消息(= 10秒*16个分区*100个pid.minRate)。
  • 任务是处理这16000消息相当快,所以PID控制器估计一个比masRatePerPartition 10000更大的最优速率。
  • 因此,在第二批中,流作业获取16000条消息(= 10秒*16个分区* 10000 maxRatePerPartition)。
  • 现在,第二批完成所需的时间约为22秒。
  • 因为我们的批处理间隔设置为10秒,在10秒之后,流作业已经安排了第三个微批,再次1600000。其原因是PID控制器只能利用已完成的微批次的性能信息.
  • 只有在第六批或第七批中,PID控制器才能找到每分区每秒大约5000条消息的最佳处理速率。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69162574

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档