我是spark和kafka的新手。使用从免费kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是一直将状态显示为“正在初始化流”。
代码:
from pyspark.sql.functions import col
kafkaServer="<server>"
editsDF=(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkaServer)
.option("sasl.username","<username>")
.option("sasl.password","<password>")
.option("group.id", "%s-consumer" % "<username>")
.option("session.timeout.ms", 6000)
.option("default.topic.config", {"auto.offset.reset": "smallest"})
.option('security.protocol', 'SASL_SSL')
.option('sasl.mechanisms', 'SCRAM-SHA-256')
.option("subscribe","<topic>")
.option("startingOffsets","latest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(col("value").cast("STRING"))
)
query = editsDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()The status in databricks while running the code:
如果我遗漏了什么,请告诉我。提前谢谢。
注意:我已经确保kafka服务器能够生成消息,并且能够在python程序中使用它。但不能在火花源里工作。此外,数据的大小非常小,因此不会因为性能而出现问题。
编辑:这个建议的函数display()仍然不能为这个有问题的Kafka服务器打印任何数据,但是当我尝试完全使用另一个Kafka服务器时,它工作得很好。我认为这是因为这个kafka服务器(有问题)正在使用SASL-SCRAM身份验证,所以可能需要进行一些不同的配置。如果您有从Pyspark连接SASL Kafka的信息,请提供任何详细信息/链接/样本。谢谢!
发布于 2021-05-23 15:33:22
当您使用console接收器时,它会将数据打印到标准输出(请参阅Spark docs),因此您需要检查集群UI中的驱动程序日志以获取生成的数据。
要查看Databricks notebook本身中的数据,您需要使用支持显示来自结构化流的数据的display函数(请参阅Databricks docs)。因此,不是
query = editsDF \
.writeStream \
.outputMode("append") \
.format("console") \
.start()你只需要写下:
display(editsDF)您还可以将其他选项传递给此函数,例如,checkpointLocation、trigger等-检查我上面链接的文档。
https://stackoverflow.com/questions/67651610
复制相似问题