# Golang 并发编程

  • 什么是并发编程
  • 什么是并行,并发,串行
  • Go 语言如何实现并发编程,以及实现的原理
  • goroutine 的使用
  • runtime 包
  • sync 包的介绍
  • channel 通道的使用,以及缓冲通道,定向通道
  • select 语句
  • time 包中和并发编程相关的函数介绍
  • CSP 模型

# 一、并发性 Concurrency

# 1. 多任务

怎么来理解多任务呢?其实就是指我们的操作系统可以同时执行多个任务。举个例子,你一边听音乐,一边刷微博,一边聊 QQ,一边用Markdown 写作业,这就是多任务,至少同时有4个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是界面上没有显示而已。

img

# 1.2 并发

Go 是并发语言,而不是并行语言。在讨论如何在 Go 中进行并发处理之前,我们首先必须了解什么是并发,以及它与并行性有什么不同。

Go is a concurrent language and not a parallel one.

并发性 Concurrency 是同时处理许多事情的能力

并行就是同时做很多事情。

img

# 1.3 进程/线程/协程

进程

进程是一个程序在一个数据集中的一次动态执行过程,可以简单理解为“正在执行的程序”,它是 CPU 资源分配和调度的独立单位。

进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

进程的局限是创建、撤销和切换的开销比较大。

线程

线程是在进程之后发展出来的概念。 线程也叫轻量级进程,它是一个基本的 CPU 执行单元,也是程序执行过程中的最小单元,由线程 ID、程序计数器、寄存器集合和堆栈共同组成。

一个进程可以包含多个线程。

线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程没有自己的系统资源,只拥有在运行时必不可少的资源,但同一进程的各线程可以共享进程所拥有的系统资源,如果把进程比作一个车间,那么线程就好比是车间里面的工人。不过对于某些独占性资源存在锁机制,处理不当可能会产生“死锁”。

协程

协程是一种用户态的轻量级线程,又称微线程,英文名 Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。

子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。

与传统的系统级线程和进程相比,协程的最大优势在于其"轻量级",可以轻松创建上百万个而不会导致系统资源衰竭,而线程和进程通常最多也不能超过 1 万的。这也是协程也叫轻量级线程的原因。

协程与多线程相比,其优势体现在:协程的执行效率极高。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

Go语言对于并发的实现是靠协程,Goroutine

# 二、Goroutine

# 1. Goroutine

Goroutine 是 Go 语言特有的名词。区别于进程 Process,线程 Thread,协程 Coroutine,因为 Go 语言的创造者们觉得和他们是有所区别的,所以专门创造了 Goroutine。

Goroutine 是与其他函数或方法同时运行的函数或方法。 Goroutines 可以被认为是轻量级的线程。与线程相比,创建 Goroutine 的成本很小,它就是一段代码,一个函数入口。以及在堆上为其分配的一个堆栈(初始大小为 4K,会随着程序的执行自动增长删除)。因此它非常廉价,Go 应用程序可以并发运行数千个 Goroutines。

Goroutine 在线程上的优势:

  1. 与线程相比,Goroutine 非常便宜。它们只是堆栈大小的几个 kb,堆栈可以根据应用程序的需要增长和收缩。

    而在线程的情况下,堆栈大小必须指定并且是固定的。

  2. Goroutine 被多路复用到较少的 OS 线程。在一个程序中可能只有一个线程与数千个 Goroutine。

    如果线程中的任何 Goroutine 都表示等待用户输入,则会创建另一个 OS 线程,剩下的 Goroutine 被转移到新的 OS 线程。

    所有这些都由运行时进行处理,我们作为程序员从这些复杂的细节中抽象出来,并得到了一个与并发工作相关的干净的 API。

  3. 当使用 Goroutine 访问共享内存时,通过设计的通道(channel)可以防止竞态条件发生。

    通道可以被认为是 Goroutine 通信的管道。

# 2. 主 Goroutine

封装 main 函数的 Goroutine 称为主 Goroutine。

主 Goroutine 所做的事情并不是执行 main 函数那么简单。它首先要做的是:设定每一个 Goroutine 所能申请的栈空间的最大尺寸。在 32 位的计算机系统中此最大尺寸为 250MB,而在 64 位的计算机系统中此尺寸为 1GB。如果有某个 Goroutine 的栈空间尺寸大于这个限制,那么运行时系统就会引发一个栈溢出(stack overflow)的运行时 panic。随后,这个 Go 程序的运行也会终止。

此后,主 Goroutine 会进行一系列的初始化工作,涉及的工作内容大致如下:

  1. 创建一个特殊的 defer 语句,用于在主 Goroutine 退出时做必要的善后处理。因为主 Goroutine 也可能非正常的结束
  2. 启动专用于在后台清扫内存垃圾的 Goroutine,并设置 GC 可用的标识
  3. 执行 main 包中的 init 函数
  4. 执行 main 函数
  5. 执行完 main 函数后,它还会检查主 Goroutine 是否引发了运行时恐慌(panic),并进行必要的处理
  6. 最后主 Goroutine 会结束自己以及当前进程的运行

# 3. 使用 Goroutine

创建一个 Goroutine 非常简单,只需要在函数或方法调用前面加上关键字 go,就可以创建一个新的 Goroutine。

package main

import (
	"fmt"
	"time"
)

func hello()  {
	fmt.Println("Hello world, Goroutine.")
}


func main()  {
	go hello()
	// 睡眠一下,防止 Goroutine 还没创建并执行完成主 Goroutine 就已经结束了,这样进程就结束了,hello() 也就不会执行了
	time.Sleep(10 * time.Millisecond)
	fmt.Println("main function")
}

输出:

Hello world, Goroutine.
main function

分析:

  1. 当新的 Goroutine 开始时,Goroutine 调用立即返回。与函数不同,Go 不等待 Goroutine 执行结束。当 Goroutine 调用,并且Goroutine 的任何返回值被忽略之后,Go 立即执行到下一行代码。
  2. main 的 Goroutine 应该为其他的 Goroutine 执行。如果 main 的 Goroutine 终止了,程序将被终止,而其他 Goroutine 将不会运行。所以如果我们不睡眠的话,是有可能只输出 main function 的。

