协程 Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。 协程池 高并发场景下,会启动大量协程进行业务处理,此时如果使用协程池可以复用对象,减少协程池内存分配的效率与创建协程池点创建开销,提高协程的执行效率。 字节官方开源了gopkg库提供的 gopool 协程池实现。 协程池实现原理 线程池设计 type pool struct { // pool 的名字,打 metrics 和打 log 时用到 name string // pool 的容量,也就是最大的真正在工作的 goroutine需不需要加的问题; // 以及协程池是不是啥都没有的问题 // 满足以下两个条件: // 1. task 数量大于阈值 // 2.
go实现协程池,协程轻量但并不是越多越好。 使用协程池可用对资源进行有效控制。 在内存资源够用的情况,或者其他不用限制同时任务数的情况,请用原生go 协程,不必使用协程池 协程池的数量和CPU核数的关系 小于或者等于CPU核数: 适用于计算密集型的任务中,如果协程的执行时间较长且没有 IO操作,可以将协程池的数量设置为小于CPU核数的值。 这样做可以避免过多的协程竞争CPU资源,减少上下文切换的开销,如图像处理、数据分析等。 大于CPU核数: 如果任务需要进行大量的IO操作,可以考虑将协程池的数量设置为大于CPU核数的值。
通过无缓冲的通道实现Worker池,无缓冲的通道好处是:1. 任务不会丢失,所有投递的任务都一定会被处理,如果协程池里的协程都在忙碌中的话,那么会阻塞在往通道投递任务的那一行代码。2. 调用者可以及时的知道协程池是否处于忙碌的状态中。 type Pool struct {work chan Workerwg sync.WaitGroup}// New创建一个新协程池func New(maxGoroutines int) *Pool { waitgroup中关闭defer p.wg.Done()for w := range p.work {// 阻塞等待执行任务w.Task()}}()}return &p}// Run提交工作到协程池func :package mainimport ("GoPratice/work""log""sync""time")// 通过main.go调用work包中的协程池// 这个示例程序展示如何使用work包//
相比线程,协程占据更小的内存空间,并且由于是在用户态进行调度,上下文切换的代价更小。所以协程更加容易支撑几万几百万的并发。 因此本文的目的是学习如何实现一个go协程池。 借鉴java的线程池,定义如下的结构体 type GoroutinePool struct { name string coreSize uint32 //定义有多少协程 taskChan chan func() //类似java的Runable中的run方法 stop bool //是否停止协程池 } 新建一个协程池,通过start方法启动协程。 但是本文实现的协程池还缺少了: 1、协程池大小的动态扩展能力;例如java支持coreSzie和maxSize,允许一定的突发。 2、拒绝策略。
文章目录 1.何为并发 2.并发的好处 3.Go 如何并发 4.G-P-M 调度模型 5.Go 程的代价 6.协程池的作用 7.简易协程池的设计&实现 8.开源协程池的使用 9.小结 参考文献 1.何为并发 使用协程池限制 Go 程的开辟个数在大型并发场景是有必要的,这也是性能优化方法中对象复用思想的一个具体应用。 7.简易协程池的设计&实现 一个简单的协程池可以这么设计。 总地来说上面简易协程池的不足: (1)无法知道 worker 与 pool 的状态; (2)worker 数量不足无法动态扩增; (3)worker 数量过多无法自动缩减。 8.开源协程池的使用 一个成熟的协程池应该具有如下能力: (1)worker & pool 状态控制; 性能测试、任务超时等都需要知道和控制任务与 Go 程池的状态。 目前有很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量,比较受欢迎的有: (1)Jeffail/tunny (2)panjf2000/ants 下面以 panjf2000/ants 为例
之前已经使用了Java实现,最近在计划使用Go语言实现一些新的压测功能的开发,这其中肯定也少不了使用到线程池(Go中协程池)。 协程池属性设计 我从Java抄来两个属性:核心数,最大数。其中核心数在协程池自己管理中收到最大值的限制,在使用API时不受限制。 我增加了活跃协程数(这个在java.util.concurrent.ThreadPoolExecutor也有,但未显式展示),协程池状态(防止main结束导致进程直接结束)。 p.status { break } ftool.Sleep(1000) p.balance() } }() return p } 管理协程数 主要分成2个:增加和减少 1了,协程池自增策略生效了。
2. 再来看看协程的启动 说了这么多线程,原因嘛,毕竟大家对它是最熟悉的。 JVM 上默认调度器的实现也许你已经猜到,没错,就是开了一个线程池,但区区几个线程足以调度成千上万个协程,而且每一个协程都有自己的调用栈,这与纯粹的开线程池去执行异步任务有本质的区别。 } job.cancel() log(3) 我们创建了协程后立即 cancel,但由于是 ATOMIC 模式,因此协程一定会被调度,因此 1、2、3 一定都会输出,只是 2 和 3 的顺序就难说了。 delay(1000) log(3) } job.cancel() log(4) job.join() 我们在 2 和 3 之间加了一个 delay, delay 会使得协程体的执行被挂起 2 会连续在同一线程中执行, delay 是挂起点,因此 3 会等 100ms 后再次调度,这时候 4 执行, join 要求等待协程执行完,因此等 3 输出后再执行 5。
进程池与线程池是用来控制当前程序允许创建(进程/线程)的数量。 2)进程池与线程池的作用: 保证在硬件允许的范围内创建(进程/线程)的数量。 对比一下: 进程:资源单位 线程:执行单位 协程:在单线程下实现并发 注意:协程不是操作系统资源,它是程序员起的名字,目的是为让单线程能实现并发。 协程的目的:通过手动模拟操作系统“多道技术”,实现切换+保存状态。 如何实现协程? ) def play(name): print('%s play 1'%name) gevent.sleep(1) print('%s play 2'%name) #创建一个协程对象
摘要: 进程池与线程池 同步调用和异步调用 回调函数 协程 一、进程池与线程池: 1、池的概念: 不管是线程还是进程,都不能无限制的开下去,总会消耗和占用资源。 2、进程池与线程池的使用方法:(进程与线程的创建基本相似,所以进程池与线程池的使用过程也基本一样) from concurrent.futures import ProcessPoolExecutor (通过单线程实现并发) 我们知道,多个线程执行任务时候,如果其中一个任务遇到IO,操作系统会有一种来回'切'的机制,来最大效率利用cpu的使用效率,从而实现多线程并发效果 而协程:就是用单线程实现并发, 这两者肯定都会有IO,如果能够实现通信io了我就去干建连接,建连接io了我就去干通信,那其实我们就可以实现单线程下实现并发 将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>> 实现高并发 协程实现服务端客户端通信 # 服务端: from gevent import monkey;monkey.patch_all() from gevent import spawn import socket
字节开源Go协程池gopool Java 中线程池,也支持自定义线程池,为啥 Golang 官方没有提供协程池的实现?Golang 官方偏向轻量级的并发, 希望通过 go func() 解决问题。 初始栈内存为 2KB,如果新建过多协程,过多 goruntine,内存会达到G级别,如何让协程数可控,是一个问题。 协程泄漏问题,如果协程的bug,导致协程无法被回收,日积月累,可能导致程序崩溃,需要有工具避免协程泄漏问题。 先写一个协程池 一般来说,用 waitGroup 结合 channel ,可以实现一个协程池的功能。 一个协程池,一般要具有如下三个功能: 提交任务 启动协程 等待协程执行结束 package main import ( "fmt" "sync" "testing" ) //
golang开源的协程池项目:github.com/gammazero/workerpool下面是一段使用 demopackage main import ( "fmt" "github.com /gammazero/workerpool" "time") func main() { wp := workerpool.New(2) requests := []string{"alpha for _, r := range requests { //r := wp.Submit(func() { time.Sleep(time.Second * 2)
协程池的优势协程池通过限制并发任务的数量,可以有效控制资源使用,提升系统性能,主要优势包括:资源管理:通过限制goroutine的数量,避免资源过度消耗。 协程池的实现一个简单的协程池需要以下几个部分:任务队列:存放待执行的任务。工人(worker)池:负责执行任务的goroutine集合。调度器:管理任务队列和工人池之间的交互。 分布式协程池在大规模分布式系统中,可以将协程池扩展到多台机器,通过分布式消息队列协调任务调度,实现高可用、高性能的分布式任务处理。 通过引入分布式协程池,可以将任务分发到多台机器上进行处理,提高系统的处理能力和可用性。分布式任务队列是实现分布式协程池的关键。它负责将任务分发到不同的机器上,并收集处理结果。 每个节点运行一个本地协程池,并从分布式任务队列中获取任务进行处理。通过负载均衡算法,确保任务均匀分布到各个节点上,提高系统整体性能。
协程池是一种常见的并发编程模式,它可以在多个协程之间共享一组固定数量的协程,以避免创建过多的协程导致系统资源耗尽。 在 Go 语言中,协程池通常使用 sync.WaitGroup 和 chan 类型来实现。 在本文中,我们将介绍一种用户设计模式,即封装协程池。 该模式可以将协程池的实现细节隐藏在一个简单的接口后面,使用户可以轻松地使用协程池而不必了解其内部实现。 实现协程池 首先,我们定义一个 workerFunc 类型,它表示一个可以在协程池中运行的函数。 workers 通道用于存储要运行的函数,limit 通道用于限制协程池中的协程数量。 在函数运行完成后,我们调用 Stop 方法来关闭协程池。 通过封装协程池,我们可以将协程池的实现细节隐藏在一个简单的接口后面,使用户可以轻松地使用协程池而不必了解其内部实现。
fasthttp中的协程池实现 协程池可以控制并行度,复用协程。fasthttp 比 net/http 效率高很多倍的重要原因,就是利用了协程池。 goroutine status: main0: wp.Start() g1: for loop to clean idle workerChan g2: wp.workerFunc(ch) blocks 在 g1 协程中运行着呢。 所以最后的运行状态是: goroutine status: main0: wp.Start() g1: for loop to clean idle workerChan g2: wp.workerFunc 本文来自:Segmentfault 感谢作者:一堆好人卡 查看原文:fasthttp中的协程池实现
控制使用资源并不是协程池目的,使用协程池是为了更好并发、程序鲁棒性、容错性等。废话少说,快速入门协程池才是这篇文章的目的。 简单协程池模型 上面这个图展示了最简单的协程池的样子。 先把协程池作为一个整体看,它使用2个通道,左边的jobCh是任务通道,任务会从这个通道中流进来,右边的retCh是结果通道,协程池处理任务后得到的结果会写入这个通道。 示例代码2 main()启动genJob获取存放任务的通道jobCh,然后创建retCh,它的缓存空间是200,并使用workerPool启动一个有5个协程的协程池。 示例运行结果如下,一共产生了10个任务,显示大部分工作都被worker 2这个协程抢走了,如果我们设置的任务成千上万,协程池长时间处理任务,每个协程处理的工作数量就会均衡很多。
errors.New("input thread nums more than Max nums")// HandleEvent 处理事件type HandleEvent func()// ThreadInfo 协程结构定义 type ThreadInfo struct {f HandleEventsem chan intno int}// ThreadPool 协程池定义type ThreadPool struct {threadnums int // 携程数量threadChans chan int // 协程控制器mx sync.Mutex // 协程保护锁threadInfos []*ThreadInfo // 携程信息queuesmx sync.Mutex // 事件队列锁queues isover bool // 协程池是否结束}// CreateThreadPoolfunc CreateThreadPool(threadnums int)
协程连接池 连接池这个东西即使没用过,你也应该听说过,特别是做过 Java 等其它语言开发的同学,对这玩意绝对不会陌生。今天,我们就来讲讲 Swoole 中如何应用连接池。 Swoole 中的连接池,是基于协程的,并且也是通过 Channel 自动调度的,你不用管太多别的,只管用就是了。 连接池对象准备好之后,创建 4 个协程,在这些协程中使用连接池去请求 MySQL 查询,当然,并没有查询什么真的表,只是做一个简单的计算操作,如果执行或计算失败,会抛出异常。 // 1024 2 0.23715400695801 // 1024 10 0.13657021522522 1024 个协程,2 个连接的连接池执行的结果是 0.237 秒。 然后你也可以自己再调大创建的协程数量以及调整连接池数据进行测试。 连接池设置多大 连接池的数量可不是随便设置的,第一点,你不能超过对方系统所支持的连接数量。
# 一个简单的小爬虫,将3个页面的数据保存到data.html,对比协程和非协程的使用时间 """协程 1、通过urlopen获取数据 2、写入文件 3、使用三个页面,通过gevent.joinal执行 (协程会在IO阻塞处切换),用时短 4、在Windows系统,由于捕获IO较慢。
比如批量群发邮件的功能 因为发送邮件是个比较耗时的操作, 如果是传统的一个个执行 , 总体耗时比较长 可以使用golang实现一个协程池 , 并行发送邮件 pool包下的pool.go文件 package "log" //具体任务,可以传参可以自定义操作 type Task struct { Args interface{} Do func(interface{})error } //协程的个数 i) } for task:=range Jobs{ JobChannels<-task } close(JobChannels) } //实际的工作协程worker page:=1; for{ //模拟每页获取的邮箱 emails:=[]string{ "1@qq.com", "2@ \n",page) page++ } } func TestPool(t *testing.T) { //定义5个协程 pool.Nums = 5 //开个子协程去不停的获取邮箱
遇到阻塞就切换到另一个协程继续执行 ! gevent 协程通信 ? # 将函数封装成协程,并开始调度 >>>producer = gevent.spawn(producer, queue) # 阻塞(一阻塞就切换协程)等待 >>>gevent.joinall([producer gevent通信 问题引入 问题一: 协程之间不是能通过switch通信嘛? >>>是的,由于 gevent 基于 greenlet,所以可以。 问题二: 那为什么还要考虑通信问题?