首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ReadFromKafka卡在数据流束流加工中的应用

ReadFromKafka卡在数据流束流加工中的应用
EN

Stack Overflow用户
提问于 2021-11-30 08:00:15
回答 1查看 326关注 0票数 4

我正在尝试使用Apache和Dataflow从kafka主题中读取数据,将数据打印到控制台,最后将它们写到公共子主题中。但是它似乎被困在了ReadFromKafka函数中。卡夫卡主题中有很多数据,但是当它运行时,在这个过程中什么也不会发生。

代码语言:javascript
复制
import apache_beam as beam
import argparse

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None, save_main_session=True):

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project=sample-project',
    '--region=xxx',
    '--staging_location=gs://xxx',
    '--temp_location=gs://xxx',
    '--job_name=beam-streaming',
    '--worker_machine_type=n1-standard-16',
    '--num_workers=1',
    '--streaming'
])      

class PrintValue(beam.DoFn):
        def process(self, element):
            print(element)
            return [element]

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka( 
            consumer_config={'bootstrap.servers': 'ip:port' },
            topics=['local-events'])
        | 'print' >> beam.ParDo(PrintValue())
        | 'write to pubsub' >> beam.io.WriteToPubSub('projects/sample/topics/test')
        )



if __name__ == '__main__':
 run()            

我知道https://issues.apache.org/jira/browse/BEAM-11998存在一个问题,但据我所知,这个问题只属于便携式运行程序。有人知道ReadFromKafka是否在处理数据流中的无界数据吗?

  • Python 3.8.10
  • apache-beam=2.29.0
  • kafka-python=2.0.2
EN

回答 1

Stack Overflow用户

发布于 2022-11-03 18:47:59

我遇到了类似的问题,转而使用beam.Map转换(确保在run函数中定义了printValue函数,或者您有一个适当的依赖管理方法):

代码语言:javascript
复制
| Map(lambda value: printValue(value))

请注意,您从ReadFromKafka获得的元素类型是一个名为BeamSchema_xxxxxxxxx的特殊类,具有以下属性(假设您配置了读取器with_metadata=True):“主题”、“值”、“计数”、“头”、“索引”、“键”、“偏移”、“分区”、“时间戳”、“时间戳类型”、“时间戳”。它一点也不print好。因此,您需要首先解码您的值,例如:

代码语言:javascript
复制
def decode_kafka_message(record) -> str:
"""
Record attributes passed from ReadFromKafka transform:  'topic', 'value'
    'count', 'headers', 'index', 'key', 'offset', 'partition',
    'timestamp', 'timestampTypeId', 'timestampTypeName'.


:return: Message value as string
"""
if hasattr(record, 'value'):
    value = record.value
elif isinstance(record, tuple):
    value = record[1]
else:
    raise RuntimeError('unknown record type: %s' % type(record))

return value.decode("UTF-8") if isinstance(value, bytes) else value

该连接器可能需要一些工作和更好的文档。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70166028

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档