显然在Spark streaming中没有对Cassandra接收器的内置支持。我在网上找到了这个例子,它基于ForEachWriter为Spark structured streaming实现了一个自定义的Cassandra接收器:
https://dzone.com/articles/cassandra-sink-for-spark-structured-streaming
我知道我们需要创建一个ForeachWriter实现,它负责打开到接收器(Cassandra)的连接,写入数据并关闭连接。所以CassandraSinkForeach和CassandraDriver类是有意义的。
但是,我不知道是否需要使SparkSessionBuilder可序列化,甚至不需要在CassandraDriver类中初始化SparkSession实例。这样做的唯一原因似乎是从sparkConf初始化CassandraConnector。
根据CassandraConnector文档,可以从传入的CassandraConnectorConfig初始化CassandraConnector对象:http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.4.0/spark-cassandra-connector/#com.datastax.spark.connector.cql.CassandraConnector
有没有人能解释一下是否需要在workers中初始化SparkSession?这是一个通用的模式吗?如果是,为什么要这样做?
发布于 2019-02-25 15:27:27
如果你可以升级到Spark2.4,你就可以使用ForEachBatch,在那里你可以在流数据帧上应用批处理编写器。
https://stackoverflow.com/questions/54843918
复制相似问题