首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Goroutines,channels语句

Goroutines,channels语句
EN

Stack Overflow用户
提问于 2017-07-24 08:31:46
回答 1查看 160关注 0票数 0

我在构建我的goroutines和channel时遇到了麻烦。我的select语句在所有goroutines结束之前一直退出,我知道问题出在我发送done信号的位置。我应该在哪里发送完成信号。

代码语言:javascript
复制
func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool, wg *sync.WaitGroup) {
    var results ProcessResults
    defer wg.Done()
    log.Info("Starting . . .")
    start := time.Now()

    for {
        select {
        case lead := <-ok:
            results.BackFill = append(results.BackFill, lead.Lead)
        case err := <-err:
            results.BadLeads = append(results.BadLeads, err)
        case <-quit:
            if verbose {
                log.Info("Logging errors from unprocessed leads . . .")
                logBl(results.BadLeads)
            }
            log.WithFields(log.Fields{
                "time-elapsed":                time.Since(start),
                "number-of-unprocessed-leads": len(results.BadLeads),
                "number-of-backfilled-leads":  len(results.BackFill),
            }).Info("Done")
            return
        }
    }
}

//BackFillParallel . . .
func BackFillParallel(leads []Lead, verbose bool) {
    var wg sync.WaitGroup
    gl, bl, d := getChans()
    for i, lead := range leads {
        done := false
        if len(leads)-1 == i {
            done = true
        }
        wg.Add(1)
        go func(lead Lead, done bool, wg *sync.WaitGroup) {
            ProcessLead(lead, gl, bl, d, done, wg)
        }(lead, done, &wg)

    }
    startWorker(gl, bl, d, verbose, &wg)
}

//ProcessLead . . .
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, c3 chan int, done bool, wg *sync.WaitGroup) {
    defer wg.Done()
    var payloads []Payload
    for _, p := range lead.Payload {
        decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
        if err != nil {
            c2 <- LeadResErr{lead, err.Error()}
        }
        var decMetadata Metadata
        if err := json.Unmarshal(decMDStr, &decMetadata); err != nil {
            goodMetadata, err := FixMDStr(string(decMDStr))
            if err != nil {
                c2 <- LeadResErr{lead, err.Error()}
            }
            p.MetaData = goodMetadata

            payloads = append(payloads, p)
        }
    }

    lead.Payload = payloads
    c1 <- LeadRes{lead}
    if done {
        c3 <- 0
    }
}
EN

回答 1

Stack Overflow用户

发布于 2017-07-25 01:10:22

首先评论一下我在代码中看到的主要问题:

您将一个done变量传递给最后一个ProcessLead调用,然后在ProcessLead中使用该变量通过quit通道停止您的工作进程。这样做的问题是,“最后一个”ProcessLead调用可能会在其他ProcessLead调用并行执行之前完成。

第一次改进

把你的问题想象成一条管道。您有3个步骤:

检查所有销售线索并为每个销售线索启动一个例程,例程处理它们的lead

  • collecting

在步骤2中展开之后,最简单的同步方法是WaitGroup。如前所述,您没有调用同步,如果您要调用同步,您当前将创建一个与收集例程相关的死锁。您需要另一个goroutine将同步与收集例程分开,这样才能正常工作。

这看起来会是什么样子(sry删除了一些代码,这样我就可以更好地了解其结构):

代码语言:javascript
复制
//BackFillParallel . . .
func BackFillParallel(leads []Lead, verbose bool) {
    gl, bl, d := make(chan LeadRes), make(chan LeadResErr), make(chan int)
    // additional goroutine with wg.Wait() and closing the quit channel
    go func(d chan int) {
        var wg sync.WaitGroup
        for i, lead := range leads {
            wg.Add(1)
            go func(lead Lead, wg *sync.WaitGroup) {
                ProcessLead(lead, gl, bl, wg)
            }(lead, &wg)
        }
        wg.Wait()
        // stop routine after all other routines are done
        // if your channels have buffers you might want make sure there is nothing in the buffer before closing
        close(d) // you can simply close a quit channel. just make sure to only close it once
    }(d)

    // now startworker is running parallel to wg.Wait() and close(d)
    startWorker(gl, bl, d, verbose)
}

func startWorker(ok chan LeadRes, err chan LeadResErr, quit chan int, verbose bool) {
    for {
        select {
        case lead := <-ok:
            fmt.Println(lead)
        case err := <-err:
            fmt.Println(err)
        case <-quit:
            return
        }
    }
}

//ProcessLead . . .
func ProcessLead(lead Lead, c1 chan LeadRes, c2 chan LeadResErr, wg *sync.WaitGroup) {
    defer wg.Done()
    var payloads []Payload
    for _, p := range lead.Payload {
        decMDStr, err := base64.StdEncoding.DecodeString(p.MetaData)
        if err != nil {
            c2 <- LeadResErr{lead, err.Error()}
        }
        var decMetadata Metadata
        if err := json.Unmarshal(decMDStr, &decMetadata); err != nil {
            goodMetadata, err := FixMDStr(string(decMDStr))
            if err != nil {
                c2 <- LeadResErr{lead, err.Error()}
            }
            p.MetaData = goodMetadata

            payloads = append(payloads, p)
        }
    }

    lead.Payload = payloads
    c1 <- LeadRes{lead}
}

建议的解决方案

正如评论中提到的,如果你有缓冲的通道,你可能会遇到麻烦。复杂之处在于您有两个输出通道(用于Lead和LeadErr)。您可以使用以下结构来避免这种情况:

代码语言:javascript
复制
//BackFillParallel . . .
func BackFillParallel(leads []Lead, verbose bool) {
    gl, bl := make(chan LeadRes), make(chan LeadResErr)

    // one goroutine that blocks until all ProcessLead functions are done
    go func(gl chan LeadRes, bl chan LeadResErr) {
        var wg sync.WaitGroup
        for _, lead := range leads {
            wg.Add(1)
            go func(lead Lead, wg *sync.WaitGroup) {
                ProcessLead(lead, gl, bl, wg)
            }(lead, &wg)
        }
        wg.Wait()
    }(gl, bl)

    // main routine blocks until all results and errors are collected
    var wg sync.WaitGroup
    res, errs := []LeadRes{}, []LeadResErr{}
    wg.Add(2) // add 2 for resCollector and errCollector
    go resCollector(&wg, gl, res)
    go errCollector(&wg, bl, errs)
    wg.Wait()

    fmt.Println(res, errs) // in these two variables you will have the results.
}

func resCollector(wg *sync.WaitGroup, ok chan LeadRes, res []LeadRes) {
    defer wg.Done()
    for lead := range ok {
        res = append(res, lead)
    }
}

func errCollector(wg *sync.WaitGroup, ok chan LeadResErr, res []LeadResErr) {
    defer wg.Done()
    for err := range ok {
        res = append(res, err)
    }
}

// ProcessLead function as in "First improvement"
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45271129

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档