首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >框架适用于大规模的分布式调度任务队列。

框架适用于大规模的分布式调度任务队列。
EN

Stack Overflow用户
提问于 2015-01-29 15:09:29
回答 1查看 1.3K关注 0票数 3

我需要构建一个系统,这个系统至少需要10万个工作岗位,创建一天的/scheduled。我查看了多个任务队列,但是对调度的支持不多,比如以分布式方式调度,一个好的解决方案似乎是芹菜(我是一个java的家伙,我不能使用python),

Akka (这看起来是一个很好的解决方案,可伸缩、持久等等,但是调度似乎有一个限制,比如我需要根据用户请求在一天中的不同时间安排100000个作业),如果我错了,请纠正我。

我不需要和分布式锁来调度,而是需要异步并发。请提出替代方案。打开scala/java/javascript作为一种语言。

石英是不可伸缩的(有一些严重的限制),我必须每天发送大约3亿条信息,这些信息将通过100000个工作岗位传递。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-01-30 08:52:13

如果您担心Akka调度程序的准确性,那么一旦我发现自己处于相同的情况,我会告诉您我的解决方案。对于这种场景,也许有更好的实践,但这个方案对我来说效果很好。如有任何改进和建议,我将不胜感激。不是很长时间间隔的调度工作,我创建了一个调度器,它向自己发送滴答,以便知道什么时候应该执行工作。假设您想安排一些从date A到date B的工作,并执行每个T时间单位(这可能是毫秒到几年)。然后,我的演员所做的就是调度(使用正常的Akka调度程序),在当前时间和它应该执行第一项工作的时间间隔的一半的时间上进行一次滴答。这基本上是二进制搜索。这样,演员就不会一直滴答作响,在正确的时间执行工作也会非常准确。有关更多信息,请参见下面的Java代码:

代码语言:javascript
复制
public class WorkScheduler extends UntypedActor {

    public static Props props(final Date from, final Date to, final long every, final TimeUnit unit) {
        return Props.create(new Creator<WorkScheduler>() {
            private static final long serialVersionUID = 1L;

            @Override
            public WorkScheduler create() throws Exception {
                return new WorkScheduler(from, to, every, unit);
            }
        });
    }

    // Thresholds to avoid ticking at very long (or very short) intervals 
    private static final long MIN_TICK_DELTA = 1000 // 1 sec.
    private static final long MAX_TICK_DELTA = 21600000 // 6 hours

    private class Tick extends Message {
    }

    private long from;
    private long to;
    private long dt;
    private long checkpoint;

    public WorkScheduler(Date from, Date to, long every, TimeUnit unit) {
        this.from = from.getTime();
        this.to = to.getTime();
        this.dt = unit.toMillis(every);
    }

    @Override
    public void preStart() throws Exception {
        scheduleNextTick(); // The first tick
    }

    private void scheduleNextTick() {
        long t = new Date().getTime();

        // Compute next checkpoint
        if (t < from) {
            checkpoint = from;
        } else {
            long k = (t - from) / dt;
            if ((t - from) % dt != 0) ++k;
            checkpoint = from + k * dt;
        }

        if (checkpoint > to) { // All works executed. Shutdown.
            getContext().stop(self());
        } else { // Schedule next tick
            long delta = Math.max(MIN_TICK_DELTA, Math.min((checkpoint - t) / 2, MAX_TICK_DELTA));
            getContext().system().scheduler().scheduleOnce(
                    FiniteDuration.apply(delta, TimeUnit.MILLISECONDS),
                    self(),
                    new Tick(),
                    getContext().dispatcher(),
                    null);
        }
    }

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof Tick) {
            long t = new Date().getTime();
            if (t >= checkpoint) {
                if (t >= checkpoint + dt) // Tick came too late, due to some external delay (like system restart)
                   ;

                // execute work here, preferably spawning a new actor 
                // responsible for doing the work in asynchronous fashion
            }
            scheduleNextTick();
        } else
            unhandled(msg);
    }

}

希望能有所帮助))

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28217956

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档