我需要处理一个包含、100万条记录的文件,并将其保存在redis缓存中。我本来应该用redis管道的,但我没有得到任何信息。我的问题是:Question
所以我决定使用多线程-执行器框架。我不熟悉多线程,这里是我的代码:
@Async
public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(8);
Collection<Callable<String>> callables = new ArrayList<>();
List<Subscriber> cache = new ArrayList<>();
int batchSize = defaultBatchSize.intValue();
while ((line = br.readLine()) != null) {
try {
Subscriber subscriber = createSubscriber(subscription, line);
cache.add(subscriber);
if (cache.size() >= batchSize) {
IntStream.rangeClosed(1, 8).forEach(i -> {
callables.add(createCallable(cache, subscription.getSubscriptionId()));});
}
} catch (InvalidSubscriberDataException e) {
invalidRows.add(line + ":" + e.getMessage());
invalidCount++;
}
}
List<Future<String>> taskFutureList = executorService.invokeAll(callables);
for (Future<String> future : taskFutureList) {
String value = future.get(4, TimeUnit.SECONDS);
System.out.println(String.format("TaskFuture returned value %s", value));
}
}
private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {
return new Callable<String>() {
public String call() throws Exception {
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
processSubscribers(cache,subscriptionId);
System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
return "Finish Thread:" + Thread.currentThread().getName();
}
};
}
private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
subscriberRedisRepository.saveAll(cache);
cache.clear();
}这里的想法是,我想在一个批处理中分割一个文件,并使用一个线程保存这个批处理。我创建了由8个线程组成的池。
是实现执行器框架的正确方法吗?如果不行,你能帮帮我吗?感谢您的帮助.
发布于 2019-09-24 14:02:22
快速修改当前代码以实现以下要求:
在while循环中,一旦当前缓存超过批处理大小,就创建一个传入当前缓存的可调用的缓存。重置缓存列表,创建新列表并将其分配为缓存.
您正在创建一个可调用项列表,以便将它们作为一个批处理提交,为什么不在创建它们之后立即提交可调用项呢?这将开始将已经读取的记录写入redis,而您的主线程则继续从文件中读取。
List<Future<String>> taskFutureList = new LinkedList<Future<String>>();
while ((line = br.readLine()) != null) {
try {
Subscriber subscriber = createSubscriber(subscription, line);
cache.add(subscriber);
if (cache.size() >= batchSize) {
taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
List<Subscriber> cache = new ArrayList<>();
}
} catch (InvalidSubscriberDataException e) {
invalidRows.add(line + ":" + e.getMessage());
invalidCount++;
}
}
//submit last batch that could be < batchSize
if(!cache.isEmpty()){
taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
}您不必存储单独的可调用项列表。
https://stackoverflow.com/questions/58079900
复制相似问题