Unable to create a stream in Spark Streaming using a Kinesis stream

Stack Overflow user
Asked 2019-07-20 02:53:43
Answers: 2 · Views: 596 · Followers: 0 · Votes: 2

I am new to Kinesis. I am trying to process Kinesis stream data with Spark Streaming (PySpark) and I am running into the error below.

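Here is my code. I am pushing Twitter data into my Kinesis stream and trying to process it with Spark Streaming. I tried passing all the dependencies with --jars, but I still hit the same issue. I have tried Spark 2.4.3 and 2.3.3, each with the matching spark-streaming-kinesis-asl-assembly jar.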

from pyspark import SparkConf, SparkContext
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream


spark_session = SparkSession.builder.getOrCreate()
ssc = StreamingContext(spark_session.sparkContext, 10)
sc = spark_session.sparkContext
Kinesis_app_name = "test"
Kinesis_stream_name = "python-stream"
endpoint_url = "https://kinesis.us-east-1.amazonaws.com"
region_name = "us-east-1"

data = KinesisUtils.createStream(
    ssc, Kinesis_app_name, Kinesis_stream_name, endpoint_url,
    region_name, InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_AND_DISK_2)


data.pprint()


ssc.start()  # Start the computation
ssc.awaitTermination()

I want to process the stream with Spark Streaming, but I get the following error:

File "C:\spark-2.3.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\kinesis.py", line 92, in createStream
          File "C:\spark-2.3.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
          File "C:\spark-2.3.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
        py4j.protocol.Py4JJavaError: An error occurred while calling o27.createStream.
        : java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/model/Record
                at java.lang.Class.getDeclaredMethods0(Native Method)
                at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
                at java.lang.Class.getDeclaredMethods(Class.java:1975)
                at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:232)
                at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
                at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
                at org.apache.spark.streaming.kinesis.KinesisUtils$.createStream(KinesisUtils.scala:127)
                at org.apache.spark.streaming.kinesis.KinesisUtils$.createStream(KinesisUtils.scala:554)
                at org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper.createStream(KinesisUtils.scala:616)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at py4j.Gateway.invoke(Gateway.java:282)
                at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:238)
                at java.lang.Thread.run(Thread.java:748)
        Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.model.Record
                at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                ... 20 more

2 Answers

Stack Overflow user

Posted 2021-09-30 17:10:24

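I had the same problem. It turned out that I was including only the spark-streaming-kinesis-asl jar, and as far as I know that jar does not contain the Kinesis SDK. I removed that standalone jar and instead let the package manager resolve the dependency by passing --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:2.4.4 to spark-submit, which fixed it. If you use the package manager but do not remove the problematic jar, the program will still not work. I hope this helps anyone who runs into this error in the future.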
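
One way to check whether a particular jar actually carries the class named in the stack trace is to look inside it, since a jar is just a zip archive. The following is a minimal sketch using only the Python standard library; the helper names `class_to_entry` and `jar_contains_class` are made up for illustration and are not part of Spark or of this answer.

```python
import zipfile


def class_to_entry(class_name):
    """Convert a dotted Java class name to its entry path inside a jar."""
    return class_name.replace(".", "/") + ".class"


def jar_contains_class(jar_path, class_name):
    """Return True if the jar (a zip archive) holds the given class file."""
    with zipfile.ZipFile(jar_path) as jar:
        return class_to_entry(class_name) in jar.namelist()
```

For example, calling `jar_contains_class(path_to_your_jar, "com.amazonaws.services.kinesis.model.Record")` on the jar you pass with --jars (the path is yours to fill in) would show whether that jar can satisfy the class the error complains about. If it returns False, the class has to come from some other jar on the classpath.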

Votes: 2

Stack Overflow user

Posted 2020-02-12 22:54:08

Please refer to the solution below:

from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

if __name__ == "__main__":

    kinesisConf = {...} # I put all my credentials in here

    # PySpark takes the batch and checkpoint intervals in seconds.
    batchInterval = 2
    kinesisCheckpointInterval = batchInterval

    sc = SparkContext(appName="kinesis-stream")
    ssc = StreamingContext(sc, batchInterval)

    data = KinesisUtils.createStream(
        ssc=ssc,
        kinesisAppName=kinesisConf['appName'],
        streamName=kinesisConf['streamName'],
        endpointUrl=kinesisConf['endpointUrl'],
        regionName=kinesisConf['regionName'],
        initialPositionInStream=InitialPositionInStream.LATEST,
        checkpointInterval=kinesisCheckpointInterval,
        storageLevel=StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId=kinesisConf['awsAccessKeyId'],
        awsSecretKey=kinesisConf['awsSecretKey']
    )

    data.pprint()

    ssc.start()
    ssc.awaitTermination()

Run it as follows:

spark-submit --master local[8] --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.0.0-preview ./streaming.py

Here 2.12 is the Scala version and 3.0.0 is the Spark version.

Go here and make sure you select the correct parameters for the package.
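
The way the package coordinate is assembled from the two versions can be sketched in a few lines of Python. This is only an illustration: the helper names `kinesis_asl_coordinate` and `spark_submit_args` are hypothetical, and the versions you pass in have to match the Scala and Spark versions of your own installation.

```python
def kinesis_asl_coordinate(scala_version, spark_version):
    """Build the Maven coordinate for the spark-streaming-kinesis-asl package."""
    return "org.apache.spark:spark-streaming-kinesis-asl_{}:{}".format(
        scala_version, spark_version
    )


def spark_submit_args(script, scala_version, spark_version, master="local[8]"):
    """Assemble a spark-submit command line that pulls the package via --packages."""
    return [
        "spark-submit",
        "--master", master,
        "--packages", kinesis_asl_coordinate(scala_version, spark_version),
        script,
    ]


# Reproduces the command shown above.
print(" ".join(spark_submit_args("./streaming.py", "2.12", "3.0.0-preview")))
```

Keeping the Scala suffix and the Spark version as separate arguments makes it harder to mix up the two when switching between installations.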

Votes: 1
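The original content of this page is provided by Stack Overflow. Translation support is provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link: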

https://stackoverflow.com/questions/57118214
