我希望使用go例程的for循环是并行的。我试着用频道,但那不管用。我的主要问题是,在继续之前,我希望等待所有迭代完成。这就是为什么在go不能工作之前简单地编写它。我试图使用通道(我认为是错误的),但这使得我的代码更慢。
func createPopulation(populationSize int, individualSize int) []Individual {
population := make([]Individual, populationSize)
//i want this loop to be work parallel
for i := 0; i < len(population); i++ {
population[i] = createIndividual(individualSize)
}
return population
}
func createIndividual(size int) Individual {
var individual = Individual{make([]bool, size), 0}
for i := 0; i < len(individual.gene); i++ {
if rand.Intn(2)%2 == 1 {
individual.gene[i] = true
} else {
individual.gene[i] = false
}
}
return individual
}我的结构看起来如下:
type Individual struct {
gene []bool
fitness int
}发布于 2014-06-16 08:22:21
因此,基本上,goroutine不应该返回一个值,而应该将它推入一个通道。如果你想等待所有的猩猩完成,你只需数到峡谷的数量,或者使用WaitGroup。在本例中,这是一个过分的做法,因为它的大小是已知的,但无论如何,它是一个很好的实践。下面是一个修改过的示例:
package main
import (
"math/rand"
"sync"
)
type Individual struct {
gene []bool
fitness int
}
func createPopulation(populationSize int, individualSize int) []Individual {
// we create a slice with a capacity of populationSize but 0 size
// so we'll avoid extra unneeded allocations
population := make([]Individual, 0, populationSize)
// we create a buffered channel so writing to it won't block while we wait for the waitgroup to finish
ch := make(chan Individual, populationSize)
// we create a waitgroup - basically block until N tasks say they are done
wg := sync.WaitGroup{}
for i := 0; i < populationSize; i++ {
//we add 1 to the wait group - each worker will decrease it back
wg.Add(1)
//now we spawn a goroutine
go createIndividual(individualSize, ch, &wg)
}
// now we wait for everyone to finish - again, not a must.
// you can just receive from the channel N times, and use a timeout or something for safety
wg.Wait()
// we need to close the channel or the following loop will get stuck
close(ch)
// we iterate over the closed channel and receive all data from it
for individual := range ch {
population = append(population, individual)
}
return population
}
func createIndividual(size int, ch chan Individual, wg *sync.WaitGroup) {
var individual = Individual{make([]bool, size), 0}
for i := 0; i < len(individual.gene); i++ {
if rand.Intn(2)%2 == 1 {
individual.gene[i] = true
} else {
individual.gene[i] = false
}
}
// push the population object down the channel
ch <- individual
// let the wait group know we finished
wg.Done()
}发布于 2014-06-16 10:19:00
将受控并行性添加到这样的循环中的一个常见方法是生成许多将从通道读取任务的工作数据。runtime.NumCPU函数可能有助于确定生成多少工作人员是有意义的(确保您适当地设置了GOMAXPROCS以利用这些CPU)。然后,您只需将作业写入通道,它们将由工人处理。
在这种情况下,任务是初始化人口块的元素,因此使用*Individual指针的通道可能是有意义的。就像这样:
ch := make(chan *Individual)
for i := 0; i < nworkers; i++ {
go initIndividuals(individualSize, ch)
}
population := make([]Individual, populationSize)
for i := 0; i < len(population); i++ {
ch <- &population[i]
}
close(ch)工人的戈鲁廷看起来会是这样的:
func initIndividuals(size int, ch <-chan *Individual) {
for individual := range ch {
// Or alternatively inline the createIndividual() code here if it is the only call
*individual = createIndividual(size)
}
}由于任务没有提前分割,所以createIndividual是否需要可变的时间并不重要:每个工作人员只在最后一个任务完成时承担一个新任务,并且在没有任务时退出(因为在那个时候通道已经关闭)。
但我们如何知道这项工作何时完成?sync.WaitGroup类型可以在这里提供帮助。可以对生成工作人员goroutines的代码进行如下修改:
ch := make(chan *Individual)
var wg sync.WaitGroup
wg.Add(nworkers)
for i := 0; i < nworkers; i++ {
go initIndividuals(individualSize, ch, &wg)
}initIndividuals函数也被修改为接受附加参数,并添加defer wg.Done()作为第一个语句。现在,对wg.Wait()的调用将被阻塞,直到所有的工作人员已经完成。然后,您可以返回完全构造的population切片。
发布于 2017-06-25 15:25:59
如果您想避免将并发逻辑与业务逻辑混为一谈,我编写了这个库https://github.com/shomali11/parallelizer来帮助您。它封装了并发逻辑,因此您不必担心它。
所以在你的例子中:
package main
import (
"github.com/shomali11/parallelizer"
"fmt"
)
func main() {
populationSize := 100
results = make([]*Individual, populationSize)
options := &Options{ Timeout: time.Second }
group := parallelizer.NewGroup(options)
for i := 0; i < populationSize; i++ {
group.Add(func(index int, results *[]*Individual) {
return func () {
...
results[index] = &Individual{...}
}
}(i, &results))
}
err := group.Run()
fmt.Println("Done")
fmt.Println(fmt.Sprintf("Results: %v", results))
fmt.Printf("Error: %v", err) // nil if it completed, err if timed out
}https://stackoverflow.com/questions/24238820
复制相似问题