我想跳过CSV文件中的标题行。到目前为止,在将标题加载到google存储之前,我将手动删除它。
下面是我的代码:
PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassFinance fin = new ClassFinance();
fin.setBeneficiaryFinance(strArr[0]);
fin.setCatlibCode(strArr[1]);
fin.set_rNR_(Double.valueOf(strArr[2]));
fin.set_rNCS_(Double.valueOf(strArr[3]));
fin.set_rCtb_(Double.valueOf(strArr[4]));
fin.set_rAC_(Double.valueOf(strArr[5]));
c.output(fin);
}
}));我已经检查了堆栈溢出中存在的问题,但我发现它没有希望:跳过头行-云DataFlow可以跳过头行吗?
有什么帮助吗?
编辑:,我已经尝试了如下所示的东西,它起了作用:
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr2 = c.element().split(",");
String header = Arrays.toString(strArr2);
ClassFinance fin = new ClassFinance();
if(header.contains("Beneficiary"))
System.out.println("Header");
else {
fin.setBeneficiaryFinance(strArr2[0].trim());
fin.setCatlibCode(strArr2[1].trim());
fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
c.output(fin);
}
}
}));发布于 2018-06-14 11:13:26
您共享的旧堆栈溢出帖子(https://stackoverflow.com/questions/28450554/skipping-header-rows-is-it-possible-with-cloud-dataflow)确实包含了问题的答案。
这个选项是当前在Apache中没有可用的,尽管Apache问题跟踪器( 梁-123 )中有一个开放的特性请求。请注意,在编写时,这个特性请求仍然是开放的和未解决的,并且它已经像这样已经两年了。然而,在这个意义上,似乎正在做一些努力,而且这个问题的最新更新是从2018年2月开始的,所以我建议您保持关于JIRA问题的最新信息,因为它是最后一次转移到sdk-java-core组件上,并且它可能会得到更多的关注。
考虑到这些信息,我想说的是,您所使用的方法(在将文件上传到GCS之前删除头)是您的最佳选择。我将避免手工操作,因为您可以轻松地编写脚本并自动删除标题⟶上传文件过程。
编辑:
我已经能够提出一个简单的过滤器使用DoFn。它可能不是最优雅的解决方案(我自己也不是Apache专家),但它确实有效,而且您可能能够使它适应您的需求。它要求您事先知道正在上载的CSV文件的头(因为它将根据元素内容进行过滤),但是,还是要把它当作一个模板,您可以根据需要修改这个模板:
public class RemoveCSVHeader {
// The Filter class
static class FilterCSVHeaderFn extends DoFn<String, String> {
String headerFilter;
public FilterCSVHeaderFn(String headerFilter) {
this.headerFilter = headerFilter;
}
@ProcessElement
public void processElement(ProcessContext c) {
String row = c.element();
// Filter out elements that match the header
if (!row.equals(this.headerFilter)) {
c.output(row);
}
}
}
// The main class
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));
String header = "col1,col2,col3,col4";
vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
.apply(TextIO.write().to("out"));
p.run().waitUntilFinish();
}
}发布于 2019-02-19 18:05:08
这个密码适用于我。我使用Filter.by()从csv文件中筛选出标题行。
static void run(GcsToDbOptions options) {
Pipeline p = Pipeline.create(options);
// Read the CSV file from GCS input file path
p.apply("Read Rows from " + options.getInputFile(), TextIO.read()
.from(options.getInputFile()))
// filter the header row
.apply("Remove header row",
Filter.by((String row) -> !((row.startsWith("dwid") || row.startsWith("\"dwid\"")
|| row.startsWith("'dwid'")))))
// write the rows to database using prepared statement
.apply("Write to Auths Table in Postgres", JdbcIO.<String>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource(options)))
.withStatement(INSERT_INTO_MYTABLE)
.withPreparedStatementSetter(new StatementSetter()));
PipelineResult result = p.run();
try {
result.getState();
result.waitUntilFinish();
} catch (UnsupportedOperationException e) {
// do nothing
} catch (Exception e) {
e.printStackTrace();
}}发布于 2018-10-26 20:31:53
https://medium.com/@baranitharan/the-textio-write-1be1c07fbef0数据流中的TextIO.Write现在具有向数据中添加标题行的withHeader函数。此函数是在verison 1.7.0中添加的。
因此,您可以向csv添加如下标题:
TextIO.Write.named("WriteToText")
.to("/path/to/the/file")
.withHeader("col_name1,col_name2,col_name3,col_name4")
.withSuffix(".csv"));withHeader函数在标题行的末尾自动添加换行符。
https://stackoverflow.com/questions/50855647
复制相似问题