首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink Python Datastream API Kafka使用者

Flink Python Datastream API Kafka使用者
EN

Stack Overflow用户
提问于 2022-02-03 15:42:19
回答 3查看 1.2K关注 0票数 0

我是新来的。Im编写一个python程序来读取kafka主题的数据,并将数据打印到stdout。我跟踪了链接Flink Python Datastream API Kafka Producer Sink Serializaion。但由于版本错配,我一直看到NoSuchMethodError。我添加了在https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.0/flink-sql-connector-kafka_2.11-1.13.0.jar上可用的flink-sql连接器。有人能帮我举个合适的例子来做这个吗?以下是我的代码

代码语言:javascript
复制
import json
import os

from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types


def my_map(obj):
    json_obj = json.loads(json.loads(obj))
    return json.dumps(json_obj["name"])


def kafkaread():
    env = StreamExecutionEnvironment.get_execution_environment()

    env.add_jars("file:///automation/flink/flink-sql-connector-kafka_2.11-1.10.1.jar")

    deserialization_schema = SimpleStringSchema()

    kafkaSource = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '10.234.175.22:9092', 'group.id': 'test'}
    )

    ds = env.add_source(kafkaSource).print()
    env.execute('kafkaread')


if __name__ == '__main__':
    kafkaread()

但是python不识别jar文件,并抛出以下错误。

代码语言:javascript
复制
Traceback (most recent call last):
  File "flinkKafka.py", line 31, in <module>
    kafkaread()
  File "flinkKafka.py", line 20, in kafkaread
    kafkaSource = FlinkKafkaConsumer(
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 186, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 336, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'

添加jar文件的正确位置是什么?

EN

回答 3

Stack Overflow用户

发布于 2022-02-15 03:12:58

我看到您下载了flink-sql-连接器-kafka_2.11-1.13.0.jar,但是代码loades sql-连接器-kafka_2.11-1.10.1.jar。

也许你可以拿张支票

票数 1
EN

Stack Overflow用户

发布于 2022-04-18 22:41:21

只需检查flink-sql-连接器jar的路径即可。

票数 0
EN

Stack Overflow用户

发布于 2022-07-05 10:55:26

您应该添加flink-sql连接器-kafka的jar文件,这取决于您的pyflink和scala版本。如果版本为真,请检查add_jars函数中的路径(如果jar包在这里)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70974180

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档