我正在尝试使用Apache和Dataflow从kafka主题中读取数据,将数据打印到控制台,最后将它们写到公共子主题中。但是它似乎被困在了ReadFromKafka函数中。卡夫卡主题中有很多数据,但是当它运行时,在这个过程中什么也不会发生。
import apache_beam as beam
import argparse
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=sample-project',
'--region=xxx',
'--staging_location=gs://xxx',
'--temp_location=gs://xxx',
'--job_name=beam-streaming',
'--worker_machine_type=n1-standard-16',
'--num_workers=1',
'--streaming'
])
class PrintValue(beam.DoFn):
def process(self, element):
print(element)
return [element]
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(
consumer_config={'bootstrap.servers': 'ip:port' },
topics=['local-events'])
| 'print' >> beam.ParDo(PrintValue())
| 'write to pubsub' >> beam.io.WriteToPubSub('projects/sample/topics/test')
)
if __name__ == '__main__':
run() 我知道https://issues.apache.org/jira/browse/BEAM-11998存在一个问题,但据我所知,这个问题只属于便携式运行程序。有人知道ReadFromKafka是否在处理数据流中的无界数据吗?
发布于 2022-11-03 18:47:59
我遇到了类似的问题,转而使用beam.Map转换(确保在run函数中定义了printValue函数,或者您有一个适当的依赖管理方法):
| Map(lambda value: printValue(value))请注意,您从ReadFromKafka获得的元素类型是一个名为BeamSchema_xxxxxxxxx的特殊类,具有以下属性(假设您配置了读取器with_metadata=True):“主题”、“值”、“计数”、“头”、“索引”、“键”、“偏移”、“分区”、“时间戳”、“时间戳类型”、“时间戳”。它一点也不print好。因此,您需要首先解码您的值,例如:
def decode_kafka_message(record) -> str:
"""
Record attributes passed from ReadFromKafka transform: 'topic', 'value'
'count', 'headers', 'index', 'key', 'offset', 'partition',
'timestamp', 'timestampTypeId', 'timestampTypeName'.
:return: Message value as string
"""
if hasattr(record, 'value'):
value = record.value
elif isinstance(record, tuple):
value = record[1]
else:
raise RuntimeError('unknown record type: %s' % type(record))
return value.decode("UTF-8") if isinstance(value, bytes) else value该连接器可能需要一些工作和更好的文档。
https://stackoverflow.com/questions/70166028
复制相似问题