Golang -- Channel

Golang之channel数据类型解析

设计原理

Go 语言中最常见的、也是经常被人提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。

shared-memory

多线程使用共享内存传递数据

虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)1。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据

channel-and-goroutines

Goroutine 使用 Channel 传递数据

上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。

先入先出

目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

无锁管道

锁是一种常见的并发控制技术,我们一般会将锁分成乐观锁和悲观锁,即乐观并发控制和悲观并发控制,无锁(lock-free)队列更准确的描述是使用乐观并发控制的队列。乐观并发控制也叫乐观锁,很多人都会误以为乐观锁是与悲观锁差不多,然而它并不是真正的锁,只是一种并发控制的思想。

concurrency-control

悲观并发控制与乐观并发控制

乐观并发控制本质上是基于验证的协议,我们使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。

Channel 在运行时的内部表示是 runtime.hchan,该结构体中包含了用于保护成员变量的互斥锁,从某种程度上说,Channel 是一个用于同步和通信的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题是很常见的,我们能很容易地实现有锁队列。

然而锁导致的休眠和唤醒会带来额外的上下文切换,如果临界区过大,加锁解锁导致的额外开销就会成为性能瓶颈。1994 年的论文 Implementing lock-free queues 就研究了如何使用无锁的数据结构实现先进先出队列,而 Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:

  • 同步Channel - 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
  • 异步Channel - 基于环形缓存的传统生产者消费者模型;
  • chan struct{} 类型的异步Channel - struct{}类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义。

这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但是在实际的基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。

数据结构

源码位置:src/runtime/chan.go#L33

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type hchan struct {
 qcount   uint           // total data in the queue, 队列中的元素数量
 dataqsiz uint           // size of the circular queue,  底层循环数组的长度
 buf      unsafe.Pointer // points to an array of dataqsiz elements, 指向底层循环数组的指针,只针对有缓冲区的 channel
 elemsize uint16        // channel 中的元素大小
 closed   uint32        // channel是否被关闭的标识
 elemtype *_type // element type  ,channel中元素类型
 sendx    uint   // send index,已发送元素在循环数组中的索引
 recvx    uint   // receive index,已接收元素在数组中的索引
 recvq    waitq  // list of recv waiters,等待接收的 `goroutine` 队列
 sendq    waitq  // list of send waiters,等待发送的 `goroutine` 队列

 // lock protects all fields in hchan, as well as several
 // fields in sudogs blocked on this channel.
 //
 // Do not change another G's status while holding this lock
 // (in particular, do not ready a G), as this can deadlock
 // with stack shrinking.
 lock mutex   // 保护 channel 中的所有字段
}

字段解释

qcount: 队列中的元素数量
dataqsiz: 底层循环数组的长度
buf: 指向底层循环数组的指针,只针对有缓冲区的 channel
elemsize: channel 中的元素数据类型大小
closed: channel是否被关闭的标识
elemtype: channel中的元素类型
sendx: 已发送元素在循环数组中的索引
recvx: 已接收元素在数组中的索引
recvq: 等待接收的goroutine 队列
sendq: 等待发送的goroutine 队列
lock: 保护channel 中所有字段,保证每个读或者写channel都是原子的。

sendqrecvq 存储了当前Channel由于缓冲区空间不足二阻塞的 Goroutine 列表,这些等待队列使用双向链表runtime.waitq表示,结构如下:

1
2
3
4
type waitq struct {
 first *sudog
 last  *sudog
}

runtime.sudog 表示一个在等待列表中的Goroutine,该结构中存储了两个分别指向前后runtime.sudog的指针以构成链表。

创建管道

语法如下:

1
2
3
4
5
// 无缓冲通道
ch1 := make(chan int)

// 有缓冲通道
ch2 := make(chan int, 2); // 创建一个缓冲区长度为2,元素类型为 int 的`channel`,若未指定缓冲区长度,则默认为0

