
Writing to Kafka with Apache Beam (GCP Dataflow)

Stack Overflow user
Asked 2021-02-17 07:01:13
1 answer · 840 views · 0 followers · Score: 1

I am trying to send data from Python to a Kafka topic with Apache Beam's WriteToKafka transform, using Dataflow as the runner.

I run the following script:

Code language: python
    with beam.Pipeline(options=beam_options) as p:
        (p
        | beam.Impulse()
        | beam.Map(lambda input: (1, input))
        | WriteToKafka(
                    producer_config={
                        'bootstrap.servers': 'ip:9092,',
                    },
                    topic='testclient',
                    key_serializer='org.apache.kafka.common.serialization.LongSerializer',
                    value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
                )
         )

I get the following error:
Traceback (most recent call last):
  File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
    run_pipeline(beam_options)
  File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
    (p
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1

Note that I have installed the latest Apache Beam version with GCP support (via pip install 'apache-beam[gcp]'):

  • apache-beam=2.27.0
  • core=1.5.0

If I am not mistaken, the problem lies in the serialization methods. I have already tried various combinations of the serializers found on that page.

What am I missing, and what should I do differently?


1 Answer

Stack Overflow user

Accepted answer

Answered 2021-05-11 06:06:39

The solution is to use explicit output type annotations for the key and value:

Code language: python
    # requires: import json, typing
    | 'Convert dict to byte string' >> beam.Map(lambda x: (b'', json.dumps(x).encode('utf-8')))
      .with_output_types(typing.Tuple[bytes, bytes])
    | "Write to Kafka topic" >> WriteToKafka(
        producer_config={'bootstrap.servers': consumer_servers},
        topic='testclient')
      )
Score: 1
Original content provided by Stack Overflow; translation supported by Tencent Cloud Xiaowei's IT-domain engine.
Original link: https://stackoverflow.com/questions/66237116