I am trying to send data to a Kafka topic from Python using WriteToKafka in Apache Beam, with Dataflow as the runner.
I run the following script:
with beam.Pipeline(options=beam_options) as p:
    (p
     | beam.Impulse()
     | beam.Map(lambda input: (1, input))
     | WriteToKafka(
         producer_config={
             'bootstrap.servers': 'ip:9092,',
         },
         topic='testclient',
         key_serializer='org.apache.kafka.common.serialization.LongSerializer',
         value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
     )
    )
and I get this error:
Traceback (most recent call last):
File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
run_pipeline(beam_options)
File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
(p
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
self.result = self.run()
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
return Pipeline.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
transform = ptransform.PTransform.from_runner_api(proto, context)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
return constructor(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
DoFnInfo.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
raise ValueError('Unexpected DoFn type: %s' % spec.urn)
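ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
Note that I have installed the latest version of Apache Beam with GCP support (via pip install 'apache-beam[gcp]').
If I am not mistaken, the problem lies in the serialization methods. I have already tried the various combinations found on this page.
What am I missing, and what should I do differently?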
Posted on 2021-05-11 06:06:39
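The solution is to declare explicit types for both the key and the value, so that each element reaching WriteToKafka is a (bytes, bytes) tuple: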
     | 'Convert dict to byte string' >> beam.Map(lambda x: (b'', json.dumps(x).encode('utf-8')))
         .with_output_types(typing.Tuple[bytes, bytes])
     | "Write to Kafka topic" >> WriteToKafka(
         producer_config={'bootstrap.servers': consumer_servers},
         topic='testclient')
    )
https://stackoverflow.com/questions/66237116
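The conversion step in the answer can be tried on its own, without Beam or a Kafka broker. Below is a minimal sketch that only assumes the elements are JSON-serializable; the helper name to_kafka_record is hypothetical and is not part of the Beam API.

```python
import json
from typing import Any, Tuple


def to_kafka_record(element: Any) -> Tuple[bytes, bytes]:
    """Turn a JSON-serializable element into a (key, value) pair of bytes.

    Hypothetical helper mirroring the lambda in the answer above:
    the key is left empty and the value is UTF-8 encoded JSON.
    """
    key = b""  # empty key, as in the answer
    value = json.dumps(element).encode("utf-8")
    return key, value


if __name__ == "__main__":
    # Quick local check of the record shape, no Beam or Kafka needed.
    print(to_kafka_record({"id": 1, "name": "test"}))
```

In the pipeline, a named function like this could stand in for the lambda, for example beam.Map(to_kafka_record).with_output_types(typing.Tuple[bytes, bytes]), which keeps the type hint from the answer while making the conversion easy to unit test.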