在Spark-2.0中,创建火花会话的最佳方法是什么。因为在Spark-2.0和Cassandra中,API已经被重新加工,本质上是反对SqlContext (也包括CassandraSqlContext)。因此,对于执行SQL- -要么我创建一个Cassandra会话(com.datastax.driver.core.Session) and use execute( " ")。或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)方法。
我也不知道SQL的局限性--有人能解释一下吗?
另外,如果我必须创建SparkSession --我怎么做--找不到任何合适的例子。随着API的重新工作,旧的示例无法工作。我正在研究这个代码示例-- DataFrames- -不清楚这里使用的是什么sql (这是正确的方法吗?)(出于某些原因,不推荐的API甚至没有编译-需要检查我的eclipse设置)
谢谢
发布于 2016-12-08 05:02:11
您需要Cassandra会话来从Cassandra创建/删除键空间和表。在Spark应用程序中,为了创建Cassandra会话,需要将SparkConf传递给CassandraConnector。在Spark2.0中,您可以像下面这样做。
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");如果您有现有的Dataframe,那么您也可以使用DataFrameFunctions.createCassandraTable(Df)在Cassandra中创建表。请参阅api详细信息这里。
您可以使用火花-卡桑德拉连接器提供的api从Cassandra读取数据,如下所示。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show(); 您可以使用SparkSession.sql()方法对火花卡桑德拉连接器返回的Dataframe上创建的临时表运行查询,如下所示。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();https://stackoverflow.com/questions/41023957
复制相似问题