I'm new here. I'm writing a Python program that reads data from a Kafka topic and prints it to stdout. I followed the link Flink Python Datastream API Kafka Producer Sink Serializaion, but I keep seeing NoSuchMethodError due to a version mismatch. I added the flink-sql-connector jar available at https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.0/flink-sql-connector-kafka_2.11-1.13.0.jar. Can someone help me with a proper example of how to do this? Here is my code:
import json
import os
from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types

def my_map(obj):
    json_obj = json.loads(json.loads(obj))
    return json.dumps(json_obj["name"])

def kafkaread():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///automation/flink/flink-sql-connector-kafka_2.11-1.10.1.jar")
    deserialization_schema = SimpleStringSchema()
    kafkaSource = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '10.234.175.22:9092', 'group.id': 'test'}
    )
    ds = env.add_source(kafkaSource).print()
    env.execute('kafkaread')

if __name__ == '__main__':
    kafkaread()
But Python doesn't recognize the jar file and throws the following error:
Traceback (most recent call last):
  File "flinkKafka.py", line 31, in <module>
    kafkaread()
  File "flinkKafka.py", line 20, in kafkaread
    kafkaSource = FlinkKafkaConsumer(
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 186, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 336, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
What is the correct place to add the jar file?
Posted on 2022-02-15 03:12:58
I see that you downloaded flink-sql-connector-kafka_2.11-1.13.0.jar, but the code loads flink-sql-connector-kafka_2.11-1.10.1.jar.
Maybe you should double-check that.
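The mismatch between the downloaded jar and the one referenced in add_jars can be surfaced programmatically by comparing the version suffixes of the two filenames. A minimal sketch (pure stdlib; the filenames below are simply the two from the question):

```python
import re

def jar_version(filename):
    # Pull the "<scala-version>-<flink-version>" suffix out of a Flink
    # connector jar name, e.g. "flink-sql-connector-kafka_2.11-1.13.0.jar"
    # yields "2.11-1.13.0".
    m = re.search(r"_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$", filename)
    if not m:
        raise ValueError(f"unrecognized jar name: {filename}")
    return f"{m.group(1)}-{m.group(2)}"

downloaded = "flink-sql-connector-kafka_2.11-1.13.0.jar"   # fetched from Maven
referenced = "flink-sql-connector-kafka_2.11-1.10.1.jar"   # what add_jars points at
if jar_version(downloaded) != jar_version(referenced):
    print(f"version mismatch: {jar_version(downloaded)} vs {jar_version(referenced)}")
# → version mismatch: 2.11-1.13.0 vs 2.11-1.10.1
```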
Posted on 2022-04-18 22:41:21
Just check the path of the flink-sql-connector jar.
Posted on 2022-07-05 10:55:26
You should add the flink-sql-connector-kafka jar that matches your PyFlink and Scala versions. If the versions are correct, check the path passed to add_jars and make sure the jar file actually exists there.
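To rule out a bad path before handing it to add_jars, which expects a file:// URL rather than a bare filesystem path, a small helper can fail fast. This is only a sketch; the jar path in the commented usage line is the questioner's layout, not a fixed location:

```python
from pathlib import Path

def jar_url(path):
    # add_jars() wants a "file:///..." URL; a wrong or missing path only
    # fails later with "Could not found the Java class ...", so check here.
    p = Path(path)
    if not p.is_file():
        raise FileNotFoundError(f"jar not found: {path}")
    return p.resolve().as_uri()

# Usage (path assumed from the question):
# env.add_jars(jar_url("/automation/flink/flink-sql-connector-kafka_2.11-1.13.0.jar"))
```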
https://stackoverflow.com/questions/70974180