我正在努力学习如何使用通道来为我的其他项目设置队列。我的另一个项目基本上是对数据库行进行排队,然后使用行中的详细信息在数据库上进行数字处理。
我不希望同一行同时在工人中进行处理,因此它需要检查工作人员当前是否正在处理特定的行ID,如果是,则等待它完成。如果它不是相同的行ID,它可以异步运行,但我也希望限制可以同时运行的异步工作人员的数量。在下面的代码中,我正在尝试将它限制为三个工人。
我现在拥有的是:
package main
import (
"log"
"strconv"
"time"
)
// RowInfo holds the job info
type RowInfo struct {
id int
}
// WorkerCount holds how many workers are currently running
var WorkerCount int
// WorkerLocked specifies whether a row ID is currently processing by a worker
var WorkerLocked map[string]bool
// Process the RowInfo
func worker(row RowInfo) {
rowID := strconv.Itoa(row.id)
WorkerCount++
WorkerLocked[rowID] = true
time.Sleep(1 * time.Second)
log.Printf("ID rcvd: %d", row.id)
WorkerLocked[rowID] = false
WorkerCount--
}
// waiter will check if the row is already processing in a worker
// Block until it finishes completion, then dispatch
func waiter(row RowInfo) {
rowID := strconv.Itoa(row.id)
for WorkerLocked[rowID] == true {
time.Sleep(1 * time.Second)
}
go worker(row)
}
func main() {
jobsQueue := make(chan RowInfo, 10)
WorkerLocked = make(map[string]bool)
// Dispatcher waits for jobs on the channel and dispatches to waiter
go func() {
// Wait for a job
for {
// Only have a max of 3 workers running asynch at a time
for WorkerCount > 3 {
time.Sleep(1 * time.Second)
}
job := <-jobsQueue
go waiter(job)
}
}()
// Test the queue, send some data
for i := 0; i < 12; i++ {
r := RowInfo{
id: i,
}
jobsQueue <- r
}
// Prevent exit!
for {
time.Sleep(1 * time.Second)
}
}我得到了这个错误,但这是一个间歇性的问题,因为有时当我运行它时,它看起来是有效的。有比赛条件吗?:
go run main.go
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x8 pc=0x4565e7]
goroutine 37 [running]:
main.worker(0x5)
/home/piiz/go/src/github.com/zzz/asynch/main.go:25 +0x94
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 1 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.main()
/home/piiz/go/src/github.com/zzz/asynch/main.go:73 +0xf8
goroutine 5 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.main.func1(0xc82008c000)
/home/piiz/go/src/github.com/zzz/asynch/main.go:55 +0x2d
created by main.main
/home/piiz/go/src/github.com/zzz/asynch/main.go:61 +0xa0
goroutine 35 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x2)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 36 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x4)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 34 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x1)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 12 [runnable]:
runtime.goexit1()
/usr/local/go/src/runtime/proc1.go:1732
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1697 +0x6
created by main.main.func1
/home/piiz/go/src/github.com/zzz/asynch/main.go:59 +0x8c
goroutine 19 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x8)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 20 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x0)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 16 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x9)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 33 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x3)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 18 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x7)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 22 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0xa)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 49 [runnable]:
main.worker(0x6)
/home/piiz/go/src/github.com/zzz/asynch/main.go:21
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
exit status 2无论如何,我还在学习,所以如果你看我的代码,然后说“到底是什么”,我不会感到惊讶的:)也许我完全错误地处理了这个问题。谢谢。
发布于 2016-02-11 02:40:36
如果要使用WorkerLocked映射,则需要使用sync包保护对它的访问。您还需要以同样的方式保护WorkerCount (或者使用原子操作)。这样做也会使睡眠变得不必要(使用条件变量)。
更好的做法是,让3名(或多少名)工作人员等待行使用通道。然后,将行分发给各个工作人员,以便特定的工作人员总是对特定的行进行处理(例如,使用row.id %3来确定要将行发送到哪个工人/通道)。
https://stackoverflow.com/questions/35329276
复制相似问题