# 4. 启动多个 Goroutine

package main

import (  
    "fmt"
    "time"
)

func numbers() {  
    for i := 1; i <= 5; i++ {
        time.Sleep(250 * time.Millisecond)
        fmt.Printf("%d ", i)
    }
}
func alphabets() {  
    for i := 'a'; i <= 'e'; i++ {
        time.Sleep(400 * time.Millisecond)
        fmt.Printf("%c ", i)
    }
}
func main() {  
    go numbers() 
    go alphabets()
    time.Sleep(3000 * time.Millisecond)
    fmt.Println("main terminated")
}

输出:

1 a 2 3 b 4 c 5 d e main terminated  

分析:

img

# 三、Go 语言并发模型

Go 语言相比 Java 等一个很大的优势就是可以方便地编写并发程序。Go 语言内置了 Goroutine 机制,使用 Goroutine 可以快速地开发并发程序, 更好的利用多核处理器资源。接下来我们来了解一下 Go 语言的并发原理。

# 1. 线程模型

在现代操作系统中,线程是处理器调度和分配的基本单位,进程则作为资源拥有的基本单位。 每个进程是由私有的虚拟地址空间、代码、数据和其它各种系统资源组成。线程是进程内部的一个执行单元。 每一个进程至少有一个主执行线程,它无需由用户去主动创建,是由系统自动创建的。 用户根据需要在应用程序中创建其它线程,多个线程并发地运行于同一个进程中。

我们先从线程讲起,无论语言层面何种并发模型,到了操作系统层面,一定是以线程的形态存在的。 而操作系统根据资源访问权限的不同,体系架构可分为用户空间和内核空间:

  • 内核空间主要操作访问 CPU 资源、I/O 资源、内存资源等硬件资源,为上层应用程序提供最基本的基础资源
  • 用户空间就是上层应用程序的固定活动空间,用户空间不可以直接访问资源,必须通过“系统调用”、“库函数”或“ Shell 脚本”来调用内核空间提供的资源

我们现在的计算机语言,可以狭义的认为是一种“软件”,它们中所谓的“线程”,往往是用户态的线程,和操作系统本身内核态的线程(简称 KSE),还是有区别的。

Go 并发编程模型在底层是由操作系统所提供的线程库支撑的,因此还是得从线程实现模型说起。

线程可以视为进程中的控制流。一个进程至少会包含一个线程,因为其中至少会有一个控制流持续运行。因而,一个进程的第一个线程会随着这个进程的启动而创建,这个线程称为该进程的主线程。当然,一个进程也可以包含多个线程。这些线程都是由当前进程中已存在的线程创建出来的,创建的方法就是调用系统调用,更确切地说是调用 pthread create 函数。

拥有多个线程的进程可以并发执行多个任务,并且即使某个或某些任务被阻塞,也不会影响其他任务正常执行,这可以大大改善程序的响应时间和吞吐量。另一方面,线程不可能独立于进程存在。它的生命周期不可能逾越其所属进程的生命周期。

线程的实现模型主要有 3 个,分别是:

  • 用户级线程模型
  • 内核级线程模型
  • 两级线程模型

它们之间最大的差异就在于线程与内核调度实体(Kernel Scheduling Entity,简称 KSE)之间的对应关系上。顾名思义,内核调度实体就是可以被内核的调度器调度的对象。在很多文献和书中,它也称为内核级线程,是操作系统内核的最小调度单元。

# 1.1 内核级线程模型

用户线程与 KSE 是一对一关系(1:1)。

大部分编程语言的线程库(如 Linux 的 pthread,Java 的 java.lang.Thread,C++11 的 std::thread 等等)都是对操作系统的线程(内核级线程)的一层封装,创建出来的每个线程与一个不同的 KSE 静态关联,因此其调度完全由 OS 调度器来做。

这种方式实现简单,直接借助 OS 提供的线程能力,并且不同用户线程之间一般也不会相互影响。但其创建,销毁以及多个线程之间的上下文切换等操作都是直接由 OS 层面亲自来做,在需要使用大量线程的场景下对 OS 的性能影响会很大。

img

每个线程由内核调度器独立的调度,所以如果一个线程阻塞则不影响其他的线程。

优点:

  • 在多核处理器的硬件的支持下,内核空间线程模型支持了真正的并行,当一个线程被阻塞后,允许另一个线程继续执行,所以并发能力较强。

缺点:

  • 每创建一个用户级线程都需要创建一个内核级线程与其对应,这样创建线程的开销比较大,会影响到应用程序的性能。

# 1.2 用户级线程模型

用户线程与 KSE 是多对一关系(M:1)。

这种线程的创建,销毁以及多个线程之间的协调等操作都是由用户自己实现的线程库来负责,对 OS 内核透明,一个进程中所有创建的线程都与同一个 KSE 在运行时动态关联。

现在有许多语言实现的 协程 基本上都属于这种方式。这种实现方式相比内核级线程可以做的很轻量级,对系统资源的消耗会小很多,因此可以创建的数量与上下文切换所花费的代价也会小得多。

但该模型有个致命的缺点:

  • 如果我们在某个用户线程上调用阻塞式系统调用(如用阻塞方式 read 网络 IO),那么一旦 KSE 因阻塞被内核调度出 CPU 的话,剩下的所有对应的用户线程全都会变为阻塞状态(整个进程挂起)。

所以这些语言的 协程库 会把自己一些阻塞的操作重新封装为完全的非阻塞形式,然后在以前要阻塞的点上,主动让出自己,并通过某种方式通知或唤醒其他待执行的用户线程在该 KSE 上运行,从而避免了内核调度器由于 KSE 阻塞而做上下文切换,这样整个进程也不会被阻塞了。

img

优点:

  • 这种模型的好处是线程上下文切换都发生在用户空间,避免的模态切换(mode switch),从而对于性能有积极的影响。

