你好,我正在尝试使用pyspark + kafka,为了做到这一点,我执行这个命令,以便设置kafka-集群。
zookeeper-server-start.sh $KAFKA_HOME/../config/zookeeper.properties
kafka-server-start.sh $KAFKA_HOME/../config/*-0.properties & kafka-server-start.sh $KAFKA_HOME/../config/*-1.propertiesspark-3.2.0-bin-hadoop2-7
python代码是:
spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
spark = SparkSession \
.builder \
.appName("TP3") \
.getOrCreate()
!spark-submit --class TP3 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 TweetCount.ipynb这将返回以下错误:
错误:加载类TP3失败。
当我执行spark.readStream时
consumer = KafkaConsumer('topic')
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'localhost:9092') \
.option("subscribe", 'topic') \
.load()我得到了一个错误:
未能找到数据来源:卡夫卡。请按照“结构化流+ Kafka集成指南”的部署部分部署应用程序。
我如何才能执行读流,以便从卡夫卡读物与火种?
谢谢
发布于 2022-01-16 00:15:32
最后,我在笔记本的开头用下面的代码解决了问题。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'https://stackoverflow.com/questions/70725346
复制相似问题