我正在尝试使用Apache (2.25.3版)反应性流与Spring相结合来读取大型csv文件,并使用Bindy对行进行解组。这在某种意义上是“工作的”,因为应用程序运行并检测文件的出现,但是我在流中只看到文件的第一行。它似乎与Bindy相关,因为如果我将解组从等式中取出来,我就可以得到流中csv文件的所有行。我已经简化了这个问题,以便在这里演示。我正在使用Spring来公开结果发布者。
我的骆驼路线如下:
import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
private final CamelReactiveStreamsService camelRs;
@Override
public void configure() {
var bindy = new BindyCsvDataFormat(LineItem.class);
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
.unmarshal(bindy)
.to("reactive-streams:lineItems");
}
public Flux<LineItem> getLineItemFlux() {
Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);
return Flux.from(lineItems);
}
}宾迪班:
@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField =true)
public class LineItem {
@DataField(pos = 2)
private String description;
}以及公开Flux的端点:
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
return lineItemFlux;
}所以当我现在做卷发时:
curl localhost:8080/lineItems我只返回第一行,而当我移除".unmarshal(bind)“行(并将流重构为String而不是LineItem)时,我会得到csv文件的所有元素。
因此,我想我不会在反应性流上下文中使用Bindy更正。我遵循这个骆驼文献并试图重写我的路线如下:
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
.to("reactive-streams:rawLines");
from("reactive-streams:rawLines")
.unmarshal(bindy)
.to("reactive-streams:lineItems");它显示路线已正确启动:
2021-01-04 10:13:26.798 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Total 2 routes, of which 2 are started但是,我得到了一个异常,说明“流没有活动订阅”:
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport ] [ 9]
[route1 ] [to1 ] [reactive-streams:rawLines ] [ 5]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: The stream has no active subscriptions
at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]有没有人知道我如何将Bindy与反应流结合起来使用呢?谢谢!
编辑
在burki发布了一篇非常有帮助的文章之后,我能够修复我的代码。因此,路由定义更改为以下内容。正如您所看到的,我删除了解封送处理步骤,因此它只是在文件系统到达时从文件系统中提取文件,并将它们放入一个反应性流中:
@Override
public void configure() {
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
.to("reactive-streams:extractedFile");
}然后将文件流公开为Flux:
public Flux<File> getFileFlux() {
return Flux.from(camelRs.fromStream("extractedFile", File.class));
}解析CSV的代码如下(如burki所建议的那样使用OpenCSV,但使用API的另一部分):
private Flux<LineItem> readLineItems() {
return fileFlux
.flatMap(message -> Flux.using(
() -> new CsvToBeanBuilder<LineItem>(createFileReader(message)).withSkipLines(1)
.withSeparator(';')
.withType(LineItem.class)
.build()
.stream(),
Flux::fromStream,
BaseStream::close)
);
}
private FileReader createFileReader(File file) {
System.out.println("Reading file from: " + file.getAbsolutePath());
try {
return new FileReader(file);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
}现在,您可以将这个结果的Flux公开为端点:
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
return readLineItems();
}现在,当您做卷曲时,就像我上面所做的那样,您可以从csv获得完整的解组LineItems。
无论这是否真的将整个文件加载到内存中,我仍然有一个待做的事情。我不这么认为,我想我只得到一个指向文件的指针,然后我流到OpenCSV bean,但我需要验证这一点,可能是我现在首先将整个文件读取到内存中,然后流到内存中,这样就会达到这个目的。
发布于 2021-01-06 06:43:09
我猜文件使用者只是将整个文件的传递到解编组步骤。
因此,如果将文件使用者的结果解封为LineItem,则将整个文件内容“减少”为第一行。
相反,如果删除解组,则会得到整个文件内容。但是,文件使用者可能在将整个文件传递到之前将其加载到内存中。
但是读完整的文件并不是你想要的。要逐行读取CSV文件,需要在流模式下拆分文件。
from("file:...")
.split(body().tokenize(LINE_FEED)).streaming()
.to("direct:processLine") 像这样,分离器将每一行发送到路由direct:processLine进行进一步处理。
在这个场景中,我面临的问题是解析单个CSV行。大多数CSV库的设计目的是读取和解析整个文件,而不是一行。
但是,相当老的OpenCSV库有一个带有parseLine(String csvLine)方法的CSVParser。所以我用它来解析一个“完全分离”的单个CSV行。
https://stackoverflow.com/questions/65560224
复制相似问题