I have a Spark streaming job, shown below, that reads Cosmos DB change feed data. It runs on a Databricks cluster with DBR 8.2.
from pyspark.sql.functions import current_date

cosmos_config = {
    "spark.cosmos.accountEndpoint": cosmos_endpoint,
    "spark.cosmos.accountKey": cosmos_key,
    "spark.cosmos.database": cosmos_database,
    "spark.cosmos.container": collection,
    "spark.cosmos.read.partitioning.strategy": "Default",
    "spark.cosmos.read.inferSchema.enabled": "false",
    "spark.cosmos.changeFeed.startFrom": "Now",
    "spark.cosmos.changeFeed.mode": "Incremental"
}

df_read = (spark.readStream
    .format("cosmos.oltp.changeFeed")
    .options(**cosmos_config)
    .schema(cosmos_schema)
    .load())

df_write = (df_read.withColumn("partition_date", current_date())
    .writeStream
    .partitionBy("partition_date")
    .format("delta")
    .option("path", master_path)
    .option("checkpointLocation", f"{master_path}_checkpointLocation")
    .queryName("cosmosStream")
    .trigger(processingTime="10 seconds")
    .start())

The job usually runs fine, but every so often the stream just stops, and the output below repeats in a loop in the log4j logs. Restarting the job processes all of the backlog. Has anyone run into this before? I'm not sure what is causing it. Any ideas?
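Until the root cause is found, one common mitigation for a stream that dies silently is a driver-side restart loop around the query. The sketch below is not part of the original job: `start_stream` is a hypothetical zero-argument factory that rebuilds the `writeStream` shown above, and the retry policy (max restarts, fixed backoff) is an assumption.

```python
import time

def run_with_restart(start_stream, max_restarts=5, backoff_seconds=30):
    """Run a streaming query, restarting it if it terminates with an error.

    start_stream: hypothetical zero-arg callable that (re)builds and starts
        the query, returning an object exposing awaitTermination(), mirroring
        pyspark.sql.streaming.StreamingQuery.
    Returns the number of restarts performed before a clean stop.
    """
    restarts = 0
    while True:
        query = start_stream()
        try:
            query.awaitTermination()  # blocks until the query stops
            return restarts           # clean stop: do not restart
        except Exception:
            if restarts >= max_restarts:
                raise                 # give up and surface the failure
            restarts += 1
            time.sleep(backoff_seconds)  # back off before rebuilding
```

Because the job writes through a fixed `checkpointLocation`, restarting the query this way resumes from the last committed change feed position rather than reprocessing from scratch.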
22/02/27 00:57:58 INFO HiveMetaStore: 1: get_database: default
22/02/27 00:57:58 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
22/02/27 00:57:58 INFO DriverCorral: Metastore health check ok
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Starting...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Start completed.
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
22/02/27 00:58:07 INFO HikariDataSource: metastore-monitor - Shutdown completed.
22/02/27 00:58:07 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 88 milliseconds)
22/02/27 00:58:50 INFO RxDocumentClientImpl: Getting database account endpoint from https://<cosmosdb_endpoint>.documents.azure.com:443

Posted 2022-02-28 15:59:15
Which version of the Cosmos Spark connector are you using? Between 4.3.0 and 4.6.2 there were several bug fixes in the bulk ingestion code path.
For more details, see https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md.
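Since the fixes landed between 4.3.0 and 4.6.2, the first thing to check is the connector version attached to the cluster (for example under the cluster's Libraries tab, Maven coordinate `com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12`). The helper below is purely illustrative for comparing the version string you find against that threshold; the 4.6.2 cutoff comes from the answer above, the function names are my own.

```python
def version_tuple(v):
    """Parse a dotted version string like '4.6.2' into a comparable int tuple."""
    return tuple(int(part) for part in v.split("."))

def has_bulk_ingestion_fixes(connector_version, fixed_in="4.6.2"):
    """True if connector_version is at or past the release containing the fixes."""
    return version_tuple(connector_version) >= version_tuple(fixed_in)
```

For example, `has_bulk_ingestion_fixes("4.3.0")` is False, so a cluster pinned to 4.3.0 would be worth upgrading before digging further.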
https://stackoverflow.com/questions/71283945