首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Java发生-在两个ConcurrentMaps上一致的线程视图之前

Java发生-在两个ConcurrentMaps上一致的线程视图之前
EN

Stack Overflow用户
提问于 2015-11-05 21:56:07
回答 2查看 159关注 0票数 1

我有一个java类来处理多线程订阅服务。通过实现可订阅接口,可以将任务提交给服务并定期执行。下面是代码的草图:

代码语言:javascript
复制
import java.util.concurrent.*;

public class Subscribtions {

    private ConcurrentMap<Subscribable, Future<?>> futures = new ConcurrentHashMap<Subscribable, Future<?>>();
    private ConcurrentMap<Subscribable, Integer> cacheFutures = new ConcurrentHashMap<Subscribable, Integer>();
    private ScheduledExecutorService threads;

    public Subscribtions() {
        threads = Executors.newScheduledThreadPool(16);
    }

    public void subscribe(Subscribable subscription) {
        Runnable runnable = getThread(subscription);
        Future<?> future = threads.scheduleAtFixedRate(runnable, subscription.getInitialDelay(), subscription.getPeriod(), TimeUnit.SECONDS);
        futures.put(subscription, future);
    }

    /*
     * Only called from controller thread
     */
    public void unsubscribe(Subscribable subscription) {
        Future<?> future = futures.remove(subscription);    //1. Might be removed by worker thread 
        if (future != null)
            future.cancel(false);
        else {
            //3. Worker-thread view     := cacheFutures.put() -> futures.remove()
            //4. Controller-thread has seen futures.remove(), but has it seen cacheFutures.put()?
        }
    }

    /*
     * Only called from worker threads
     */
    private void delay(Runnable runnable, Subscribable subscription, long delay) {
        cacheFutures.put(subscription, 0);                  //2. Which is why it is cached first
        Future<?> currentFuture = futures.remove(subscription);
        if (currentFuture != null) {
            currentFuture.cancel(false);
            Future<?> future = threads.scheduleAtFixedRate(runnable, delay, subscription.getPeriod(), TimeUnit.SECONDS);
            futures.put(subscription, future);
        }
    }

    private Runnable getThread(Subscribable subscription) {
        return new Runnable() {
            public void run() {
                //Do work...
                boolean someCondition = true;
                long someDelay = 100;
                if (someCondition) {
                    delay(this, subscription, someDelay);
                }
            }
        };
    }

    public interface Subscribable {
        long getInitialDelay();
        long getPeriod();
    }
}

因此,该课程允许:

  • 订阅新任务
  • 取消订阅现有任务
  • 延迟周期性执行的任务

订阅由外部控制线程添加/删除,但延迟仅由内部工作线程引起。例如,如果工作线程没有从上次执行中找到更新,或者例如,如果线程只需要在00.00 - 23.00之间执行,则可能发生这种情况。

我的问题是,一个工作线程可以调用delay()并从ConcurrentMap中删除它的未来,而控制器线程可以同时调用unsubscribe()。然后,如果控制器线程在工作线程进入新的未来之前检查ConcurrentMap,则unsubscribe()调用将丢失。

有一些(也许不是详尽无遗的)解决方案:

  • 使用delay()unsubscribe()方法之间的锁
  • 和上面一样,但是每次订阅一个锁。
  • (首选?)不使用锁,但在delay()方法中使用“缓存”删除期货

至于第三种解决方案,由于工作线程已经建立了在关系cacheFutures.put() -> futures.remove()之前发生的事件,而ConcurrentMap的原子性使控制器线程看到了futures.remove(),那么它是否也看到了与工作线程相同的关系?即cacheFutures.put() -> futures.remove()?或者原子性只适用于稍后传播到其他变量的更新的futures映射吗?

任何其他评论也是欢迎的,尤指。考虑使用易失性关键字。是否应该声明缓存映射是不稳定的?谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-11-05 22:43:26

每次订阅一个锁将要求您维护另一个映射,并可能因此引入额外的并发问题。我认为最好避免这样做。同样的情况也适用于缓存已删除的订阅,另外,这也增加了不必要的资源保留的风险(请注意,您需要缓存的不是Future本身,而是与它们关联的Subscribable)。

