首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >向CassandraConnector添加自定义编解码器

向CassandraConnector添加自定义编解码器
EN

Stack Overflow用户
提问于 2016-11-01 15:48:54
回答 1查看 1.4K关注 0票数 3

是否有方法在CassandraConnector实例化时注册自定义编解码器?

我现在每次打电话给cassandraConnector.withSessionDo时都会注册我的编解码器

代码语言:javascript
复制
val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
...
...
.mapPartitions(partition => {
  cassandraConnector.withSessionDo(session => {
    // register custom codecs once for each partition so it isn't loaded as often for each data point
    if (partition.nonEmpty) {
      session.getCluster.getConfiguration.getCodecRegistry
        .register(new TimestampLongCodec)
        .register(new SummaryStatsBlobCodec)
        .register(new JavaHistogramBlobCodec)
    }

这样做有点像反模式。它还会阻塞我们的日志,因为我们有一个火花流服务,每30秒运行一次,它用以下内容填充日志:

代码语言:javascript
复制
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec SummaryStatsBlobCodec [blob <-> SummaryStats] because it collides with previously registered codec SummaryStatsBlobCodec [blob <-> SummaryStats]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec JavaHistogramBlobCodec [blob <-> Histogram] because it collides with previously registered codec JavaHistogramBlobCodec [blob <-> Histogram]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec TimestampLongCodec [timestamp <-> java.lang.Long] because it collides with previously registered codec TimestampLongCodec [timestamp <-> java.lang.Long]

编辑:

我试过像这样立即注册:

代码语言:javascript
复制
val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
cassandraConnector.withClusterDo(cluster => {
  cluster.getConfiguration.getCodecRegistry
    .register(new TimestampLongCodec)
    .register(new SummaryStatsBlobCodec)
    .register(new JavaHistogramBlobCodec)
})

这个^在本地工作,但是当部署到我们的mesos集群时,它找不到编解码器。我假设这是因为它只在驱动程序中本地注册那些,并且从不将它们添加到executors版本中。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-11-16 15:30:37

更好的方法是覆盖cassandra连接工厂,如下所示

代码语言:javascript
复制
import com.datastax.driver.core.Cluster
import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnectorConf, DefaultConnectionFactory}
object MyConnectionFactory extends CassandraConnectionFactory {
  override def createCluster(conf: CassandraConnectorConf): Cluster = {
    val cluster = DefaultConnectionFactory.createCluster(conf)
    cluster.getConfiguration.getCodecRegistry
      .register(new TimestampLongCodec)
      .register(new SummaryStatsBlobCodec)
      .register(new JavaHistogramBlobCodec)
    cluster
  }
}

并将spark.cassandra.connection.factory参数设置为指向类

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40363611

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档