首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何正确地向Google提交kafka流吡源作业

如何正确地向Google提交kafka流吡源作业
EN

Stack Overflow用户
提问于 2018-03-11 19:03:30
回答 1查看 1.1K关注 0票数 0

我正试图通过Dataproc提交一个pyspark作业,并不断得到一个错误,看起来它没有加载kafka流包。

下面是UI在我的工作中提供的REST命令:POST /v1/projects/projectname/regions/global/jobs:submit/ { "projectId": "projectname", "job": { "placement": { "clusterName": "cluster-main" }, "reference": { "jobId": "job-33ab811a" }, "pysparkJob": { "mainPythonFileUri": "gs://projectname/streaming.py", "args": [ "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0" ], "jarFileUris": [ "gs://projectname/spark-streaming-kafka-0-10_2.11-2.2.0.jar" ] } } }

我试着把kafka包作为args和jar文件来传递。

下面是我的代码(streaming.py):

代码语言:javascript
复制
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json


sc = SparkContext()

spark = SparkSession.builder.master("local").appName("Spark-Kafka-Integration").getOrCreate()

# < ip > is masked
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<ip>:9092") \
    .option("subscribe", "rsvps") \
    .option("startingOffsets", "earliest") \
    .load()
df.printSchema()

错误::java.lang.ClassNotFoundException:未能找到数据源: kafka。请在http://spark.apache.org/third-party-projects.html找到包裹

全跟踪:https://pastebin.com/Uz3iGy2N

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-03-11 20:09:09

您可能会遇到这样的问题:"--packages“是spark-submit中的语法糖,当高级工具(Dataproc)以编程方式调用Spark时,这种语法交互很糟糕,我在这里的响应中描述了另一种语法:use an external library in pyspark job in a Spark cluster from google-dataproc

长话短说,您可以使用properties在Dataproc请求中指定等效的spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,而不是在作业args中传递--properties

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49223886

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档