无论如何,您都需要某种同步/锁定。例如,在您的选项(3)中,您需要避免对给定订阅在delay()缓存订阅和删除订阅Future之间发生delay()。在没有某种形式的锁定的情况下,避免这种情况的唯一方法是,每个订阅只使用一个Future,从subscribe()注册时一直保持到unsubscribe()删除为止。这样做不符合延迟预定订阅的能力。

至于第三种解决方案,由于工作线程已经建立了在关系cacheFutures.put() -> futures.remove()之前发生的事件,而ConcurrentMap的原子性使控制器线程看到futures.remove(),那么它是否也看到了与工作线程之前的关系相同的情况?

发生--以前是程序执行过程中动作之间的关系。它不特定于任何一个线程对执行的视图。

或者原子性只适用于期货地图,其他变量的更新将在以后传播?

控制器线程将始终在同一调用执行的cacheFutures.put()之前看到delay()调用所执行的futures.remove()。不过,我觉得这帮不了你。

是否应该声明缓存映射是不稳定的?

不是的。这没有任何用处,因为尽管映射的内容发生了变化,但映射本身始终是相同的对象,而且对它的引用不会改变。

您可以考虑让subscribe()delay()unsubscribe()分别在Subscribable上同步。这并不是我所理解的每个订阅都有一个锁的意思,但这是相似的。这将避免需要单独的数据结构来维护这些锁。如果您希望避免显式同步,也可以将锁定方法构建到Subscribable接口中。

票数 0
EN

Stack Overflow用户

发布于 2015-11-06 01:23:48

您有一个ConcurrentMap,但您没有使用它。考虑一下以下几点:

代码语言:javascript
复制
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SO33555545
{

  public static void main(String... argv)
    throws InterruptedException
  {
    ScheduledExecutorService workers = Executors.newScheduledThreadPool(16);
    Subscriptions sub = new Subscriptions(workers);
    sub.subscribe(() -> System.out.println("Message received: A"));
    sub.subscribe(() -> System.out.println("Message received: B"));
    Thread.sleep(TimeUnit.SECONDS.toMillis(30));
    workers.shutdown();
  }

}

final class Subscriptions
{

  private final ConcurrentMap<Subscribable, Task> tasks = new ConcurrentHashMap<>();

  private final ScheduledExecutorService workers;

  public Subscriptions(ScheduledExecutorService workers)
  {
    this.workers = workers;
  }

  void subscribe(Subscribable sub)
  {
    Task task = new Task(sub);
    Task current = tasks.putIfAbsent(sub, task);
    if (current != null)
      throw new IllegalStateException("Already subscribed");
    task.activate();
  }

  private Future<?> schedule(Subscribable sub)
  {
    Runnable task = () -> {
      sub.invoke();
      if (Math.random() < 0.25) {
        System.out.println("Delaying...");
        delay(sub, 5);
      }
    };
    return workers.scheduleAtFixedRate(task, sub.getPeriod(), sub.getPeriod(), TimeUnit.SECONDS);
  }

  void unsubscribe(Subscribable sub)
  {
    Task task = tasks.remove(sub);
    if (task != null)
      task.cancel();
  }

  private void delay(Subscribable sub, long delay)
  {
    Task task = new Task(sub);
    Task obsolete = tasks.replace(sub, task);
    if (obsolete != null) {
      obsolete.cancel();
      task.activate();
    }
  }

  private final class Task
  {

    private final FutureTask<Future<?>> future;

    Task(Subscribable sub)
    {
      this.future = new FutureTask<>(() -> schedule(sub));
    }

    void activate()
    {
      future.run();
    }

    void cancel()
    {
      boolean interrupted = false;
      while (true) {
        try {
          future.get().cancel(false);
          break;
        }
        catch (ExecutionException ignore) {
          ignore.printStackTrace(); /* Cancellation is unnecessary. */
          break;
        }
        catch (InterruptedException ex) {
          interrupted = true; /* Keep waiting... */
        }
      }
      if (interrupted)
        Thread.currentThread().interrupt(); /* Reset interrupt state. */
    }

  }

}

@FunctionalInterface
interface Subscribable
{

  default long getPeriod()
  {
    return 4;
  }

  void invoke();

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33555545

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档