缺点:

  • 所有的线程基于一个内核调度实体即内核线程,这意味着只有一个处理器可以被利用,在多处理器环境下这是不能够被接受的,本质上,用户线程只解决了并发问题,但是没有解决并行问题。如果线程因为 I/O 操作陷入了内核态,内核态线程阻塞等待 I/O 数据,则所有的线程都将会被阻塞,用户空间也可以使用非阻塞而 I/O,但是不能避免性能及复杂度问题。

# 1.3 两级线程模型

用户线程与 KSE 是多对多关系(M:N)。

这种实现综合了前两种模型的优点,为一个进程中创建多个 KSE,并且线程可以与不同的 KSE 在运行时进行动态关联,当某个 KSE 由于其上工作的线程的阻塞操作被内核调度出 CPU 时,当前与其关联的其余用户线程可以重新与其他 KSE 建立关联关系。当然这种动态关联机制的实现很复杂,也需要用户自己去实现,这算是它的一个缺点吧。

Go 语言中的并发就是使用的这种实现方式,Go 为了实现该模型自己实现了一个运行时调度器来负责 Go 中的"线程"与 KSE 的动态关联。

此模型有时也被称为 混合型线程模型即用户调度器实现用户线程到 KSE 的“调度”,内核调度器实现 KSE 到 CPU 上的调度

img

# 2. Go 并发模型 —— GPM 模型

在操作系统提供的内核线程之上,Go 搭建了一个特有的两级线程模型。

Goroutine 机制实现了 M : N 的线程模型,Goroutine 机制是协程(coroutine)的一种实现,Go 内置的调度器,可以让多核 CPU 中每个 CPU 执行一个协程。

# 2.1 调度器是如何工作的

有了上面的知识,我们可以开始真正的介绍 Go 的并发机制了,先用一段代码展示一下在 Go 语言中新建一个 Goroutine:

// 用go关键字加上一个函数(这里用了匿名函数)
// 调用就做到了在一个新的“线程”并发执行任务
go func() { 
    // do something in one new goroutine
}()

功能上等价于 Java8 的代码:

new java.lang.Thread(() -> { 
    // do something in one new thread
}).start();

理解 Goroutine 机制的原理,关键是理解 Go 调度器 scheduler 的实现。

Go 语言中支撑整个 scheduler 实现的主要有 4 个重要结构,分别是 M、G、P、Sched, 前三个定义在 runtime.h 中,Sched 定义在 proc.c 中。

代号 名称 定义位置 作用
Sched 调度器 proc.c 维护有存储 M 和 G 的队列以及调度器的一些状态信息等。
M Machine
系统线程
runtime.h 它由操作系统管理的,Goroutine 就是跑在 M 之上的;M 是一个很大的结构,里面维护小对象内存 cache(mcache)、当前执行的 Goroutine、随机数发生器等等非常多的信息。
P Processor
处理器
runtime.h 它的主要用途就是用来执行 Goroutine 的,它维护了一个 Goroutine 队列,即 runqueue。Processor 是让我们从 N:1 调度到 M:N 调度的重要部分。
G Goroutine 实现的核心结构 runtime.h 它包含了栈,指令指针,以及其他对调度 Goroutine 很重要的信息,例如其阻塞的 channel。

Processor 的数量是在启动时被设置为环境变量 GOMAXPROCS 的值,或者通过运行时调用函数 GOMAXPROCS() 进行设置。

Processor 数量固定意味着任意时刻只有 GOMAXPROCS 个线程在运行 Go 代码。

我们分别用三角形,矩形和圆形表示 Machine、Processor 和 Goroutine。

image-20210720170700064

单核处理器的场景下:

  • 所有 Goroutine 运行在同一个 M 系统线程中,每一个 M 系统线程维护一个 Processor,任何时刻,一个 Processor 中只有一个 Goroutine,其他 Goroutine 在 runqueue 中等待。一个 Goroutine 运行完自己的时间片后,让出上下文,回到 runqueue 中。

多核处理器的场景下:

  • 为了运行 Goroutines,每个 M 系统线程会持有一个 Processor。

# 2.2 线程阻塞

  • 正常情况下:

    image-20210720170843613

    在正常情况下,scheduler 会按照上面的流程进行调度,但是线程会发生阻塞等情况,下面来看一下 Goroutine 对线程阻塞等的处理。

  • 线程阻塞下:

    当正在运行的 Goroutine 阻塞的时候,例如进行系统调用,会再创建一个系统线程(M1),当前的 M 线程放弃了它的 Processor,Processor 会转到新的线程 M1 中去运行。

    image-20210720171040810

# 2.3 runqueue 执行完成

当其中一个 Processor 的 runqueue 为空,没有 Goroutine 可以调度。它会从另外一个上下文偷取一半的 Goroutine

image-20210720171242658

# 2.4 为什么需要 Processor

其图中的 G,P 和 M 都是 Go 语言运行时系统(其中包括内存分配器,并发调度器,垃圾收集器等组件,可以想象为 Java 中的JVM)抽象出来概念和数据结构对象:

  • G:Goroutine 的简称

    上面用 go 关键字加函数调用的代码就是创建了一个 G 对象,是对一个要并发执行的任务的封装,也可以称作用户态线程。属于用户级资源,对 OS 透明,具备轻量级,可以大量创建,上下文切换成本低等特点。

  • M:Machine 的简称

    在 Linux 平台上是用 clone 系统调用创建的,其与用 Linux pthread 库创建出来的线程本质上是一样的,都是利用系统调用创建出来的 OS 线程实体。 M 的作用就是执行 G 中包装的并发任务。Go 运行时系统中的调度器的主要职责就是将 G 公平合理的安排到多个 M 上去执行。其属于 OS 资源,可创建的数量上也受限了 OS,通常情况下 G 的数量都多于活跃的 M 的。

  • P:Processor 的简称

    逻辑处理器,主要作用是管理 G 对象(每个 P 都有一个 G 队列),并为 G 在 M 上的运行提供本地化资源。

从两级线程模型来看,似乎并不需要 P 的参与,有 G 和 M 就可以了,那为什么要加入 P 呢?

