我正在尝试将数据写入一个卡夫卡主题后,阅读了一个蜂巢表如下。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()在成功写入(记录数为29000)之后,我将从另一个文件(read_kafka_data.py)中读取来自以下主题的数据:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()我是一个初学者卡夫卡,并一直阅读卡夫卡性能优化技术,并遇到这两个。
spark.streaming.backpressure.enabled和spark.streaming.kafka.maxRatePerPartition
要启用第一个参数:
sparkConf.set("spark.streaming.backpressure.enabled",”true”)正式文件对上述参数的解释如下:
启用或禁用火花流的内部背压机制(自1.5起)。这使得火花流能够根据当前批处理调度延迟和处理时间来控制接收速率,以便系统接收到的速度仅限于系统所能处理的速度。在内部,这动态地设置接收器的最大接收速率。此速率是由
spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerPartition值所限定的上限。
既然我是第一次运行应用程序,并且没有以前的微批处理,那么我应该为:spark.streaming.backpressure.initialRate指定一些值吗?
如果是这样,我应该如何确定spark.streaming.backpressure.initialRate的值。文档还指出,如果将spark.streaming.backpressure.enabled设置为true,则动态设置最大接收速率。如果是这样的话,如果spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerPartition被设置为true,我们还需要配置:spark.streaming.backpressure.enabled和true吗?
这个链接说,当施加背压时,使用spark.streaming.backpressure.initialRate没有任何影响。
任何帮助清理混乱将是非常感谢的。
发布于 2021-09-22 20:54:15
您所指的配置spark.streaming.[...]属于直接流(也称为火花流),而不是属于结构化流。
如果您不知道这种差异,我建议您查看一下单独的编程指南:
结构化流不提供背压机制。当您从Kafka消费时,您可以使用(就像您已经在做的那样)选项maxOffsetsPerTrigger来对每个触发器上的读消息设置一个限制。此选项在结构化流与Kafka集成指南中记录为:
“每个触发间隔处理的最大偏移数的利率限制。指定的偏移总数将按比例在不同容量的topicPartitions之间分割。”
如果你还对标题问题感兴趣
在卡夫卡火花缭乱的情况下,
spark.streaming.kafka.maxRatePerPartition和spark.streaming.backpressure.enabled有什么关系?
这种关系在关于火花结构的文档中得到了解释。
“启用或禁用火花流的内部背压机制(自1.5起)。这使火花流能够根据当前批处理调度延迟和处理时间来控制接收速率,以便系统只在系统能够处理的速度范围内接收。在内部,这会动态地设置接收者的最大接收速率。如果设置了
spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerPartition,则该速率的上限是和(见下文)。”
关于火花流(DStream,而不是结构化流)中的背压机制的所有细节都在您已经链接过启用背压,使您的星火流应用程序生产准备就绪的博客中进行了解释。
通常,如果您启用背压,您将设置spark.streaming.kafka.maxRatePerPartition为最优估计速率的150% ~ 200%。
PID控制器的精确计算可以在类PIDRateEstimator中的代码中找到。
火花流背压实例
正如您要求的一个例子,下面是我在我的一个高效应用程序中所做的一个例子:
架设
spark.streaming.backpressure.enabled设置为真spark.streaming.kafka.maxRatePerPartition设置为10000spark.streaming.backpressure.pid.minRate保持默认值为100观察
https://stackoverflow.com/questions/69162574
复制相似问题