下面是我使用kafka和pyspark编写的第一个程序。代码似乎运行无异常,但我的查询的输出是空的。
我正在发起火花和卡夫卡。后来,在Kafka initiation中,我订阅了topic = "quickstart-events“,并从终端生成了该主题的消息。但是当我运行这段代码时,它给了我空白的数据帧。
我该如何解析?
代码:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, DataFrame
from pyspark.sql.types import StructType, ArrayType, StructField, IntegerType, StringType, DoubleType
spark = SparkSession.builder \
.appName("Spark-Kafka-Integration") \
.master("local[2]") \
.getOrCreate()
dsraw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "quickstart-events") \
.load()
ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))
rawQuery = dsraw \
.writeStream \
.queryName("query1")\
.format("memory")\
.start()
raw = spark.sql("select * from query1")
raw.show() # empty output
rawQuery = ds \
.writeStream \
.queryName("query2")\
.format("memory")\
.start()
raw = spark.sql("select * from query2")
raw.show() # empty output
print("complete")输出:
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+
+---+-----+
|key|value|
+---+-----+
+---+-----+发布于 2021-09-13 14:38:10
如果你正在学习和尝试kafka spark流媒体,那么它是很好的。
只需使用:
while (True):
time.sleep(5)
print("queryresult")
raw.show() # it will start printing the result而不是
raw.show() # it will run only once that's why not printig the result.不要将用于生产代码。
最好这样写:
spark = SparkSession.builder \
.appName("Spark-Kafka-Integration") \
.master("local[2]") \
.getOrCreate()
dsraw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "quickstart-events") \
.load()
ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
rawQuery = \
ds \
.writeStream \
.format("console") \
.outputMode("append") \
.start()
rawQuery.awaitTermination()它会在控制台上自动打印结果。
https://stackoverflow.com/questions/69153830
复制相似问题