首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Spark Streaming应用程序中,使用Spark workers端的模式创建Dataframe

在Spark Streaming应用程序中,使用Spark workers端的模式创建Dataframe
EN

Stack Overflow用户
提问于 2018-01-21 21:44:18
回答 1查看 427关注 0票数 0

我已经开发了一个spark流媒体应用程序,其中我有json字符串的数据流。

代码语言:javascript
复制
sc = SparkContext("local[*]", "appname")
sc.setLogLevel("WARN")
sqlContext = sql.SQLContext(sc)

#batch width in time
stream = StreamingContext(sc, 5)
stream.checkpoint("checkpoint")

# mqtt setup
brokerUrl = "tcp://localhost:1883"
topic = "test"

# mqtt stream
DS = MQTTUtils.createStream(stream, brokerUrl, topic)

# transform DStream to be able to read json as a dict
jsonDS = kvs.map(lambda v: json.loads(v))

#create SQL-like rows from the json 
sqlDS = jsonDS.map(lambda x: Row(a=x["a"], b=x["b"], c=x["c"], d=x["d"]))

#in each batch do something
sqlDS.foreachRDD(doSomething)

# run
stream.start()
stream.awaitTermination()

def doSomething(time,rdd):

   data = rdd.toDF().toPandas()

上面的代码工作正常:我以字符串的方式接收一些dataframe,我可以将每个批处理转换为一个数据帧,也可以将其转换为一个Pandas json。

到目前一切尚好。

如果我想向DataFrame添加一个不同的模式,问题就来了。toDF()方法在以下函数中假定有一个schema=NonesqlContext.createDataFrame(rdd, schema)

如果我试图从doSomething()内部访问sqlContext,显然它没有定义。如果我试图用一个全局变量使它在那里可用,我得到的典型错误是它不能被序列化。

我还读到了sqlContext只能在Spark驱动程序中使用,而不能在workers中使用。

所以问题是:当toDF()需要sqlContext时,它首先是如何工作的?我怎样才能给它添加一个模式(希望不改变源代码)?

在驱动程序中创建DataFrame似乎不是一个选择,因为我不能将其序列化到工作程序中。

也许我没有正确地理解这一点。

提前谢谢你!

EN

回答 1

Stack Overflow用户

发布于 2018-01-22 00:40:54

回答我自己的问题。

定义以下内容:

代码语言:javascript
复制
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
    globals()["sparkSessionSingletonInstance"] = SparkSession \
        .builder \
        .config(conf=sparkConf) \
        .getOrCreate()
return globals()["sparkSessionSingletonInstance"]

然后从工人那里调用:

代码语言:javascript
复制
spark = getSparkSessionInstance(rdd.context.getConf())

摘自DataFrame and SQL Operations

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48367442

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档