所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。通常,如果多个并发体对同一个共享资源进行的操作是原子的话,那么同一时刻最多只能有一个并发体对该资源进行操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。
一般情况下,原子操作都是通过“互斥”访问来保证的,通常由特殊的 CPU 指令提供保护。当然,如果仅仅是想模拟下粗粒度的原子操作,我们可以借助于 sync.Mutex 来实现:
import (
"sync"
)
var total struct {
sync.Mutex
value int
}
func worker(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i <= 100; i++ {
total.Lock()
total.value += i
total.Unlock()
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
fmt.Println(total.value)
}
用互斥锁来保护一个数值型的共享资源,麻烦且效率低下。标准库的 sync/atomic 包对原子操作提供了丰富的支持。我们可以重新实现上面的例子:
import (
"sync"
"sync/atomic"
)
var total uint64
func worker(wg *sync.WaitGroup) {
defer wg.Done()
var i uint64
for i = 0; i <= 100; i++ {
atomic.AddUint64(&total, i)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
atomic.AddUint64 函数调用保证了 total 的读取、更新和保存是一个原子操作,因此在多线程中访问也是安全的。sync/atomic 包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value 原子对象提供了 Load 和 Store 两个原子方法,分别用于加载和保存数据,返回值和参数都是 interface{}类型,因此可以用于任意的自定义复杂类型。
var config atomic.Value // 保存当前配置信息
// 初始化配置信息
config.Store(loadConfig())
// 启动一个后台线程, 加载更新后的配置信息
go func() {
for {
time.Sleep(time.Second)
config.Store(loadConfig())
}
}()
// 用于处理请求的工作者线程始终采用最新的配置信息
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// ...
}
}()
}
这是一个简化的生产者消费者模型:后台线程生成最新的配置信息;前台多个工作者线程获取最新的配置信息。所有线程共享配置信息资源。
原子操作配合互斥锁可以实现非常高效的单件模式。互斥锁的代价比普通整数的原子读写高很多,在性能敏感的地方可以增加一个数字型的标志位,通过原子检测标志位状态降低互斥锁的使用次数来提高性能。
type singleton struct {}
var (
instance *singleton
initialized uint32
mu sync.Mutex
)
func Instance() *singleton {
if atomic.LoadUint32(&initialized) == 1 {
return instance
}
mu.Lock()
defer mu.Unlock()
if instance == nil {
defer atomic.StoreUint32(&initialized, 1)
instance = &singleton{}
}
return instance
}
我们可以将通用的代码提取出来,就成了标准库中 sync.Once 的实现:
type Once struct {
m Mutex
done uint32
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
基于 sync.Once 重新实现单例模式:
var (
instance *singleton
once sync.Once
)
func Instance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}