我已经开发了一个spark流媒体应用程序,其中我有json字符串的数据流。
sc = SparkContext("local[*]", "appname")
sc.setLogLevel("WARN")
sqlContext = sql.SQLContext(sc)
#batch width in time
stream = StreamingContext(sc, 5)
stream.checkpoint("checkpoint")
# mqtt setup
brokerUrl = "tcp://localhost:1883"
topic = "test"
# mqtt stream
DS = MQTTUtils.createStream(stream, brokerUrl, topic)
# transform DStream to be able to read json as a dict
jsonDS = kvs.map(lambda v: json.loads(v))
#create SQL-like rows from the json
sqlDS = jsonDS.map(lambda x: Row(a=x["a"], b=x["b"], c=x["c"], d=x["d"]))
#in each batch do something
sqlDS.foreachRDD(doSomething)
# run
stream.start()
stream.awaitTermination()
def doSomething(time,rdd):
data = rdd.toDF().toPandas()上面的代码工作正常:我以字符串的方式接收一些dataframe,我可以将每个批处理转换为一个数据帧,也可以将其转换为一个Pandas json。
到目前一切尚好。
如果我想向DataFrame添加一个不同的模式,问题就来了。toDF()方法在以下函数中假定有一个schema=None:sqlContext.createDataFrame(rdd, schema)。
如果我试图从doSomething()内部访问sqlContext,显然它没有定义。如果我试图用一个全局变量使它在那里可用,我得到的典型错误是它不能被序列化。
我还读到了sqlContext只能在Spark驱动程序中使用,而不能在workers中使用。
所以问题是:当toDF()需要sqlContext时,它首先是如何工作的?我怎样才能给它添加一个模式(希望不改变源代码)?
在驱动程序中创建DataFrame似乎不是一个选择,因为我不能将其序列化到工作程序中。
也许我没有正确地理解这一点。
提前谢谢你!
发布于 2018-01-22 00:40:54
回答我自己的问题。
定义以下内容:
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]然后从工人那里调用:
spark = getSparkSessionInstance(rdd.context.getConf())https://stackoverflow.com/questions/48367442
复制相似问题