Go 语言中所有Channel的床架都会使用 make关键字。编译器会将make(chan int, 10)表达式转换成 OMAKE类型的节点,并在类型检查阶段将OMAKE类型的节点转换成OMAKECHAN类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func typecheck1(n *Node, top int) (res *Node) {
 switch n.Op {
 case OMAKE:
  ...
  switch t.Etype {
  case TCHAN:
   l = nil
   if i < len(args) { // 带缓冲区的异步 Channel
    ...
    n.Left = l
   } else { // 不带缓冲区的同步 Channel
    n.Left = nodintconst(0)
   }
   n.Op = OMAKECHAN
  }
 }
}

这一阶段会对传入的make关键字的缓冲区大小进行检查,如果我们不向make传递表示缓冲区大小参数,那么就会设置一个默认值0,也就是当前的Channel不存在缓冲区。

OMAKECHAN类型的节点最终都会在SSA中间代码生成阶段之前被转换成调用runtime.makechan或者runtime.makechan64函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func walkexpr(n *Node, init *Nodes) *Node {
 switch n.Op {
 case OMAKECHAN:
  size := n.Left
  fnname := "makechan64"
  argtype := types.Types[TINT64]

  if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
   fnname = "makechan"
   argtype = types.Types[TINT]
  }
  n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
 }
}

runtime.makechanruntime.makechan64 会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况,因为这在 Channel 中并不常见,所以我们重点关注 runtime.makechan:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func makechan(t *chantype, size int) *hchan {
 elem := t.elem
 mem, _ := math.MulUintptr(elem.size, uintptr(size))

 var c *hchan
 switch {
 case mem == 0:
  c = (*hchan)(mallocgc(hchanSize, nil, true))
  c.buf = c.raceaddr()
 case elem.kind&kindNoPointers != 0:
  c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
  c = new(hchan)
  c.buf = mallocgc(mem, elem, true)
 }
 c.elemsize = uint16(elem.size)
 c.elemtype = elem
 c.dataqsiz = uint(size)
 return c
}

上述代码根据 Channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 和缓冲区:

  • 如果当前Channel中不存在缓冲区,那么就只会为runtime.hchan分配一段内存空间;
  • 如果当前Channel中存储的类型不是指针类型,会为当前的Channel和底层的数组分配一块连续的内存空间;
  • 在默认情况下会单独为runtime.hchan和缓冲区分配内存。

在函数的最后会统一更新runtime.hchanelemsizeelemtypedatasize几个字段。

源码位置:src/runtime/chan.go#L72

发送数据

我们想要向Channel发送数据时,就需要使用 ch <- i语句,编译器会将它解析成OSEND节点并在cmd/compile/internal/gc.walkexpr中转换成runtime.chansend1:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func walkexpr(n *Node, init *Nodes) *Node {
 switch n.Op {
 case OSEND:
  n1 := n.Right
  n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
  n1 = walkexpr(n1, init)
  n1 = nod(OADDR, n1, nil)
  n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
 }
}

chansend1只是调用了runtime.chansend并传入Channel和需要发送的数据。chansend是向Channel 中发送数据时一定会调用的函数,该函数包含了发送数据的全部逻辑,如果我们在调用时将block参数设置成true,那么表示当前发送操作是阻塞的。 源码位置:src/runtime/chan.go#L160

1
2
3
4
5
6
7
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 lock(&c.lock)

 if c.closed != 0 {
  unlock(&c.lock)
  panic(plainError("send on closed channel"))
 }

在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止多个线程并发修改数据。如果 Channel 已经关闭,那么向该 Channel 发送数据时会报 “send on closed channel” 错误并中止程序。

因为runtime.chansend函数的实现比较复杂,所以我们将该函数的执行过程分为以下三部分:

  • 当存在等待的接收者,通过runtime.send直接将数据发送给阻塞的接收者;
  • 当缓冲区存在空余空间时,将发送的数据写入Channel的缓冲区;
  • 当不存在缓冲区或者缓冲区已满,等待其他 Goroutine从Channel接收数据。

