我是bonobo-etl的新手,我正在尝试编写一次加载多个文件的作业,但我无法让CsvReader与@use_context_processor注释一起工作。下面是我的代码片段:
def input_file(self, context):
yield 'test1.csv'
yield 'test2.csv'
yield 'test3.csv'
@use_context_processor(input_file)
def extract(f):
return bonobo.CsvReader(path=f,delimiter='|')
def load(*args):
print(*args)
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(extract,load)
return graph当我运行作业时,我得到类似于<bonobo.nodes.io.csv.CsvReader object at 0x7f849678dc88>的东西,而不是CSV的代码行。
如果我像graph.add_chain(bonobo.CsvReader(path='test1.csv',delimiter='|'),load)一样对阅读器进行硬编码,它就能正常工作。
任何帮助都将不胜感激。
谢谢。
发布于 2018-08-07 14:59:55
由于bonobo.CsvReader还不支持从输入流中读取文件名,因此需要使用自定义阅读器。
这是一个适用于我的一组csvs的解决方案:
import bonobo
import bonobo.config
import bonobo.util
import glob
import csv
@bonobo.config.use_context
def read_multi_csv(context, name):
with open(name) as f:
reader = csv.reader(f, delimiter=';')
headers = next(reader)
if not context.output_type:
context.set_output_fields(headers)
for row in reader:
yield tuple(row)
def get_graph(**options):
graph = bonobo.Graph()
graph.add_chain(
glob.glob('prenoms_*.csv'),
read_multi_csv,
bonobo.PrettyPrinter(),
)
return graph
if __name__ == '__main__':
with bonobo.parse_args() as options:
bonobo.run(get_graph(**options))对这段代码的一些评论,按阅读顺序排列:
use_context装饰器将节点执行上下文注入到转换调用中,允许使用第一个csv头来使用.set_output_fields(...)。glob.glob在bonobo.Graph实例中生成文件名(在我的示例中,流将包含: prenoms_2004.csv prenoms_2005.csv ...prenoms_2011.csv prenoms_2012.csv),并将其传递给我们的自定义阅读器,该阅读器将为每个文件调用一次,打开它,并生成其行。希望这能有所帮助!
https://stackoverflow.com/questions/51560777
复制相似问题