我在Spring Batch中有一个需求,在那里我有一个包含数千条记录的文件,其中一个排序的order.The关键字段是产品代码。该文件可能有多个相同产品的记录,列表要求是我必须将具有相同产品代码的记录分组到一个集合(即列表)中,然后将它们发送到一个方法,即validateProductCodes( code.The prodCodeList)。
谢谢
发布于 2015-08-11 21:32:56
在您的问题中有两个问题:第一,您想知道如何将项目分组在一起,第二,它们是如何处理的。
为了对它们进行分组,您可以按照Luca的建议创建一个组阅读器,或者类似于:
public class GroupReader<I> implements ItemReader<List<I>>{
private SingleItemPeekableItemReader<I> reader;
private ItemReader<I> peekReaderDelegate;
public void setReader(ItemReader<I> reader) {
peekReaderDelegate = reader;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null");
this.reader= new SingleItemPeekableItemReader<I>();
this.reader.setDelegate(delegateReader);
}
@Override
public List<I> read() throws Exception {
State state = State.NEW;
List<I> group = null;
I item = null;
while (state != State.COMPLETE) {
item = reader.read();
switch (state) {
case NEW: {
if (item == null) {
// end reached
state = State.COMPLETE;
break;
}
group = new ArrayList<I>();
group.add(item);
state = State.READING;
I nextItem = reader.peek();
if (isItAKeyChange(item, nextItem)) {
state = State.COMPLETE;
}
break;
}
case READING: {
group.add(item);
// peek and check if there the peeked entry has a new date
I nextItem = peekEntry();
if (isItAKeyChange(item, nextItem)) {
state = State.COMPLETE;
}
break;
}
default: {
throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state");
}
}
}
return group;
}
}对于每个键,该读取器将返回一个列表,其中包含与该键匹配的所有元素。因此,分组是直接在阅读器中完成的。正如您所描述的,您不能使用处理器做到这一点。
你的第二个关于多线程的问题。现在,使用一个步骤并不一定意味着该步骤是由多个线程处理的。
为此,您需要设置一个AsyncTaskExecutor,并且必须设置限制。
但是如果你这样做,你的阅读器必须是线程安全的,否则你的分组将不起作用。只需将上面的read方法定义为synchronized,就可以做到这一点。
另一种方法是编写一个小的SynchronizedWrapperReader,就像这个问题中建议的那样:Parellel Processing Spring Batch StaxEventItemReader
请注意,根据您要写入的目标,您可能还需要同步编写器,如果需要,还需要对结果进行重新排序。
https://stackoverflow.com/questions/31930775
复制相似问题