I'm trying to ingest data from Spark into Cassandra (one partition = a 1 MB BLOB) with the following conf parameters:
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows 1
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes 100
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec 1
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS 90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count 10
spark.sql.catalog.cassandra com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions

I started with a Spark job totaling 16 cores, then scaled down to a Spark job with only 1 core.
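For reference, the same properties can also be applied when building the session instead of passing them to spark-submit; a minimal sketch, where the app name and the programmatic style are my assumptions and the keys and values come from the list above:

import org.apache.spark.sql.SparkSession

// Minimal sketch (not from the post): the same connector settings
// applied at session build time. The app name is an assumption.
val spark = SparkSession.builder()
  .appName("cassandra-ingest")
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.sql.catalog.cassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows", "1")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes", "100")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec", "1")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
  .getOrCreate()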
With either core count, after a while the response is the following and the driver ends up in a failed state:
21/09/19 19:03:50 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatementWrapper@532adef2
com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required but only 0 acknowledged the write)

This may be related to some nodes being overloaded. But how can I debug it? Which configuration should I tune?
Thanks
Answered 2021-09-19 20:28:24
Problem solved!
The problem was with my data, not with Cassandra.
It turned out that some of the partitions (2,000 out of 60,000,000) were about 50 MB in size, rather than the 1 MB I expected.
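This kind of skew can be confirmed before writing by profiling the BLOB length distribution; a minimal sketch (the diagnostic step is my addition, only the blob column and the elided parquet path come from the answer below):

import org.apache.spark.sql.functions.{avg, col, length, max}

// Sketch (not in the original answer): profile BLOB sizes to spot skew.
val sizes = spark.read.parquet("...")
  .withColumn("bytes_count", length(col("blob")))

// Average vs. maximum BLOB size in bytes.
sizes.agg(avg("bytes_count"), max("bytes_count")).show()

// How many rows exceed the expected ~1 MB?
println(sizes.filter(col("bytes_count") > 1024 * 1024).count())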
I simply filtered in Spark to exclude the oversized partitions:
import org.apache.spark.sql.functions.{col, length}
...
spark.read.parquet("...")
  // EXCLUDE LARGE PARTITIONS
  .withColumn("bytes_count", length(col("blob")))
  .filter("bytes_count < " + argSkipPartitionLargerThan)
  // PROJECT
  .select("data_key", "blob")
  // COMMIT
  .writeTo(DS + "." + argTargetKS + "." + argTargetTable).append()

Spark can now ingest the 500 GB of data within 10 minutes.
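As a quick post-load sanity check (my addition, not part of the original answer), the row count can be read back through the same catalog identifier:

// Sketch: DS, argTargetKS and argTargetTable are the same values used in
// the write above; the check itself is hypothetical.
spark.sql(s"SELECT COUNT(*) FROM $DS.$argTargetKS.$argTargetTable").show()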
https://stackoverflow.com/questions/69245630