其实 Go 语言运行时系统早期(Go1.0)的实现中并没有 P 的概念,Go 中的调度器直接将 G 分配到合适的 M 上运行。但这样带来了很多问题:

  • 例如,不同的 G 在不同的 M 上并发运行时可能都需向系统申请资源(如堆内存),由于资源是全局的,将会由于资源竞争造成很多系统性能损耗。

为了解决类似的问题,后面的 Go(Go1.1)运行时系统加入了 P,让 P 去管理 G 对象,M 要想运行 G 必须先与一个 P 绑定,然后才能运行该 P 管理的 G。这样带来的好处是:

  • 我们可以在 P 对象中预先申请一些系统资源(本地资源),G 需要的时候先向自己的本地 P 申请(无需锁保护),如果不够用或没有再向全局申请,而且从全局拿的时候会多拿一部分,以供后面高效的使用。就像现在我们去政府办事情一样,先去本地政府看能搞定不,如果搞不定再去中央,从而提供办事效率。
  • 而且由于 P 解耦了 G 和 M 对象,这样即使 M 由于被其上正在运行的 G 阻塞住,其余与该 M 关联的 G 也可以随着 P 一起迁移到别的活跃的 M 上继续运行,从而让 G 总能及时找到 M 并运行自己,从而提高系统的并发能力。

Go 运行时系统通过构造 G-P-M 对象模型实现了一套用户态的并发调度系统,可以自己管理和调度自己的并发任务,所以可以说 Go 语言 原生支持并发自己实现的调度器负责将并发任务分配到不同的内核线程上运行,然后内核调度器接管内核线程在 CPU 上的执行与调度。

# 2.5 总结

可以看到 Go 的并发用起来非常简单,用了一个语法糖将内部复杂的实现结结实实的包装了起来。其内部可以用下面这张图来概述:

img

# 2.6 深究

// Goroutine1
func task1() {
    go task2()
    go task3()
}

假如我们有一个 G(Goroutine1)已经通过 P 被安排到了一个 M 上正在执行,在 Goroutine1 执行的过程中我们又创建两个 G,这两个 G 会被马上放入与 Goroutine1 相同的 P 的本地 G 任务队列中,排队等待与该 P 绑定的 M 的执行,这是最基本的结构,很好理解。 关键问题是:

  1. 如何在一个多核心系统上尽量合理分配 G 到多个 M 上运行,充分利用多核,提高并发能力呢?

    如果我们在一个 Goroutine 中通过 go 关键字创建了大量 G,这些 G 虽然暂时会被放在同一个队列, 但如果这时还有空闲 P(系统内 P 的数量默认等于系统 CPU 核心数),Go 运行时系统始终能保证至少有一个(通常也只有一个)活跃的 M 与空闲 P 绑定去各种 G 队列去寻找可运行的G任务,该种M称为 自旋的 M

    一般寻找顺序为:

    1. 自己绑定的 P 的队列
    2. 全局队列
    3. 其他 P 队列

    如果自己 P 队列找到就拿出来开始运行,否则去全局队列看看,由于全局队列需要锁保护,如果里面有很多任务,会转移一批到本地 P 队列中,避免每次都去竞争锁。如果全局队列还是没有,就直接从其他 P 队列偷任务了(偷一半任务回来)。这样就保证了在还有可运行的 G 任务的情况下,总有与 CPU 核心数相等的 M+P 组合在执行 G 任务或在执行 G 的路上(寻找 G 任务)。

  2. 如果某个 M 在执行 G 的过程中被 G 中的系统调用阻塞了,怎么办?

    在这种情况下,这个 M 将会被内核调度器调度出 CPU 并处于阻塞状态,与该 M 关联的其他 G 就没有办法继续执行了,但 Go 运行时系统的一个监控线程(sysmon线程)能探测到这样的 M,并把与该 M 绑定的 P 剥离,寻找其他空闲或新建 M 接管该 P,然后继续运行其中的 G,大致过程如下图所示:

    image-20210720171040810

    然后等到该 M 从阻塞状态恢复,需要重新找一个空闲 P 来继续执行原来的 G,如果这时系统正好没有空闲的 P,就把原来的 G 放到全局队列当中,等待其他 M+P 组合发掘并执行。

  3. 如果某一个 G 在 M 运行时间过长,有没有办法做抢占式调度,让该 M 上的其他 G 获得一定的运行时间,以保证调度系统的公平性?

    我们知道 Linux 的内核调度器主要是基于 时间片优先级 做调度的。对于相同优先级的线程,内核调度器会尽量保证每个线程都能获得一定的执行时间。为了防止有些线程"饿死"的情况,内核调度器会发起抢占式调度将长期运行的线程中断并让出 CPU 资源,让其他线程获得执行机会。

    当然在 Go 的运行时调度器中也有类似的抢占机制,但并不能保证抢占能成功,因为 Go 运行时系统并没有内核调度器的中断能力,它只能通过向运行时间过长的 G 中设置抢占 flag 的方法温柔的让运行的 G 自己主动让出 M 的执行权。

    说到这里就不得不提一下 Goroutine 在运行过程中可以动态扩展自己线程栈的能力,可以从初始的 2KB 大小扩展到最大 1G(64bit 系统上),因此在每次调用函数之前需要先计算该函数调用需要的栈空间大小,然后按需扩展(超过最大值将导致运行时异常)。Go 抢占式调度的机制就是利用在判断要不要扩栈的时候顺便查看以下自己的抢占 flag,决定是否继续执行,还是让出自己。

    运行时系统的监控线程会计时并设置抢占 flag 到运行时间过长的 G,然后 G 在有函数调用的时候会检查该抢占 flag,如果已设置就将自己放入全局队列,这样该 M 上关联的其他 G 就有机会执行了。但如果正在执行的 G 是个很耗时的操作且没有任何函数调用(如只是 for 循环中的计算操作),即使抢占 flag 已经被设置,该 G 还是将一直霸占着当前 M 直到执行完自己的任务。

# 3. runtime 包

参考:

  • https://studygolang.com/articles/11322?fr=sidebar
  • https://www.cnblogs.com/williamjie/p/9456764.html

