
TypeError: 'JavaPackage' object is not callable & Spark Streaming's Kafka libraries not found in class path

Stack Overflow user
Asked 2020-01-05 16:23:12
Answers: 1, Views: 513, Followers: 0, Votes: 0

I am using PySpark Streaming to read data from Kafka, but it fails with an error:

import os
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8:2.0.2 pyspark-shell'
sc = SparkContext(appName="test")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "test-id", {'test': 2})
kafkaStream.map(lambda x: x.split(" ")).pprint()

ssc.start()
ssc.awaitTermination()

________________________________________________________________________________________________

Spark Streaming's Kafka libraries not found in class path. Try one of the following.

1. Include the Kafka library and its dependencies with in the
 spark-submit command as

 $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.3 ...

2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
 Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.3.
 Then, include the jar in the spark-submit command as

 $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 29, in <module>
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "test-id", {'test': 2})
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 78, in createStream
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 217, in _get_helper
TypeError: 'JavaPackage' object is not callable

My Spark version is 2.4.3 and my Kafka version is 2.1.0. I replaced os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8:2.0.2 pyspark-shell' with os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8:2.4.3 pyspark-shell', but it still does not work. What should I do?

1 Answer

Stack Overflow user

Answered 2020-01-05 17:25:12

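I think you should reorder your code so that the environment variable is set before you import and initialize the Spark variables.

You also definitely need to use a package version that matches your Spark version.
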
import os
sparkVersion = '2.4.3'  # update this accordingly 
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8:{} pyspark-shell'.format(sparkVersion) 

# import Spark core 
from pyspark.sql import SparkSession 
from pyspark.streaming import StreamingContext
# import extra packages 
from pyspark.streaming.kafka import KafkaUtils


# begin application 
spark = SparkSession.builder.appName("test").getOrCreate() 
sc = spark.sparkContext

Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
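
One more thing that may be worth checking, as a guess rather than a confirmed diagnosis: Spark artifacts on Maven Central normally carry a Scala binary version suffix in the artifact id, and the coordinate used above has none. The sketch below builds the PYSPARK_SUBMIT_ARGS value with such a suffix. The helper name build_submit_args and the default of Scala 2.11 are my own assumptions, not something from the question, so match the suffix to the Scala version your Spark build actually uses.

```python
import os


def build_submit_args(spark_version, scala_version="2.11"):
    """Return a PYSPARK_SUBMIT_ARGS value that requests the Kafka 0.8 connector.

    Hypothetical helper for illustration; the Scala default is an assumption.
    """
    # Assumption: the artifact id ends with the Scala binary version,
    # e.g. spark-streaming-kafka-0-8_2.11 for a Scala 2.11 build of Spark.
    artifact = "spark-streaming-kafka-0-8_{}".format(scala_version)
    coordinate = "org.apache.spark:{}:{}".format(artifact, spark_version)
    return "--packages {} pyspark-shell".format(coordinate)


# Set the variable before any SparkContext or SparkSession is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args("2.4.3")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

After this runs, the pyspark imports and the SparkSession creation can follow as in the snippet above. If the package still fails to resolve, the spark-submit log printed at startup should show which coordinate it tried to download.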

Votes: 0

Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/59598135
