我在尝试做一些任务调度程序和工作。任务将添加到工作人员从中检索和处理任务的队列中。必须按顺序执行来自一个队列的任务。来自不同队列的任务并行执行。当工作人员完成队列中的所有任务时,他会等待一段时间并完成他的工作。必须将队列从计划中删除。
我做了点什么,但我认为这不是最好的解决办法。我认为当工人关闭和关闭频道时可能会出现问题。
我的解决方案并发安全吗?
// Schedule struct
type Schedule struct {
sync.RWMutex
queues map[int]chan Task
idle byte
}
// Scheduler pushes task to the queue
func (s *Schedule) Scheduler(t Task, i int) {
var queue chan Task
var ok bool
s.RLock()
if queue, ok = s.queues[i]; !ok {
s.RUnlock()
s.Lock()
if queue, ok = s.queues[i]; !ok {
queue = make(chan Task)
s.queues[i] = queue
go s.worker(queue, i)
}
s.Unlock()
} else {
s.RUnlock()
}
queue <- t
}
// Worker retrieves task from the queue and process
func (s *Schedule) worker(c chan Task, i int) {
timeout := time.After(s.idle * time.Second)
done := false
for !done {
select {
case task := <-c:
task.Execute()
timeout = time.After(s.idle * time.Second)
case <-timeout:
s.Lock()
close(c)
delete(s.queues, i)
s.Unlock()
done = true
default:
time.Sleep(10 * time.Millisecond)
}
}
}发布于 2019-12-08 23:12:51
default中没有必要使用Schedule.worker子句。time.Timer对象并在其上调用Timer.Reset & Timer.Stop。done变量,我们可以在完成时只使用return。func (s *Schedule) worker(c chan Task, i int) {
t := time.NewTimer(s.idle * time.Second)
for {
select {
case task := <-c:
task.Execute()
if !t.Stop() {
<-t.C
}
t.Reset(s.idle * time.Second)
case <-t.C:
s.Lock()
close(c)
delete(s.queues, i)
s.Unlock()
return
}
}
}func (s *Schedule) worker(c chan Task, i int) {
for task := range c {
task.Execute()
}
}https://codereview.stackexchange.com/questions/231817
复制相似问题