# 四、临界资源安全问题

# 1. 临界资源

临界资源:指并发环境中多个进程/线程/协程共享的资源。

在并发编程中对临界资源的处理不当, 往往会导致数据不一致的问题。

package main

import (
	"fmt"
	"time"
)

func main() {
	a := 1
	go func() {
		a = 2
		fmt.Println("Goroutine, a:", a)
	}()

	a = 3
	time.Sleep(1 * time.Second)
	fmt.Println("main, a:", a)
}

我们使用 -race 这个可选项来检查是否有临界资源:

go run -race demo1.go

输出:

==================
WARNING: DATA RACE
Write at 0x00c0000180e0 by goroutine 7:
  main.main.func1()
      /Users/bytedance/go/src/baiscstudy/learning/shareData/demo1/demo1.go:11 +0x3c

Previous write at 0x00c0000180e0 by main goroutine:
  main.main()
      /Users/bytedance/go/src/baiscstudy/learning/shareData/demo1/demo1.go:15 +0x88

Goroutine 7 (running) created at:
  main.main()
      /Users/bytedance/go/src/baiscstudy/learning/shareData/demo1/demo1.go:10 +0x7a
==================
Goroutine, a: 2
main, a: 2
Found 1 data race(s)
exit status 66

能够发现有一个数据被多个 Goroutine 共享。

# 2. 临界资源安全问题

并发本身并不复杂,但是因为有了资源竞争的问题,就使得我们开发出好的并发程序变得复杂起来,因为会引起很多莫名其妙的问题。

如果多个 Goroutine 在访问同一个数据资源的时候,其中一个线程修改了数据,那么这个数值就被修改了,对于其他的 Goroutine 来讲,这个数值可能是不对的。

示例:并发实现火车站售票程序,一共有 10 张票,4 个售票口同时出售。

package main

import (
	"fmt"
	"go/src/math/rand"
	"time"
)

// 全局变量
var ticket = 10 //10 张票

func main() {

	// 4 个 Goroutine,代表 4 个售票窗口,它们会操作同一个变量 ticket
	go saleTicket("售票口1")
	go saleTicket("售票口2")
	go saleTicket("售票口3")
	go saleTicket("售票口4")

	// 主 Goroutine 睡眠,确保子 Goroutine 可以运行结束
	time.Sleep(5 * time.Second)
}

func saleTicket(name string)  {
	rand.Seed(time.Now().UnixNano())
	for{
		if ticket > 0 {
			// 睡眠,增大错误发生的概率
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			// 售票
			fmt.Println(name, "售出:", ticket)
			// 减票
			ticket --
		} else {
			fmt.Println(name, "售罄,没票了...")
			break
		}
	}
}

输出:

售票口1 售出: 10
售票口2 售出: 9
售票口2 售出: 8
售票口2 售出: 7
售票口2 售出: 6
售票口3 售出: 5
售票口4 售出: 4
售票口3 售出: 3
售票口1 售出: 2
售票口3 售出: 1
售票口3 售罄,没票了...
售票口1 售出: 0
售票口1 售罄,没票了...
售票口2 售出: -1
售票口2 售罄,没票了...
售票口4 售出: -2
售票口4 售罄,没票了...

我们发现居然有卖出票号为负数的票,这就是临界资源带来的问题。

# 3. 解决临界资源问题

要想解决临界资源安全的问题,很多编程语言的解决方案都是同步。通过上锁的方式,某一时间段,只能允许一个 Goroutine 来访问这个共享数据,当前 Goroutine 访问完毕,解锁后,其他的 Goroutine 才能来访问。

[法一] 上锁

package main

import (
	"fmt"
	"go/src/math/rand"
	"sync"
	"time"
)

// 全局变量
var ticket = 10 //10 张票

// 锁
var waitGroup sync.WaitGroup
var mutex sync.Mutex

func main() {

	// 4 个 Goroutine,代表 4 个售票窗口,它们会操作同一个变量 ticket

	// 阻塞主 Goroutine,需要 4 个协程调用 waitGroup.Done(),主 Goroutine 才能进行
	waitGroup.Add(4)
	go saleTicket("售票口1")
	go saleTicket("售票口2")
	go saleTicket("售票口3")
	go saleTicket("售票口4")
	// 阻塞主 Goroutine,等待 4 个协程调用 waitGroup.Done()
	waitGroup.Wait()
}

func saleTicket(name string)  {
	rand.Seed(time.Now().UnixNano())
	// 调用 waitGroup.Done(),4 个协程都调用后主 Goroutine 就可以继续执行了
	defer waitGroup.Done()
	for{
		// 上锁
		mutex.Lock()
		if ticket > 0 {
			// 睡眠,增大错误发生的概率
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			// 售票
			fmt.Println(name, "售出:", ticket)
			// 减票
			ticket --
		} else {
			// 解锁
			mutex.Unlock()
			fmt.Println(name, "售罄,没票了...")
			break
		}
		// 解锁
		mutex.Unlock()
	}
}

输出:

售票口4 售出: 10
售票口4 售出: 9
售票口3 售出: 8
售票口1 售出: 7
售票口2 售出: 6
售票口4 售出: 5
售票口3 售出: 4
售票口1 售出: 3
售票口2 售出: 2
售票口4 售出: 1
售票口2 售罄,没票了...
售票口1 售罄,没票了...
售票口4 售罄,没票了...
售票口3 售罄,没票了...

[法二] channel

在 Go 的并发编程中有一句很经典的话: 不要以共享内存的方式去通信,而要以通信的方式去共享内存

在 Go 语言中并不鼓励用锁保护共享状态的方式在不同的 Goroutine 中分享信息(以共享内存的方式去通信)。而是鼓励通过 channel 将共享状态或共享状态的变化在各个 Goroutine 之间传递(以通信的方式去共享内存),这样同样能像用锁一样保证在同一的时间只有一个 Goroutine 访问共享状态。

当然,在主流的编程语言中为了保证多线程之间共享数据安全性和一致性,都会提供一套基本的同步工具集,如锁,条件变量,原子操作等等。Go 语言标准库也毫不意外的提供了这些同步机制,使用方式也和其他语言也差不多。

