# Golang 标准库丨sync
Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.
sync 包提供基本的同步原语,比如互斥锁。除了 Once 和 WaitGroup 类型外,大多数都是用于底层库的。更高级的同步最好通过 channel 通道和 communication 通信来完成。
# 1. WaitGroup
A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.
WaitGroup 等待一组 Goroutine 完成。主 Goroutine 调用 Add 来设置要等待的 Goroutine 的数量。然后每个 Goroutine 运行并在完成时调用 Done。同时,可以使用 Wait 来阻塞,直到所有 Goroutine 完成
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
# 1.1 核心 API
wg.Add(delta int):Add 将 delta(可能为负)添加到 WaitGroup 计数器。如果计数器变为 0,所有在 Wait 时阻塞的 Goroutine 将被释放。如果计数器变成负值,Add 会 panic。
注意:
当计数器为 0 时发生的具有正 delta 的调用必须在等待之前发生。任何时候都可能发生具有负 delta 的调用,或者在计数器大于 0 时开始的具有正 delta 的调用。通常这意味着对 Add 的调用应该在创建 Goroutine 或其他要等待的事件的语句之前执行。如果重用 WaitGroup 来等待多个独立的事件集,则必须在所有先前的 Wait 调用返回后才进行新的 Add 调用。请参阅 WaitGroup 示例。
wg.Done():当 WaitGroup 同步等待组中的某个 Goroutine 执行完毕后,设置这个 WaitGroup 的 counter 数值减 1。
func (wg *WaitGroup) Done() { //其实就是调用了 Add(-1) wg.Add(-1) }
wg.Wait():表示让当前的 Goroutine 等待,进入阻塞状态。一直到 WaitGroup 的计数器为 0,才能解除阻塞,这个 Goroutine 才能继续执行。
# 1.2 官方示例
package main
import (
"fmt"
"sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {
fmt.Println(url)
}
var http httpPkg
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"https://hedon954.github.io/noteSite/"
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
}
输出:
https://hedon954.github.io/noteSite/
http://www.golang.org/
http://www.google.com/
# 2. Locker
Locker 表示一个对象可以被 lock 和 unlock。
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
# 3. Mutex
Go 语言包中的 sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。
Mutex 是最简单的一种锁类型 —— 互斥锁,同时也比较暴力,当一个 Goroutine 获得了 Mutex 后,其他 Goroutine 就只能乖乖等到这个 Goroutine 释放该 Mutex。
每个资源都对应于一个可称为 “互斥锁” 的标记,这个标记用来保证在任意时刻,只能有一个协程(线程)访问该资源,其它的协程只能等待。
互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库 sync 中的 Mutex 结构体类型表示。sync.Mutex 类型只有两个公开的指针方法:Lock 和 Unlock。
- Lock 锁定当前的共享资源
- Unlock 进行解锁
在使用互斥锁时,一定要注意:
对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助 defer。锁定后,立即使用 defer 语句保证互斥锁及时解锁。
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
# 4. RWMutex
A RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer. The zero value for a RWMutex is an unlocked mutex.
A RWMutex must not be copied after first use.
If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.
RWMutex 是读写锁,该锁可以由任意数量的读者或单个写者持有。RWMutex 的零值是未锁定的互斥锁。
RWMutex 被第一次使用后不得复制。
如果一个 Goroutine 持有一个 RWMutex 的读锁并且另一个 Goroutine 可能会调用 Lock,那么在前面的读锁被释放之前,任何 Goroutine 都不应该期望能够获取读锁。特别是,禁止递归占有读锁,这是为了确保锁最终会被释放;被阻塞的 Lock 调用会将新的读者排除在获取锁之外。
我们怎么理解读写锁呢?当有一个 Goroutine 获得写锁,其它无论是读锁还是写锁都将阻塞直到写锁释放;当有一个 Goroutine 获得读锁,其它 Goroutine 仍然可以申请读锁;当有一个或任意多个读锁,写锁将等待所有读锁解锁之后才能够进行写锁。所以说这里的读锁(RLock)目的其实是告诉写锁:有很多人正在读取数据,你给我站一边去,等它们读完你再来写。我们可以将其总结为如下三条:
- 同时只能有一个 Goroutine 能够获得写锁
- 同时可以有任意多个 Gorouinte 获得读锁
- 同时只能存在写锁或读锁(读和写互斥)
所以,RWMutex 这个读写锁,该锁可以加多个读锁或者一个写锁,其经常用于读远远多于写的场景。
读写锁的写锁只能锁定一次,解锁前不能多次锁定,读锁可以多次,但读解锁次数最多只能比读锁次数多一次,一般情况下我们不建议读解锁次数多余读锁次数。
基本遵循两大原则:
- 可以随便读,多个 Goroutine 同时读
- 写的时候,啥也不能干,不能读也不能写
读写锁即是针对于读写操作的互斥锁。它与普通的互斥锁最大的不同就是:它可以分别针对读操作和写操作进行锁定和解锁操作。读写锁遵循的访问控制规则与互斥锁有所不同。在读写锁管辖的范围内,它允许任意个读操作的同时进行。但是在同一时刻,它只允许有一个写操作在进行。
# 4.1 核心 API
- rwm.RLock():上读锁
- rwm.RUnlock():解读锁
- rwm.Lock():上写锁
- rwm.Unlock():解读锁
# 4.2 最佳实践
package main
import (
"fmt"
"sync"
"time"
)
var rwLock sync.RWMutex
var waitGroup sync.WaitGroup
func main() {
rwLock = sync.RWMutex{}
waitGroup = sync.WaitGroup{}
m := make(map[string]string)
m["a"] = "hello"
waitGroup.Add(7)
go read(m, 1)
go read(m, 2)
go read(m, 3)
time.Sleep( 1 * time.Second)
go write(m)
go read(m, 4)
go read(m, 5)
go read(m, 6)
waitGroup.Wait()
}
func read(m map[string]string, i int) {
rwLock.RLock()
fmt.Println("上了第", i, "个读锁")
fmt.Println(m["a"])
time.Sleep( 3 * time.Second)
rwLock.RUnlock()
fmt.Println("释放了第", i, "个读锁")
waitGroup.Done()
}
func write(m map[string]string) {
rwLock.Lock()
fmt.Println("上了写锁")
m["a"] = "hedon"
rwLock.Unlock()
fmt.Println("释放了写锁")
waitGroup.Done()
}
输出:
上了第 1 个读锁
hello
上了第 3 个读锁
hello
上了第 2 个读锁
hello
上了第 6 个读锁
hello
上了第 4 个读锁
hello
释放了第 1 个读锁
释放了第 2 个读锁
释放了第 3 个读锁
释放了第 4 个读锁
释放了第 6 个读锁
上了写锁
释放了写锁
上了第 5 个读锁
hedon
释放了第 5 个读锁
# 5. Map
Map is like a Go map[interface{}]interface{} but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.
The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
The zero Map is empty and ready for use. A Map must not be copied after first use.
Map 类似于 Go 的
map[interface{}]interface{}
,但对于多个 Goroutine 并发使用是安全的,无需额外的锁定或协调。Load、Store 和 Delete 在分摊常数时间内运行。Map 类型是专门的。大多数代码应该使用带有单独锁定或协调的普通 Go map,以获得更好的类型安全性并更容易维护其他不变量以及映射内容。
Map 类型针对两种常见用例进行了优化:
- 当给定 key 的条目只写入一次但读取多次时,如在只会增长的缓存中
- 当多个 Goroutine 读取、写入和覆盖不相交的 key 集的条目
在这两种情况下,与使用单独的 Mutex 或 RWMutex 配对的 Go map 相比,使用 Map 可以显着减少锁争用。
零映射是空的并且可以使用了,第一次使用后不得复制 Map。
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
类似于 Java 的 ConcurrentMap。
# 5.1 核心 API
- m.Load(k) (v, ok):并发安全的 get。
- m.Store(k, v):并发安全的 put。
- m.Delete(k):并发安全的 delete。
- m.LoadAndDelete(k) (v, loaded):LoadAndDelete 删除键的值,如果之前有值,则返回之前的值,loaded 表示是否有之前的值。
- m.LoadAndStore(k, v) (actual, loaded):LoadOrStore 返回键的现有值(如果存在)。否则,它存储并返回给定的值。loaded 为 true 表示返回现有值,loaded 为 false 表示存储给定值。
- m.Range(func(k, v) bool):按照传入的 func 规则遍历 Map。
# 5.2 最佳实践
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Map{}
go func() {
m.Store(1,2)
fmt.Println(m.Load(1))
}()
go func() {
m.Store(1,2)
fmt.Println(m.Load(1))
}()
fmt.Println(m.Load(1))
time.Sleep(5 * time.Second)
}
检查数据冲突:
go run -race concurrentMap.go
输出:
<nil> false
2 true
2 true
没有并发冲突。
# 6. Once
Once is an object that will perform exactly one action.
A Once must not be copied after first use.
Once 是只执行一次的对象。
Once 使用后不能复制。
type Once struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/386),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m Mutex
}
# 6.1 核心 API
once.Do(func):执行某一函数
func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { // Outlined slow-path to allow inlining of the fast-path. o.doSlow(f) } }
# 6.2 官方示例
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only Once")
}
doneChan := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
doneChan <- true
}()
}
for i := 0; i < 10; i++ {
<- doneChan
}
}
输出:只执行了一次
Only Once
# 7. Pool
A Pool is a set of temporary objects that may be individually saved and retrieved.
Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.
A Pool is safe for use by multiple goroutines simultaneously.
Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.
An appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to amortize allocation overhead across many clients.
An example of good use of a Pool is in the fmt package, which maintains a dynamically-sized store of temporary output buffers. The store scales under load (when many goroutines are actively printing) and shrinks when quiescent.
On the other hand, a free list maintained as part of a short-lived object is not a suitable use for a Pool, since the overhead does not amortize well in that scenario. It is more efficient to have such objects implement their own free list.
A Pool must not be copied after first use.
Pool 一个临时对象池。
存储在 Pool 中的任何项目都可以在不通知的情况下自动删除。如果在发生这种情况的时候 Pool 持有唯一的引用,则该项可能被释放。
一个 Pool 可以被多个 Goroutine 同时使用。
Pool 的目的是缓存已分配但未使用的项,以便以后重用,减轻垃圾收集器的压力。也就是说,它使构建高效、线程安全的空闲列表变得很容易。但是,它并不适合所有的空闲列表。
Pool 的适当用途是管理一组临时项,这些临时项在包的并发独立客户端之间默默地共享,并可能被重用。Pol 提供了一种跨多个客户端分摊分配开销的方法。
一个很好的使用 Pool 的例子是 fmt 包,它维护一个动态大小的临时输出缓冲区存储。这个存储在负载下伸缩(当许多goroutine正在积极打印时),在静止时收缩。
另一方面,作为短期对象的一部分维护的空闲列表不适合使用 Pool,因为在这种情况下开销不能很好地分摊。让这些对象实现它们自己的自由列表会更有效。
Pool 在第一次使用后不能被复制。
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
# 7.1 核心 API
pool.Get():
Pool
会为每个P
维护一个本地池,P
的本地池分为 私有池private
和共享池shared
。私有池中的元素只能本地P
使用,共享池中的元素可能会被其他P
偷走,所以使用私有池private
时不用加锁,而使用共享池shared
时需加锁。Get
会优先查找本地private
,再查找本地shared
,最后查找其他P
的shared
,如果以上全部没有可用元素,最后会调用New
函数获取新元素。pool.Put(p):
Put
优先把元素放在private
池中;如果private
不为空,则放在shared
池中。有趣的是,在入池之前,该元素有 1/4 可能被丢掉。
# 7.2 官方示例
package main
import (
"bytes"
"io"
"os"
"sync"
"time"
)
var bufPool = sync.Pool{
New: func() interface{} {
// The Pool's New function should generally only return pointer
// types, since a pointer can be put into the return interface
// value without an allocation:
// 总结:指针
return new(bytes.Buffer)
},
}
// timeNow is a fake version of time.Now for tests.
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// 从 Pool 池中随便取一个 byte.Buffer 对象,注意,是指针
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
// Replace this with time.Now() in a real logger.
// 用 time.Now() 替换真实日志
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
// 输出 b 的内容
_, err := w.Write(b.Bytes())
if err != nil {
panic(err)
}
// 再把 b 丢回 Pool
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "./search?q=flowers")
}
输出:
2006-01-02T15:04:05Z path=./search?q=flowers
# 8. Cond
Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
Each Cond has an associated Locker L (often a *Mutex or *RWMutex), which must be held when changing the condition and when calling the Wait method.
A Cond must not be copied after first use.
Cond 实现了一个条件变量,它是 Goroutine 等待或宣布事件发生的集合点。
每个 Cond 都有一个关联的 Locker(通常是 *Mutex 或 *RWMutex),当改变条件和调用 Wait 方法时,必须持有它。
Cond 在第一次使用后不能被复制。
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
# 8.1 核心 API
- NewCond(locker):使用一个 Locker 来创建一个 Cond。
- cond.Broadcast():广播,唤醒所有 Wait 了的 Goroutine,若没有 Wait 的,也不会报错。
- cond.Signal():唤醒一个 Wait 了的 Goroutine,若没有 Wait 的,也不会报错。Signal 通知的顺序是根据原来 Wait 的顺序,先入先出。
- cond.Wait():Unlock → 阻塞等待通知(即等待 Signal 或 Broadcast 的通知)→ 收到通知 → Lock
类似于 Java 当中的 object.wait()、object.notify() 和 object.notifyAll()。
# 8.2 最佳实践
package main
import (
"fmt"
"sync"
)
func main() {
cond := sync.NewCond(new(sync.Mutex))
condition := 0
// Consumer
go func() {
for {
// 占有锁
cond.L.Lock()
// 没有可以消费的,阻塞
for condition == 0 {
cond.Wait()
}
fmt.Printf("Consumer consumes: %d\n", condition)
condition --
// 消费了,必然不满,通知生产者可以生产
cond.Signal()
// 释放锁
cond.L.Unlock()
}
}()
// Producer
for {
// 占有锁
cond.L.Lock()
// 慢了就不生产了,阻塞
if condition == 3 {
cond.Wait()
}
condition ++
fmt.Printf("Producer produces: %d\n", condition)
// 生产了,必然有可以消费的,唤醒消费者
cond.Signal()
// 释放锁
cond.L.Unlock()
}
}
输出:
Producer produces: 1
Producer produces: 2
Producer produces: 3
Consumer consumes: 3
Consumer consumes: 2
Consumer consumes: 1
...