我正在尝试创建一个数据流(批处理),它每小时从读取一个文件,解析它并在BigQuery表中写入一个条目。文件是一个.json,在女巫中,每一行都有一个复杂的json。
我创建了一个简单的管道:
(p
| "Read file" >> beam.io.ReadFromText(cusom_options.file_name)
| "Parse line json" >> beam.Map(parse)
| "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=cusom_options.table))解析函数如下所示:
def parse(input_elem):
log = json.loads(input_elem)
result = {}
... # Get some fields from input and add to "result"
return result管道工作良好,文件大小为100 Mb和70K线(每个作业5分钟)。但是,当文件增加时,数据流会花费更多的时间(15分钟,200到300 Mb)或者没有完成并以失败结束(超过1.5GB和350 K行)。
我做了一些测试,当我在函数解析中创建一个json示例时,但没有使用input_elem,数据流工作得很好,并在7-8分钟内为每个条目创建一行。
我不知道输油管道的问题在哪里,有人有类似的问题吗?
更多信息
发布于 2020-12-04 11:40:45
我们终于解决了这个问题。与数据流并行,在应用程序中创建了一些VPC网络,防火墙规则没有正确配置。
这种情况类似于文档(用于您工作的VPC网络可能会丢失。)中描述的情况。这些规则是存在的,但没有正确配置。
谢谢!
https://stackoverflow.com/questions/63522936
复制相似问题