package main

import (
	"fmt"
	"go/src/math/rand"
	"time"
)




func main() {

	var ticket = 10 //10 张票
	var ticketChan chan int

	ticketChan = make(chan int)
	// 4 个 Goroutine,代表 4 个售票窗口,它们会操作同一个变量 ticket
	go saleTicket("售票口1", ticketChan)
	go saleTicket("售票口2", ticketChan)
	go saleTicket("售票口3", ticketChan)
	go saleTicket("售票口4", ticketChan)

	// 提供 ticket
	for i := ticket; i > 0; i-- {
		ticketChan <- i
	}

	time.Sleep(10 * time.Second)
}

func saleTicket(name string, ticketChan chan int)  {
	rand.Seed(time.Now().UnixNano())
	for{
		// 拿票
		ticket := <- ticketChan
		if ticket > 0 {
			// 睡眠,增大错误发生的概率
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			// 售票
			fmt.Println(name, "售出:", ticket)
			// 减票
			ticket --
		} else {
			fmt.Println(name, "售罄,没票了...")
			break
		}
	}
}

输出:

售票口2 售出: 9
售票口4 售出: 10
售票口4 售出: 4
售票口1 售出: 7
售票口4 售出: 3
售票口4 售出: 1
售票口3 售出: 6
售票口2 售出: 5
售票口1 售出: 2

# 4. sync 包

# 五、channel 通道

RECOMMEND

Don't communicate by sharing memory; share memory by communicating.

不要通过共享内存来通信,应该通过通信来共享内存。

在前面讲 Go 语言的并发时候,我们就说过,当多个 Goroutine 想实现共享数据的时候,虽然也提供了传统的同步机制,但是 Go 语言强烈建议的是使用 channel 通道来实现 Goroutines 之间的通信。

Go 语言中,要传递某个数据给另一个 Goroutine,可以把这个数据封装成一个对象,然后把这个对象的指针传入某个 channel 中,另外一个 Goroutine 从这个 channel 中读出这个指针,并处理其指向的内存对象。Go 从语言层面保证同一个时间只有一个 Goroutine 能够访问 channel 里面的数据,为开发者提供了一种优雅简单的工具,所以 Go 的做法就是使用 channel 来通信,通过通信来传递内存数据,使得内存数据在不同的 Goroutine 中传递,而不是使用共享内存来通信。

# 1. 通道的概念

通道可以被认为是 Goroutines 通信的管道。类似于管道中的水从一端到另一端的流动,数据可以从一端发送到另一端,通过通道接收。

每个通道都有与其相关的类型。该类型是通道允许传输的数据类型。(通道的零值为 nil。nil 通道没有任何用处,因此通道必须使用类似于 map 和切片的方法来定义,也就是 make)

# 2. 通道的声明

// 声明通道
var 通道名 chan 数据类型
// 创建通道
通道名 := make(chan 数据类型 [, 缓冲区大小])

示例:

package main

import "fmt"

func main() {
    var a chan int
    if a == nil {
        fmt.Println("channel 是 nil 的, 不能使用,需要先创建通道。。")
        a = make(chan int)
        fmt.Printf("数据类型是: %T", a)
    }
}

# 3. 通道的语法

  • 接收

    data, ok := <- channel
    
    
  • 发送

    channel <- data
    
  • 创建双向通道

    // 双向通道
    chan int
    // 只写通道
    chan <- int
    // 只读通道
    <- chan int
    

# 4. 通道注意点

channel 在使用的时候,有以下几个注意点:

  1. channel 是用于 Goroutine 之间传递消息的
  2. 每个 channel 都有相关联的数据类型,nil chan 是不能使用,类似于nil map,不能直接存储键值对
  3. 使用通道传递数据:<-
    • chan <- data:向通道中写数据
    • data <- chan:从通道中读数据
  4. 阻塞
    • 发送数据:chan <- data 是阻塞的,直到另一个 Goroutine 读取数据来解除阻塞
    • 读取数据:data <- chan 也是阻塞的,直到另一个 Goroutine 写出数据解除阻塞
    • 可以通过在 make 的时候设置缓冲区来避免阻塞
  5. channel 本身就是同步的,意味着同一时间,只能有一个 Goroutine 来操作 channel
  6. channel 是 Goroutine 之间的连接,所以通道的发送和接收必须处在不同的 Goroutine 中

# 5. 通道的阻塞

5.1 发送和接收默认是阻塞的

一个通道发送和接收数据,默认是阻塞的。当一个数据被发送到通道时,在发送语句中被阻塞,直到另一个 Goroutine 从该通道读取数据。相对地,当从通道读取数据时,读取被阻塞,直到一个 Goroutine 将数据写入该通道。

这些通道的特性是帮助 Goroutines 有效地进行通信,而无需像使用其他编程语言中非常常见的显式锁或条件变量。

package main

import "fmt"

func main() {

	// 创建一个 channel
	ch1 := make(chan bool)
	fmt.Println("ch1:", ch1)

	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println("子 Goroutine 中,i:", i)
		}
		// 循环结束后,向 channel 中写数据,表示要结束了
		ch1 <- true

		fmt.Println("子 Goroutine 结束...")
	}()

	// 从 ch1 通道中读取数据
	// 再子 Goroutine 写数据之前,主 Goroutine 是会被阻塞的
	fmt.Println("主 Goroutine 被阻塞..")
	data := <- ch1
	fmt.Println("主 Goroutine 拿到数据了,data:", data)

}

输出:

ch1: 0xc00008c060
主 Goroutine 被阻塞..
子 Goroutine 中,i: 0
子 Goroutine 中,i: 1
子 Goroutine 中,i: 2
子 Goroutine 中,i: 3
子 Goroutine 中,i: 4
子 Goroutine 中,i: 5
子 Goroutine 中,i: 6
子 Goroutine 中,i: 7
子 Goroutine 中,i: 8
子 Goroutine 中,i: 9
子 Goroutine 结束...
主 Goroutine 拿到数据了,data: true

