首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka和pyspark程序:无法确定dataframe为空的原因

Kafka和pyspark程序:无法确定dataframe为空的原因
EN

Stack Overflow用户
提问于 2021-09-12 17:35:36
回答 1查看 37关注 0票数 0

下面是我使用kafka和pyspark编写的第一个程序。代码似乎运行无异常,但我的查询的输出是空的。

我正在发起火花和卡夫卡。后来,在Kafka initiation中,我订阅了topic = "quickstart-events“,并从终端生成了该主题的消息。但是当我运行这段代码时,它给了我空白的数据帧。

我该如何解析?

代码:

代码语言:javascript
复制
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, DataFrame
from pyspark.sql.types import StructType, ArrayType, StructField, IntegerType, StringType, DoubleType

spark = SparkSession.builder \
.appName("Spark-Kafka-Integration") \
.master("local[2]") \
.getOrCreate()

dsraw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "quickstart-events") \
.load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

rawQuery = dsraw \
        .writeStream \
        .queryName("query1")\
        .format("memory")\
        .start()

raw = spark.sql("select * from query1")
raw.show() # empty output

rawQuery = ds \
        .writeStream \
        .queryName("query2")\
        .format("memory")\
        .start()

raw = spark.sql("select * from query2")
raw.show()  # empty output
print("complete")

输出:

代码语言:javascript
复制
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

+---+-----+
|key|value|
+---+-----+
+---+-----+
EN

回答 1

Stack Overflow用户

发布于 2021-09-13 14:38:10

如果你正在学习和尝试kafka spark流媒体,那么它是很好的。

只需使用:

代码语言:javascript
复制
    while (True):
    time.sleep(5)
    print("queryresult")
    raw.show()  # it will start printing the result

而不是

代码语言:javascript
复制
            raw.show() # it will run only once that's why not printig the result.

不要将用于生产代码。

最好这样写:

代码语言:javascript
复制
spark = SparkSession.builder \
    .appName("Spark-Kafka-Integration") \
    .master("local[2]") \
    .getOrCreate()


dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart-events") \
    .load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

rawQuery = \
    ds \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

rawQuery.awaitTermination()

它会在控制台上自动打印结果。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69153830

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档