首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache反应流只读取Bindy第一行

Apache反应流只读取Bindy第一行
EN

Stack Overflow用户
提问于 2021-01-04 09:18:22
回答 1查看 708关注 0票数 1

我正在尝试使用Apache (2.25.3版)反应性流与Spring相结合来读取大型csv文件,并使用Bindy对行进行解组。这在某种意义上是“工作的”,因为应用程序运行并检测文件的出现,但是我在流中只看到文件的第一行。它似乎与Bindy相关,因为如果我将解组从等式中取出来,我就可以得到流中csv文件的所有行。我已经简化了这个问题,以便在这里演示。我正在使用Spring来公开结果发布者。

我的骆驼路线如下:

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
    private final CamelReactiveStreamsService camelRs;

    @Override
    public void configure() {
        var bindy = new BindyCsvDataFormat(LineItem.class);

        from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
                .unmarshal(bindy)
                .to("reactive-streams:lineItems");
    }

    public Flux<LineItem> getLineItemFlux() {
        Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);

        return Flux.from(lineItems);
    }
}

宾迪班:

代码语言:javascript
复制
@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField =true)
public class LineItem {
    @DataField(pos = 2)
    private String description;
}

以及公开Flux的端点:

代码语言:javascript
复制
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return lineItemFlux;
}

所以当我现在做卷发时:

代码语言:javascript
复制
curl localhost:8080/lineItems

我只返回第一行,而当我移除".unmarshal(bind)“行(并将流重构为String而不是LineItem)时,我会得到csv文件的所有元素。

因此,我想我不会在反应性流上下文中使用Bindy更正。我遵循这个骆驼文献并试图重写我的路线如下:

代码语言:javascript
复制
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
        .to("reactive-streams:rawLines");

from("reactive-streams:rawLines")
        .unmarshal(bindy)
        .to("reactive-streams:lineItems");

它显示路线已正确启动:

代码语言:javascript
复制
2021-01-04 10:13:26.798  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Total 2 routes, of which 2 are started

但是,我得到了一个异常,说明“流没有活动订阅”:

代码语言:javascript
复制
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport      ] [         9]
[route1            ] [to1               ] [reactive-streams:rawLines                                                     ] [         5]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalStateException: The stream has no active subscriptions
    at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]

有没有人知道我如何将Bindy与反应流结合起来使用呢?谢谢!

编辑

在burki发布了一篇非常有帮助的文章之后,我能够修复我的代码。因此,路由定义更改为以下内容。正如您所看到的,我删除了解封送处理步骤,因此它只是在文件系统到达时从文件系统中提取文件,并将它们放入一个反应性流中:

代码语言:javascript
复制
@Override
public void configure() {
    from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
            .to("reactive-streams:extractedFile");
}

然后将文件流公开为Flux:

代码语言:javascript
复制
public Flux<File> getFileFlux() {
    return Flux.from(camelRs.fromStream("extractedFile", File.class));
}

解析CSV的代码如下(如burki所建议的那样使用OpenCSV,但使用API的另一部分):

代码语言:javascript
复制
private Flux<LineItem> readLineItems() {
    return fileFlux
            .flatMap(message -> Flux.using(
                    () -> new CsvToBeanBuilder<LineItem>(createFileReader(message)).withSkipLines(1)
                            .withSeparator(';')
                            .withType(LineItem.class)
                            .build()
                            .stream(),
                    Flux::fromStream,
                    BaseStream::close)
            );
}

private FileReader createFileReader(File file) {
    System.out.println("Reading file from: " + file.getAbsolutePath());
    try {
        return new FileReader(file);
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}

现在,您可以将这个结果的Flux公开为端点:

代码语言:javascript
复制
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return readLineItems();
}

现在,当您做卷曲时,就像我上面所做的那样,您可以从csv获得完整的解组LineItems。

无论这是否真的将整个文件加载到内存中,我仍然有一个待做的事情。我不这么认为,我想我只得到一个指向文件的指针,然后我流到OpenCSV bean,但我需要验证这一点,可能是我现在首先将整个文件读取到内存中,然后流到内存中,这样就会达到这个目的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-06 06:43:09

我猜文件使用者只是将整个文件的传递到解编组步骤。

因此,如果将文件使用者的结果解封为LineItem,则将整个文件内容“减少”为第一行

相反,如果删除解组,则会得到整个文件内容。但是,文件使用者可能在将整个文件传递到之前将其加载到内存中。

但是读完整的文件并不是你想要的。要逐行读取CSV文件,需要在流模式下拆分文件。

代码语言:javascript
复制
from("file:...")
    .split(body().tokenize(LINE_FEED)).streaming()
    .to("direct:processLine") 

像这样,分离器将每一行发送到路由direct:processLine进行进一步处理。

在这个场景中,我面临的问题是解析单个CSV行。大多数CSV库的设计目的是读取和解析整个文件,而不是一行。

但是,相当老的OpenCSV库有一个带有parseLine(String csvLine)方法的CSVParser。所以我用它来解析一个“完全分离”的单个CSV行。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65560224

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档