5.2 可以通过设置缓冲区来避免阻塞

package main

import (
	"fmt"
	"time"
)

func main() {

	// 创建一个 channel,待缓冲区,写满 5 个之前不会阻塞
	ch1 := make(chan int, 5)
	fmt.Println("ch1:", ch1)

	go func() {
		time.Sleep(1 * time.Second)
		for i := 0; i < 10; i++ {
			data := <- ch1
			fmt.Println("子 Goroutine 拿数据,i:", data)
		}
		fmt.Println("子 Goroutine 结束...")
	}()

	for i := 0; i < 10; i++ {
		fmt.Println("主 Goroutine 写数据,i:", i)
		ch1 <- i
	}

	fmt.Println("主 Goroutine 结束...")

	time.Sleep(2 * time.Second)

}

输出:

ch1: 0xc00012c000
主 Goroutine 写数据,i: 0
主 Goroutine 写数据,i: 1
主 Goroutine 写数据,i: 2
主 Goroutine 写数据,i: 3
主 Goroutine 写数据,i: 4
# 缓冲区满
主 Goroutine 写数据,i: 5
# 这里会停一下,因为缓冲区满了,会阻塞
主 Goroutine 写数据,i: 6
子 Goroutine 拿数据,i: 0
子 Goroutine 拿数据,i: 1
子 Goroutine 拿数据,i: 2
子 Goroutine 拿数据,i: 3
子 Goroutine 拿数据,i: 4
子 Goroutine 拿数据,i: 5
子 Goroutine 拿数据,i: 6
主 Goroutine 写数据,i: 7
主 Goroutine 写数据,i: 8
主 Goroutine 写数据,i: 9
主 Goroutine 结束...
子 Goroutine 拿数据,i: 7
子 Goroutine 拿数据,i: 8
子 Goroutine 拿数据,i: 9
子 Goroutine 结束...

# 6. 通道的死锁

使用通道时要考虑的一个重要因素是死锁:如果 Goroutine 在一个通道上发送数据,那么预计其他的 Goroutine 应该接收数据。如果这种情况不发生,那么程序将在运行时出现死锁。

类似地,如果 Goroutine 正在等待从通道接收数据,那么另一些 Goroutine 将会在该通道上写入数据,否则程序将会死锁。

package main

func main() {
	ch := make(chan int)
	ch <- 5
}

输出:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/bytedance/go/src/baiscstudy/learning/shareData/channel/channelTest.go:6 +0x50

Process finished with the exit code 2

# 7. 通道的关闭

发送者可以通过关闭 channel,来通知接收方不会有更多的数据被发送到 channel 上:

close(channel)

接收者可以在接收来自通道的数据时使用额外的变量来检查 channel 是否已经关闭:

data, ok := <- channel

在上面的语句中:

  • 如果 ok 的值是 true,表示成功的从通道中读取了一个数据 data
  • 如果 ok 是 false,这意味着我们正在从一个封闭的通道读取数据,从闭通道读取的值将是通道类型的零值

# 8. 通道的遍历

我们可以循环从通道上获取数据,直到通道关闭。for 循环的 for range 形式可用于从通道接收值,直到它关闭为止。

示例代码:

package main

import (
	"fmt"
	"time"
)

func main() {

	ch1 := make(chan int)
	go sendData(ch1)
	// for...range 从通道接收值,直到通道关闭
	for v := range ch1 {
		fmt.Println("从 channel 中收到数据:", v)
	}
	fmt.Println("main over...")

}

func sendData(ch1 chan int)  {
	for i := 0; i < 10; i++ {
		time.Sleep( 1 * time.Second)
		ch1 <- i
	}
	// 关闭通道
	close(ch1)
}

输出:

从 channel 中收到数据: 0
从 channel 中收到数据: 1
从 channel 中收到数据: 2
从 channel 中收到数据: 3
从 channel 中收到数据: 4
从 channel 中收到数据: 5
从 channel 中收到数据: 6
从 channel 中收到数据: 7
从 channel 中收到数据: 8
从 channel 中收到数据: 9
main over...

# 9. 通道的调度

select 是 Go 中的一个控制结构。select 语句类似于 switch 语句,但是 select 会 随机 执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行。

select {
    case num1 := <-ch1:
    	fmt.Println("ch1 中取数据。。", num1)
    case num2, ok := <-ch2:
      if ok {
          fmt.Println("ch2 中取数据。。", num2)
      }else{
          fmt.Println("ch2 通道已经关闭。。")
      }
		default:
          fmt.Println("没有收到数据")
}

# 10. 通道与 time

标准库 time 包中与 channel 相关的主要是定时器 Timer,Timer 让用户可以定义自己的超时逻辑,尤其是在应对 select 处理多个 channel 的超时、单 channel 读写的超时等情形时尤为方便。

Timer 是一次性的时间触发事件,这点与 Ticker 不同,Ticker 是按一定时间间隔持续触发时间事件。

Timer 的常见创建方式:

t:= time.NewTimer(d)
t:= time.AfterFunc(d, f)
c:= time.After(d)

虽然说创建方式不同,但是原理是相同的。

Timer 有3个要素:

  • 定时时间:d
  • 触发动作:f
  • 时间通道:t.C

10.1 time.NewTimer(d)

NewTimer() 创建一个新的 Timer,它将在至少持续时间 d 之后在其通道上发送当前时间。

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
  // 1. 创建一个 Time 类型的 channel
	c := make(chan Time, 1)
  // 2. 传进一个定时器 Timer 
	t := &Timer{
		C: c,   // channel
		r: runtimeTimer{  
			when: when(d),  // 根据传入的 d 计算何时发送
			f:    sendTime, // 发送当前时间
			arg:  c,
		},
	}
  // 3. 开始计时
	startTimer(&t.r)
	return t
}
  • when 用于在指定的 Duration 类型时间后调用函数或计算表达式
  • 如果只是想指定时间之后执行,使用 time.Sleep()
  • 使用 NewTimer(),可以返回的 Timer 类型在计时器到期之前,取消该计时器
  • 直到使用 <-timer.C 发送一个值,该计时器才会过期

