首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Apache Beam中读取CSV文件时跳过头

在Apache Beam中读取CSV文件时跳过头
EN

Stack Overflow用户
提问于 2018-06-14 10:45:59
回答 3查看 7.6K关注 0票数 4

我想跳过CSV文件中的标题行。到目前为止,在将标题加载到google存储之前,我将手动删除它。

下面是我的代码:

代码语言:javascript
复制
PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));

我已经检查了堆栈溢出中存在的问题,但我发现它没有希望:跳过头行-云DataFlow可以跳过头行吗?

有什么帮助吗?

编辑:,我已经尝试了如下所示的东西,它起了作用:

代码语言:javascript
复制
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {  
            String[] strArr2 = c.element().split(",");
            String header = Arrays.toString(strArr2);
            ClassFinance fin = new ClassFinance();

                if(header.contains("Beneficiary"))
                System.out.println("Header");
                else {
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
            fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
            fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
            fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
            c.output(fin);
            }
        }
    }));
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-06-14 11:13:26

您共享的旧堆栈溢出帖子(https://stackoverflow.com/questions/28450554/skipping-header-rows-is-it-possible-with-cloud-dataflow)确实包含了问题的答案。

这个选项是当前在Apache中没有可用的,尽管Apache问题跟踪器( 梁-123 )中有一个开放的特性请求。请注意,在编写时,这个特性请求仍然是开放的和未解决的,并且它已经像这样已经两年了。然而,在这个意义上,似乎正在做一些努力,而且这个问题的最新更新是从2018年2月开始的,所以我建议您保持关于JIRA问题的最新信息,因为它是最后一次转移到sdk-java-core组件上,并且它可能会得到更多的关注。

考虑到这些信息,我想说的是,您所使用的方法(在将文件上传到GCS之前删除头)是您的最佳选择。我将避免手工操作,因为您可以轻松地编写脚本并自动删除标题⟶上传文件过程。

编辑:

我已经能够提出一个简单的过滤器使用DoFn。它可能不是最优雅的解决方案(我自己也不是Apache专家),但它确实有效,而且您可能能够使它适应您的需求。它要求您事先知道正在上载的CSV文件的头(因为它将根据元素内容进行过滤),但是,还是要把它当作一个模板,您可以根据需要修改这个模板:

代码语言:javascript
复制
public class RemoveCSVHeader {
  // The Filter class
  static class FilterCSVHeaderFn extends DoFn<String, String> {
    String headerFilter;

    public FilterCSVHeaderFn(String headerFilter) {
      this.headerFilter = headerFilter;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String row = c.element();
      // Filter out elements that match the header
      if (!row.equals(this.headerFilter)) {
        c.output(row);
      }
    }
  }

  // The main class
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

    String header = "col1,col2,col3,col4";

    vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
        .apply(TextIO.write().to("out"));

    p.run().waitUntilFinish();
  }
}
票数 5
EN

Stack Overflow用户

发布于 2019-02-19 18:05:08

这个密码适用于我。我使用Filter.by()从csv文件中筛选出标题行。

代码语言:javascript
复制
static void run(GcsToDbOptions options) {

Pipeline p = Pipeline.create(options);
// Read the CSV file from GCS input file path
p.apply("Read Rows from " + options.getInputFile(), TextIO.read()
    .from(options.getInputFile()))
    // filter the header row
    .apply("Remove header row",
        Filter.by((String row) -> !((row.startsWith("dwid") || row.startsWith("\"dwid\"")
            || row.startsWith("'dwid'")))))
    // write the rows to database using prepared statement
    .apply("Write to Auths Table in Postgres", JdbcIO.<String>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource(options)))
        .withStatement(INSERT_INTO_MYTABLE)
        .withPreparedStatementSetter(new StatementSetter()));
PipelineResult result = p.run();
try {
  result.getState();
  result.waitUntilFinish();
} catch (UnsupportedOperationException e) {
  // do nothing
} catch (Exception e) {
  e.printStackTrace();
}}
票数 2
EN

Stack Overflow用户

发布于 2018-10-26 20:31:53

https://medium.com/@baranitharan/the-textio-write-1be1c07fbef0数据流中的TextIO.Write现在具有向数据中添加标题行的withHeader函数。此函数是在verison 1.7.0中添加的。

因此,您可以向csv添加如下标题:

代码语言:javascript
复制
TextIO.Write.named("WriteToText")
            .to("/path/to/the/file")
            .withHeader("col_name1,col_name2,col_name3,col_name4")
            .withSuffix(".csv"));

withHeader函数在标题行的末尾自动添加换行符。

票数 -3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50855647

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档