首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将directJoin与spark (scala)结合使用?

如何将directJoin与spark (scala)结合使用?
EN

Stack Overflow用户
提问于 2022-03-31 13:34:39
回答 1查看 111关注 0票数 1

我试图在分区键中使用directJoin。但是当我运行引擎时,它不使用directJoin。如果我做错了什么,我想了解。下面是我使用的代码:

配置设置:

代码语言:javascript
复制
val sparkConf: SparkConf = new SparkConf()
    .set(
      s"spark.sql.extensions",
      "com.datastax.spark.connector.CassandraSparkExtensions"
    )
    .set(
      s"spark.sql.catalog.CassandraCommercial",
      "com.datastax.spark.connector.datasource.CassandraCatalog"
    )
    .set(
      s"spark.sql.catalog.CassandraCommercial.spark.cassandra.connection.host",
      Settings.cassandraServerAddress
    )
    .set(
      s"spark.sql.catalog.CassandraCommercial.spark.cassandra.auth.username",
      Settings.cassandraUser
    )
    .set(
      s"spark.sql.catalog.CassandraCommercial.spark.cassandra.auth.password",
      Settings.cassandraPass
    )
    .set(
      s"spark.sql.catalog.CassandraCommercial.spark.cassandra.connection.port",
      Settings.cassandraPort
    )

我使用目录是因为我打算在不同的集群上使用数据库。

SparkSession:

代码语言:javascript
复制
  val sparkSession: SparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .appName(Settings.appName)
    .getOrCreate()

我尝试了以下两种方法:

这是:

代码语言:javascript
复制
val parameterVOne= spark.read
    .table("CassandraCommercial.ky.parameters")
    .select(
      "id",
      "year",
      "code"
    )

这是:

代码语言:javascript
复制
val parameterVTwo= spark.read
    .cassandraFormat("parameters", "CassandraCommercial.ky")
    .load
    .select(
      "id",
      "year",
      "code"
    )

第一个,虽然spark没有使用directjoin,但如果我使用show(),它通常会显示数据:

代码语言:javascript
复制
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#19, year#22, code#0]
   +- SortMergeJoin [id#19, year#22, code#0], [id#0, year#3, code#2, value#6], Inner, ((id#19 = id#0) AND (year#22 = year#3) AND (code#0 = code#2))

第二次返回如下:

代码语言:javascript
复制
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors() for more): Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=307be82d): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)], Node(endPoint=localhost/0:0:0:0:0:0:0:1:9042, hostId=null, hashCode=3ebc1052): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)]

显然,第二种方式没有采用目录中定义的设置,并且与第一种方式不同,它直接访问本地主机。

拥有密钥的dataframe只有7行,而cassandra dataframe大约有200万行。

这是我的bild.sbt:

代码语言:javascript
复制
ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.15"

lazy val root = (project in file("."))
  .settings(
    name                                        := "test-job",
    idePackagePrefix                            := Some("com.teste"),
    libraryDependencies += "org.apache.spark"   %% "spark-sql"                               % "3.2.1",
    libraryDependencies += "org.apache.spark"   %% "spark-core"                              % "3.2.1",
    libraryDependencies += "org.postgresql"      % "postgresql"                              % "42.3.3",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector"               % "3.1.0",
    libraryDependencies += "joda-time"           % "joda-time"                               % "2.10.14",
    libraryDependencies += "com.crealytics"     %% "spark-excel"                             % "3.2.1_0.16.5-pre2",
    libraryDependencies += "com.datastax.spark"  % "spark-cassandra-connector-assembly_2.12" % "3.1.0"
  )
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-03-31 14:35:49

我在Spark的某些版本中看到了这种行为--不幸的是,Spark内部的更改经常破坏这个功能,因为它依赖于内部细节。因此,请提供更多的信息,什么版本的火花和火花连接器是使用。

关于第二个错误,我怀疑直接连接可能不使用Spark属性,您可以尝试使用spark.cassandra.connection.hostspark.cassandra.auth.password和其他配置参数吗?

我有一个关于使用DirectJoin的长博客文章,但是它是在Spark2.4.x上测试的(也许在3.0上,不记得了

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71693441

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档