链接到游乐场https://go.dev/play/p/ctQDpDW6pui这段代码是基于这个线程这里中的建议和对话
请检查代码,帮助找出我错过的任何死锁,并建议一个更干净的方法来解决我试图解决的一些死锁。
package operation
import (
"context"
"fmt"
"sync"
)
func mapperreducer_so() {
a1 := []int{1, 2, 3, 4, 5}
a2 := []int{5, 4, 3, 1, 1}
a3 := []int{6, 7, 8, 9}
a4 := []int{1, 2, 3, 4, 5}
a5 := []int{5, 4, 3, 1, 1}
a6 := []int{6, 7, 18, 9}
arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}
ctx, cancel := context.WithCancel(context.Background())
ch1 := read(ctx, arrayOfArray)
messageCh := make(chan int)
errCh := make(chan error)
producerWg := &sync.WaitGroup{}
for i := 0; i < 3; i++ {
producerWg.Add(1)
producer(ctx, producerWg, ch1, messageCh, errCh)
}
consumerWg := &sync.WaitGroup{}
for i := 0; i < 3; i++ {
consumerWg.Add(1)
consumer(ctx, consumerWg, messageCh, errCh)
}
firstError := handleAllErrors(ctx, cancel, errCh)
producerWg.Wait()
close(messageCh)
consumerWg.Wait()
close(errCh)
fmt.Println(<-firstError)
}
func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
ch := make(chan []int)
go func() {
defer close(ch)
for i := 0; i < len(arrayOfArray); i++ {
select {
case <-ctx.Done():
return
case ch <- arrayOfArray[i]:
}
}
}()
return ch
}
func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case arr, ok := <-in:
if !ok {
return
}
for i := 0; i < len(arr); i++ {
// simulating an error.
//if arr[i] == 10 {
// errCh <- fmt.Errorf("producer interrupted")
//}
select {
case <-ctx.Done():
return
case messageCh <- 2 * arr[i]:
}
}
}
}
}()
}
func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
go func() {
wg.Done()
for {
select {
case <-ctx.Done():
return
case n, ok := <-messageCh:
if !ok {
return
}
fmt.Println("consumed: ", n)
// simulating erros
//if n == 10 {
// errCh <- fmt.Errorf("output error during write")
//}
}
}
}()
}
func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
firstErrCh := make(chan error, 1)
isFirstError := true
go func() {
defer close(firstErrCh)
for err := range errCh {
select {
case <-ctx.Done():
default:
cancel()
}
if isFirstError {
firstErrCh <- err
isFirstError = !isFirstError
}
}
}()
return firstErrCh
}发布于 2023-01-22 23:44:28
我有几个建议给你。
第一种方法是将mapperreducer_so重命名为TestProducerConsumer,并将其签名从func()更改为func(t *testing.T)。这将把它转换为可以使用go test运行的测试函数。更好的是,您可以将-race标志交给go test来打开竞争检测器,这将告诉您测试的代码中是否存在死锁问题。
如果您还将TestProducerConsumer放入带有_test.go后缀的文件中,则在任何非测试构建中都会忽略它。
接下来,当您使用context.WithCancel创建上下文时,您应该立即包含一个defer cancel()行,以确保无论函数如何离开,上下文都被取消。一个例外情况是,如果您希望在函数的生存期之后保持上下文,但这是很少见的,并且可能表明需要重新构造代码。(如果你担心cancel会被多次调用,那就别担心--第二次和以后的几次都是无害的。更糟糕的情况是,它称其为零次。)
接下来,如果您想要在任何一个goroutine中的错误导致其他goroutines被取消,那么您需要一个errgroup.Group而不是一个sync.WaitGroup。这将消除你在这里所写的很多逻辑。
接下来,我建议使用单个errgroup.Group来包含所有生产者和所有消费者。似乎没有任何理由选择单独的小组。
接下来,这是一个很好的做法,让调用者来启动goroutines。我建议重写producer和consumer,只包含它们现在包含的延迟funcs的主体,并将调用者更改为调用go producer(...)和go consumer(...)。(如果像我前面建议的那样使用errgroup.Group,情况会有所不同。)
如果您使用errgroup.Group,您可以并且应该重写producer和consumer,以返回错误,而不是在通道上发送错误,并且在ctx.Done()情况下应该包括ctx.Err()。
当然,位于wg.Done()顶部的consumer应该是defer wg.Done()。
最后,请允许我谦逊地向您推荐Github.com/bobg/go-generics/并行软件包。
https://codereview.stackexchange.com/questions/282697
复制相似问题