
此前我对于原子操作用的不是很多。按照之前看来的经验总结,Go中写高并发程序一般还是从逻辑上来避免加锁,毕竟原子操作写起来难度很大,而且不实际测试一下很容易写错。 不过如果能用上原子操作肯定是最好的。昨天工作的时候正好碰上了一个能用到CAS的使用场景,以此为契机学习并使用Go中的CAS。
CAS(Compare And Swap)是最基础的原子操作之一,当年上操作系统就有提过。 遇到的这个问题其实本来是不用CAS的。 具体来说,有多个goroutine会在循环中被逐个启动,每个goroutine都可能会返回一个error。如果这些goroutine中的error至少有一个非空,则需要退出返回这个error并重新执行。 该代码原来的写法存在bug,在昨天写新代码的时候想到了这篇文章。
如下的写法是最容易想到的。其实这么写也是符合业务规范的(也被用了进去),只是这样的写法会造成数据竞争,最终的error值会为所有通过if判定条件中最后一个修改err的goroutine所对应的值。 问题在于,如果我希望获取到第一个产生的非空的error的值,应该怎么做?显然,此时程序不能发生数据竞争。
func test1() {
// 存在数据竞争
const num = 10
var err error = nil
wg := &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 该goroutine存在数据竞争,读写冲突(读前写)
defer wg.Done()
innerErr := getErr()
if innerErr != nil && err == nil { // 读的位置
fmt.Println("err:", innerErr)
err = innerErr // 竞争位置,之前写的地方
}
}()
}
wg.Wait()
fmt.Println(err)
}如果error为基本类型的话,这就是一个经典的CAS场景。Go中提供了unsafe.Pointer的CAS,因此可以用以下代码。 不过这么写的问题在于unsafe.Pointer的转换是存在一定的代价的,而且这么写感觉很奇怪。
func test2() {
// 直接CAS
const num = 10
var err error = nil
errPointer := unsafe.Pointer(&err)
wg := &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 没有数据竞争
defer wg.Done()
innerErr := getErr()
if innerErr != nil {
fmt.Println("err:", innerErr)
innerErrPointer := unsafe.Pointer(&innerErr)
atomic.CompareAndSwapPointer(&errPointer, unsafe.Pointer(&err), innerErrPointer)
}
}()
}
wg.Wait()
errResult := (*error)(errPointer)
fmt.Println(*errResult)
}Go的CAS操作函数是有返回值的,如atomic pkg所示,CAS操作原子等价于
if *addr == old {
*addr = new
return true
}
return false这也是函数声明中的swapped返回值func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)。
因此,可以借助返回值来实现类似的逻辑。该逻辑使用一个标记位来实现对err访问的原子并发控制。
func test3() {
// 不存在数据竞争
const num = 10
var isErrSet uint32 = 0
var err error = nil
wg := &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 没有数据竞争
defer wg.Done()
innerErr := getErr()
if innerErr != nil { // 看上去像是可以写上 isErrSet == 0,但如果加上的话就会发生数据竞争
fmt.Println("err:", innerErr)
if atomic.CompareAndSwapUint32(&isErrSet, 0, 1) {
err = innerErr // 可能得竞争位置,previous write的地方
}
}
}()
}
wg.Wait()
fmt.Println(err)
}这个地方CAS倒也并不是最佳的使用范例,只是可以用而已。毕竟CAS真的挺容易写错的……
一个是可以选择使用sync.Once,这个函数可以保证once.Do中的函数只执行一次。
func test4() {
// 不存在数据竞争
const num = 10
once := new(sync.Once)
var err error = nil
wg := new(sync.WaitGroup)
for i := 0; i < num; i++ {
wg.Add(1)
go func() { // 没有数据竞争
defer wg.Done()
innerErr := getErr()
if innerErr != nil { // 尽管看上去很像是要写 isErrSet == 0,但实际上不应该写。如果加上的话就会发生数据竞争
fmt.Println("err:", innerErr)
once.Do(func() {
err = innerErr
})
}
}()
}
wg.Wait()
fmt.Println(err)
}另一个可选择的方法是使用errgroup。该方法的问题是操作性会比较低,对于EOF等非nil但是又可能是正常的错误可能会造成非预期的结果,把真正需要的err给漏掉。届时可能还是需要自行实现。
func test5() {
// 不存在数据竞争
const num = 10
g, _ := errgroup.WithContext(context.Background())
var err error = nil
for i := 0; i < num; i++ {
g.Go(func() error {
innerErr := getErr()
if innerErr != nil {
fmt.Println("err:", innerErr)
}
return innerErr
})
}
err = g.Wait()
fmt.Println(err)
}参考此文的解析,CAS操作是Go汇编中两个命令通过加锁实现的。 CPU有对应的CAS指令,不过看Go内使用了Lock的汇编命令。后续需要进一步学习。
当多个线程同时使用CAS操作一个变量时,只会有一个胜出。如果是互斥锁,则失败线程会休眠。而CAS操作下线程仅会被告知失败,并会不断自旋(忙等待)。
CAS底层原理的演进见此文,此处不再赘述。
我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=31u7nlc3nvuo0