首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >生产者(映射者)和消费者(减速器)的并发问题

生产者(映射者)和消费者(减速器)的并发问题
EN

Code Review用户
提问于 2023-01-19 05:51:43
回答 1查看 110关注 0票数 0

链接到游乐场https://go.dev/play/p/ctQDpDW6pui这段代码是基于这个线程这里中的建议和对话

体系结构:

  • read方法创建与生产者共享的通道。Read方法添加到通道中,而生产者从它读取。
  • 生产者和消费者共享一个频道。生产者添加数据,消费者消费。
  • 生产者和消费者使用错误通道将错误传递给主要方法。
  • 一个很难的要求是,如果任何工人--生产者或消费者--遇到错误,那么所有工人都应该停止工作。这导致我使用上下文来取消生产者和消费者。
  • 生产者和消费者通过错误通道传递错误。
  • 一个名为handleAllErrors的go例程消耗errorChannel,并使用上下文的cancel调用退出并关闭剩余的生产者和消费者。

发行

  • 我一直在努力克服死锁的挑战,据我所知,所有的死锁问题都已经解决了,但是代码似乎有了一些改进。
  • 我特别关注的是,当消费者被打断时,排完队的比赛条件。
  • 例:如果生产者还在计算,而所有的消费者都被打断,代码可能会阻塞/死锁。
  • 例:如果在读者还在给它喂食的时候,所有的制片人都被打断了,它可能会阻塞。

请检查代码,帮助找出我错过的任何死锁,并建议一个更干净的方法来解决我试图解决的一些死锁。

代码语言:javascript
复制
package operation

import (
    "context"
    "fmt"
    "sync"
)

func mapperreducer_so() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}
EN

回答 1

Code Review用户

回答已采纳

发布于 2023-01-22 23:44:28

我有几个建议给你。

第一种方法是将mapperreducer_so重命名为TestProducerConsumer,并将其签名从func()更改为func(t *testing.T)。这将把它转换为可以使用go test运行的测试函数。更好的是,您可以将-race标志交给go test来打开竞争检测器,这将告诉您测试的代码中是否存在死锁问题。

如果您还将TestProducerConsumer放入带有_test.go后缀的文件中,则在任何非测试构建中都会忽略它。

接下来,当您使用context.WithCancel创建上下文时,您应该立即包含一个defer cancel()行,以确保无论函数如何离开,上下文都被取消。一个例外情况是,如果您希望在函数的生存期之后保持上下文,但这是很少见的,并且可能表明需要重新构造代码。(如果你担心cancel会被多次调用,那就别担心--第二次和以后的几次都是无害的。更糟糕的情况是,它称其为零次。)

接下来,如果您想要在任何一个goroutine中的错误导致其他goroutines被取消,那么您需要一个errgroup.Group而不是一个sync.WaitGroup。这将消除你在这里所写的很多逻辑。

接下来,我建议使用单个errgroup.Group来包含所有生产者和所有消费者。似乎没有任何理由选择单独的小组。

接下来,这是一个很好的做法,让调用者来启动goroutines。我建议重写producerconsumer,只包含它们现在包含的延迟funcs的主体,并将调用者更改为调用go producer(...)go consumer(...)。(如果像我前面建议的那样使用errgroup.Group,情况会有所不同。)

如果您使用errgroup.Group,您可以并且应该重写producerconsumer,以返回错误,而不是在通道上发送错误,并且在ctx.Done()情况下应该包括ctx.Err()

当然,位于wg.Done()顶部的consumer应该是defer wg.Done()

最后,请允许我谦逊地向您推荐Github.com/bobg/go-generics/并行软件包。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/282697

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档