示例:

package main

import (
	"fmt"
	"time"
)

func main() {

	/**
		创建一个计数器,d 时间后触发。
		Go 触发计时器的方法比较特别:在 channel 中发送一个值,值为当前时间
	 */
	timer := time.NewTimer(3 * time.Second)
	fmt.Printf("timer 的类型:%T\n", timer)
	fmt.Println("当前时间:", time.Now())

	/**
		此处在等到 channel 中的信号,执行此段代码会阻塞 3s
	 */
	go func() {
		i := 1
		for  {
			fmt.Println("等待第", i, "秒")
			time.Sleep( 1 * time.Second)
			i ++
		}

	}()
	ch := timer.C     // <- chan time.Time
	fmt.Println("从 timer channel 中收到值:", <-ch)
}

输出:

timer 的类型:*time.Timer
当前时间: 2021-08-02 17:15:16.818942 +0800 CST m=+0.000151763
等待第 1 秒
等待第 2 秒
等待第 3 秒
从 timer channel 中收到值: 2021-08-02 17:15:19.823102 +0800 CST m=+3.004250649

10.2 timer.Stop()

timer.Stop() 用于阻止计时器触发。如果 timer.Stop() 成功停止了计时器,它将返回 true,如果计时器已经过期或已经被停止,则返回 false。timer.Stop() 不关闭通道,这是为了防止错误地从通道读取数据。

要确保在调用 timer.Stop() 之后通道是空的,请检查返回值并清空通道。例如,假设程序还没有从 t.C 收到:

if !t.Stop() {
   <-t.C
}
// 这不能与来自计时器通道的其他接收或对计时器 Stop 方法的其他调用同时进行。
// 对于使用 AfterFunc(d, f) 创建的计时器,如果 t.Stop 返回 false,则表示计时器已经过期,函数 f 已经在它自己的 goroutine 中启动;
// 停止并不等待 f 完成才返回。
// 如果调用者需要知道 f 是否完成,它必须显式地与 f 协调。

示例:

package main

import (
	"fmt"
	"time"
)

func main() {

	// 新建一个计时器,5s 后触发
	timer := time.NewTimer(5 * time.Second)

	// 6s 后停掉计时器
	time.Sleep(6 * time.Second)
	stopOK := timer.Stop()
	if stopOK {
		fmt.Println("timer stop ok")
	} else {
		v := <- timer.C
		fmt.Println("timer stop not ok, clear channel:", v)
	}
}

输出:

timer stop not ok, clear channel: 2021-08-02 17:47:39.886786 +0800 CST m=+5.000164114

10.3 time.After(d)

time.After(d) 在等待持续时间 d 之后,在返回的通道上发送当前时间。它相当于 NewTimer(d).C。在计时器触发之前,垃圾收集器不会恢复底层计时器。如果效率有问题,使用 NewTimer 代替,并在补需要计时器的时候调用 time.Stop() 停止。

func After(d Duration) <-chan Time {
	return NewTimer(d).C
}

示例:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := time.After(3 * time.Second)
	fmt.Printf("ch type: %T\n", ch)
	fmt.Println("now:", time.Now())
	time2 := <- ch
	fmt.Println("now2:", time2)
}

输出:

ch type: <-chan time.Time
now: 2021-08-02 18:06:31.213187 +0800 CST m=+0.000132980
now2: 2021-08-02 18:06:34.213613 +0800 CST m=+3.000659583

# 六、Golang CSP 模型

并发模型:Actors 模型 & CSP 模型

并发模型介绍

并发模型主要由两种实现的形式:

  1. 同一个进程下,多个线程天然的共享内存,由程序对读写做同步控制(有锁或无锁)。
  2. 多个进程通过进程间通讯或者内存映射实现数据的同步。

Actors 模型

在这里插入图片描述

Actors 模型更多的使用 消息机制 来实现并发,目标是让开发者不再考虑线程这种东西,每个 Actor 最多同时进行一样工作,Actor 内部可以有自己的变量和数据

Actors 模型避免了由操作系统进行任务调度的问题,在操作系统进程之上,多个 Actor 可能运行在同一个进程(或线程)中,这就节省了大量的 Context 切换。

在 Actors 模型中,每个 Actor 都有一个专属的命名“邮箱”,其他 Actor 可以随时选择一个 Actor 通过邮箱收发数据,对于“邮箱”的维护,通常是使用发布订阅的机制实现的,比如我们可以定义发布者是自己,订阅者可以是某个 Socket 接口,另外的消息总线或者直接是目标 Actor。

目前 akka 库是比较流行的 Actors 编程模型实现,支持 Scala 和 Java 语言

CSP 模型

CSP(Communicating Sequential Process)模型提供一种多个进程公用的“管道(channel)”, 这个 channel 中存放的是一个个“任务”。

Go 语言中的 Goroutine 就是参考的 CSP 模型,原始的 CSP 中 channel 里的任务都是立即执行的,而 Go 语言可以为其增加了一个缓存,即任务可以先暂存起来,等待执行进程准备好了再逐个按顺序执行。

Actor 与 CSP 的区别

  • CSP 进程通常是同步的(即任务被推送进 channel 就立即执行,如果任务执行的线程正忙,则发送者就暂时无法推送新任务),Actor 进程通常是异步的(消息传递给 Actor 后并不一定马上执行)。
  • 在 CSP 中,我们需要一个中介,也就是 channel 来进行消息传递,在 Actor 中我们可以直接从一个 Actor 往另一个 Actor 传输数据。无需中介。

Go 语言的 CSP 模型是由协程 Goroutine 与通道 channel 实现:

  • Goroutine:是一种轻量线程,它不是操作系统的线程,而是将一个操作系统线程分段使用,通过调度器实现协作式调度。是一种绿色线程,微线程,它与 Coroutine 协程也有区别,能够在发现堵塞后启动新的微线程。
  • channel:类似 Unix 的 Pipe,用于协程之间通讯和同步。协程之间虽然解耦,但是它们和 channel 有着耦合。
上次更新: 10/12/2021, 7:12:39 AM