首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将数组处理分离为goroutines?

如何将数组处理分离为goroutines?
EN

Stack Overflow用户
提问于 2017-03-31 14:50:48
回答 4查看 5.8K关注 0票数 7

我有三万根弦。我如何将这个切片分离为,例如,从切片中提取3000字符串、从其中提取一些数据并将其推入一个新切片的10个goroutines?

因此,在最后,我将有10个切片,每个3000处理结果。处理这个问题的模式是什么?

我看过这篇文章,但不确定这些模式中哪种模式适用于我的情况。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-03-31 16:15:10

使用通道,从切片中读取元素,使用扇出分配加载和传递消息。然后,在goroutines中处理字符串,并在单个goroutine中收集结果(扇入),以避免互斥。

您可能需要设置最大并发并发goroutines的数目。

记住,当写入片时,它们并不是线程安全的。

有用的信息:

https://blog.golang.org/pipelines https://talks.golang.org/2012/concurrency.slide#1 https://blog.golang.org/advanced-go-concurrency-patterns https://talks.golang.org/2013/advconc.slide#1

票数 3
EN

Stack Overflow用户

发布于 2017-03-31 19:02:45

我同意@JimB关于为什么限制围棋例程的观点。不过,既然这是你的询盘,我也许会这样做..如果你真的想让每个花园会做3000件物品,那么创建一个2d切片就更容易了。[3000项,3000项,.]然后在2d数组中,每个索引有一个go例程处理。否则,下面仅将gorountines限制为10. 方法1包主

代码语言:javascript
复制
import (
    "crypto/rand"
    "fmt"
    "log"
    "sync"
    "time"
)

var s []string

// genetate some mock data
func init() {
    s = make([]string, 30000)
    n := 5
    for i := 0; i < 30000; i++ {
        b := make([]byte, n)
        if _, err := rand.Read(b); err != nil {
            panic(err)
        }
        s[i] = fmt.Sprintf("%X", b)
    }
}

func main() {
    // set the number of workers
    ch := make(chan string)
    var mut sync.Mutex
    counter := 0

    // limit the number of gorountines to 10
    for w := 0; w < 10; w++ {
        go func(ch chan string, mut *sync.Mutex) {
            for {
                // get and update counter using mux to stop race condtions
                mut.Lock()
                i := counter
                counter++
                mut.Unlock()
                // break the loop
                if counter > len(s) {
                    return
                }
                // get string
                myString := s[i]
                // to some work then pass to channel
                ch <- myString

            }
        }(ch, &mut)
    }
    // adding time.  If you play wiht the number of gorountines youll see how changing the number above can efficiency 
    t := time.Now()
    for i := 0; i < len(s); i++ {
        result := <-ch
        log.Println(time.Since(t), result, i)
    }
}

METHOD2 init函数正在创建一个2d数组,分块成10个数组,每个数组包含3000个元素。如果以这种方式解析数据,下面的逻辑只需要很少的修改就可以工作。

代码语言:javascript
复制
package main

import (
    "crypto/rand"
    "fmt"
    "log"
    "sync"
)

var chunkedSlice [10][3000]string

// genetate some mock data
// 2d array, each chunk has 3000 items in it
// there are 10 chunks, 1 go rountine per chunk
func init() {
    n := 5
    for i := 0; i < 10; i++ {
        for j := 0; j < 3000; j++ {
            b := make([]byte, n)
            if _, err := rand.Read(b); err != nil {
                panic(err)
            }
            chunkedSlice[i][j] = fmt.Sprintf("%X", b)
        }
    }
}

func main() {
    // channel to send parsed data to

    ch := make(chan string)
    var wg sync.WaitGroup

    // 10 chunks
    for _, chunk := range chunkedSlice {
        wg.Add(1)
        // if defining the 2d array e.g [10][3000]string, you need to pass it as a pointer to avoid stack error
        go func(ch chan string, wg *sync.WaitGroup, chunk *[3000]string) {
            defer wg.Done()
            for i := 0; i < len(chunk); i++ {
                str := chunk[i]
                // fmt.Println(str)
                // parse the data (emulating)
                parsedData := str
                // send parsed data to the channel
                ch <- parsedData
            }
        }(ch, &wg, &chunk)
    }
    // wait for all the routines to finish and close the channel
    go func() {
        wg.Wait()
        close(ch)
    }()

    var counter int // adding to check that the right number of items was parsed
    // get the data from the channel
    for res := range ch {
        log.Println(res, counter)
        counter++
    }
}
票数 0
EN

Stack Overflow用户

发布于 2021-04-18 08:48:02

我开发了一个库副芹菜来解决这样的任务。只需将切片分割成10部分,并将它们发送到管道,管道将同时处理它们:

代码语言:javascript
复制
import "github.com/nazar256/parapipe"
//...
var longStringSlice []string
// ...
pipeline := parapipe.NewPipeline(10).
    Pipe(func(msg interface{}) interface{} {
    slicePart := msg.([]string)
    // process here to result
    return result
})
// chop the slice and stream parts
chopSize := int(math.Ceil(float64(len(longStringSlice)) / 10))
for i:=0;i<10;i++ {
    firstIdx := i * chopSize
    lastIdx := (i+1) * chopSize
    if lastIdx > len(longStringSlice) {
        lastIdx = len(longStringSlice)
    }
    pipeline.In() <- longStringSlice[firstIdx:lastIdx]
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43143632

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档