首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >处理100万条记录的Executor框架

处理100万条记录的Executor框架
EN

Stack Overflow用户
提问于 2019-09-24 12:02:52
回答 1查看 635关注 0票数 1

我需要处理一个包含、100万条记录的文件,并将其保存在redis缓存中。我本来应该用redis管道的,但我没有得到任何信息。我的问题是:Question

所以我决定使用多线程-执行器框架。我不熟悉多线程,这里是我的代码:

代码语言:javascript
复制
@Async
    public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {

        ExecutorService executorService = Executors.newFixedThreadPool(8);
        Collection<Callable<String>> callables = new ArrayList<>();


        List<Subscriber> cache = new ArrayList<>();
        int batchSize = defaultBatchSize.intValue();

        while ((line = br.readLine()) != null) {
            try {
                Subscriber subscriber = createSubscriber(subscription, line);
                cache.add(subscriber);
                if (cache.size() >= batchSize) {
                    IntStream.rangeClosed(1, 8).forEach(i -> {
                    callables.add(createCallable(cache, subscription.getSubscriptionId()));});
                }
            } catch (InvalidSubscriberDataException e) {
                invalidRows.add(line + ":" + e.getMessage());
                invalidCount++;
            }
        }
        List<Future<String>> taskFutureList = executorService.invokeAll(callables);
        for (Future<String> future : taskFutureList) {
            String value = future.get(4, TimeUnit.SECONDS);
            System.out.println(String.format("TaskFuture returned value %s", value));
        }
    }

    private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {

        return new Callable<String>() {

            public String call() throws Exception {

                System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
                processSubscribers(cache,subscriptionId);
                System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
                return "Finish Thread:" + Thread.currentThread().getName();
            }
        };
    }

    private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
        subscriberRedisRepository.saveAll(cache);
        cache.clear();
    }

这里的想法是,我想在一个批处理中分割一个文件,并使用一个线程保存这个批处理。我创建了由8个线程组成的池。

是实现执行器框架的正确方法吗?如果不行,你能帮帮我吗?感谢您的帮助.

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-24 14:02:22

快速修改当前代码以实现以下要求:

在while循环中,一旦当前缓存超过批处理大小,就创建一个传入当前缓存的可调用的缓存。重置缓存列表,创建新列表并将其分配为缓存.

您正在创建一个可调用项列表,以便将它们作为一个批处理提交,为什么不在创建它们之后立即提交可调用项呢?这将开始将已经读取的记录写入redis,而您的主线程则继续从文件中读取。

代码语言:javascript
复制
 List<Future<String>> taskFutureList = new LinkedList<Future<String>>();
 while ((line = br.readLine()) != null) {
    try {
        Subscriber subscriber = createSubscriber(subscription, line);
        cache.add(subscriber);
        if (cache.size() >= batchSize) {
                    taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
            List<Subscriber> cache = new ArrayList<>();
        }
     } catch (InvalidSubscriberDataException e) {
        invalidRows.add(line + ":" + e.getMessage());
        invalidCount++;
    }
}
//submit last batch that could be < batchSize
if(!cache.isEmpty()){ 
           taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
}

您不必存储单独的可调用项列表。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58079900

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档