直接发送

如果目标Channel没有被关闭并且已经有处于读等待的Goroutine,那么runtime.chansend会从接收队列recvq中取出最先陷入等待的Goroutine并直接向它们发送数据:

1
2
3
4
if sg := c.recvq.dequeue(); sg != nil {
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
 }

发送数据时会调用runtime.send,该函数的执行可以分为两个部分:

  1. 调用runtime.sendDirect将发送的数据直接拷贝到x = <-c表达式中变量x所在的内存地址上;
  2. 调用runtime.goready将等待接收数据的Goroutine标记成可运行状态Grunnable并把该Goroutine放到发送方所在的处理器runnext上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if sg.elem != nil {
  sendDirect(c.elemtype, sg, ep)
  sg.elem = nil
 }
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 goready(gp, skip+1)
}

需要注意的是,发送数据的过程只是将接收方的Goroutine放到了处理器的runnext中,程序没有立刻执行该Goroutine。


缓冲区

如果创建的Channel包含缓冲区并且Channel中的数据没有装满,会执行下面这段代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 if c.qcount < c.dataqsiz {
  qp := chanbuf(c, c.sendx)
  typedmemmove(c.elemtype, qp, ep)
  c.sendx++
  if c.sendx == c.dataqsiz {
   c.sendx = 0
  }
  c.qcount++
  unlock(&c.lock)
  return true
 }
 ...
}

在这里我们首先会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器。

如果当前Channel的缓冲区未满,向Channel发送的数据会存储在Channel的sendx索引所在的位置,并将sendx索引加1,,因为这里的buf是一个循环数组,所以当sendx等于dataqsiz时会重新回到数组开始的位置。


阻塞发送

当Channel没有接收者能够处理数据时,向Channel发送数据会被下游阻塞,当然使用select关键字可以向Channel非阻塞的发送消息。向Channel阻塞地发送数据会执行下面的代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 if !block {
  unlock(&c.lock)
  return false
 }

 gp := getg()
 mysg := acquireSudog()
 mysg.elem = ep
 mysg.g = gp
 mysg.c = c
 gp.waiting = mysg
 c.sendq.enqueue(mysg)
 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

 gp.waiting = nil
 gp.param = nil
 mysg.c = nil
 releaseSudog(mysg)
 return true
}
  1. 调用 runtime.getg 获取发送数据使用的 Goroutine;
  2. 执行 runtime.acquireSudog 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 select 中和待发送数据的内存地址等;
  3. 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪;
  4. 调用 runtime.goparkunlock 将当前的 Goroutine 陷入沉睡等待唤醒;
  5. 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 runtime.sudog 结构体;

函数在最后会返回 true 表示这次我们已经成功向 Channel 发送了数据。

小结

我们在这里可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:

  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;

发送数据的过程中包含几个会触发Goroutine调度的时机:

  1. 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
  2. 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;

接收数据

接收数据有两种写法,一种是只值返回接收数据,第二种是返回接收数据和channel的关闭状态两个字段,当接收到响应类型的零值时需要判断是真实的发送者发送的数据,还是channel被关闭后,返回给接收者的默认类型的零值,可以使用第二种返回channel的关闭状态。

1
2
i := <- ch
i, ok := <- ch

这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点,后者会在类型检查阶段被转换成 OAS2RECV 类型。虽然不同的接收方式会被转换成 runtime.chanrecv1 和 runtime.chanrecv2 两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv。

当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark 让出处理器的使用权。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 if c == nil {
  if !block {
   return
  }
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }

 lock(&c.lock)

 if c.closed != 0 && c.qcount == 0 {
  unlock(&c.lock)
  if ep != nil {
   typedmemclr(c.elemtype, ep)
  }
  return true, false
 }

皖ICP备20014602号
Built with Hugo
Theme Stack designed by Jimmy