ie常用网站设置,设计素材网站无版权,wordpress短代码使用,浙江大成建设集团有限公司网站并发模型
传统的编程语言#xff08;如C、Java、Python等#xff09;并非为并发而生的#xff0c;因此它们面对并发的逻辑多是基于操作系统的线程。其并发的执行单元#xff08;线程#xff09;之间的通信利用的也是操作系统提供的线程或进程间通信的原语#xff0c;比如…并发模型
传统的编程语言如C、Java、Python等并非为并发而生的因此它们面对并发的逻辑多是基于操作系统的线程。其并发的执行单元线程之间的通信利用的也是操作系统提供的线程或进程间通信的原语比如共享内存、信号、管道、消息队列、套接字等。在这些通信原语中使用最多、最广泛同时也最高效的是结合了线程同步原语比如锁以及更为低级的原子操作的共享内存方式因此可以说传统语言的并发模型是基于共享内存的模型 这些传统的就基于共享内存的并发模型难用且易错在大型程序中开发人员在设计并发程序时需要根据线程模型对程序进行建模同时规划线程之间的通信方式且程序难以阅读、理解、维护
Go采用了CSPCommunicating Sequential Process通信顺序进程模型 一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合 CSP模型旨在简化并发程序的编写让并发程序的编写与编写顺序程序一样简单。Tony Hoare认为输入/输出应该是基本的编程原语数据处理逻辑CSP中的P仅需调用输入原语获取数据顺序处理数据并将结果数据通过输出原语输出
CSP理论中的PProcess进程是个抽象概念它代表任何顺序处理逻辑的封装它获取输入数据或从其他P的输出获取并生产可以被其他P消费的输出数据。
为了实现CSP模型中的输入/输出原语Go引入了goroutineP之间的通信原语channel。通过channel将goroutineP组合与连接在一起这使得设计和编写大型并发系统变得更为简单和清晰
虽然CSP模型已经成为Go语言支持的主流并发模型但Go也支持传统的基于共享内存的并发模型并提供基本的低级同步原语主要是sync包中的互斥锁、条件变量、读写锁、原子操作等
那么在实践中应该如何选择是使用channel还是低级同步原语下的共享内存
Go始终推荐以CSP模型风格构建并发程序尤其是在复杂的业务层面。这将提升程序逻辑的清晰度大大降低并发设计的复杂性并让程序更具可读性和可维护性
对于局部情况比如涉及性能敏感的区域或需要保护的结构体数据可以使用更为高效的低级同步原语如sync.Mutex以保证goroutine对数据的同步访问。
并发模式
在语言层面Go针对CSP模型提供了三种并发原语。
goroutine对应CSP模型中的P封装了数据的处理逻辑是Go运行时调度的基本执行单元。channel对应CSP模型中的输入/输出原语用于goroutine之间的通信和同步。select用于应对多路输入/输出可以让goroutine同时协调处理多个channel操作。
深入了解一下在实践中这些原语的常见组合方式即并发模式
创建模式
go关键字function/method 创建 goroutine
go fmt.println(Im a goroutine)
c : srv.NewConn(rw)
go c.serve(connCtx)在稍微复杂的程序里需要考虑通过原语的承载体channel在goroutine间建立联系所以通常采用以下方式建立goroutine
type T struct {...}
func spwan(f func()) chan T {c : make(chan T)go func() {...f()...}()return c
}
func main() {
//使用c与新创建的goroutine通信c : spawn(func(){})
}在内部创建一个goroutine并返回一个channel类型变量函数
spwan函数创建的新的goroutine和调用spwan函数的goroutine通过channel建立联系
函数得以实现得益于channel作为go语言的一等公民first-class citizen的存在channel可以像变量一样被初始化、传递和赋值。上面例子中的spawn只返回了一个channel变量、
退出模式
goroutine的执行函数返回意味着goroutine退出。但有些时候会要求优雅退出以下为方案
分离detached模式
是使用最广泛的goroutine退出模式
创建它的goroutine不需要关心它的退出这类goroutine在启动后即与其创建者彻底分离其生命周期与其执行的主函数相关函数返回即goroutine退出。这类goroutine有两个常见用途。
一次性任务用来执行任务完成后既退出比如此标准库代码
// $GOROOT/src/net/dial.go
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {...if oldCancel : d.Cancel; oldCancel ! nil {subCtx, cancel : context.WithCancel(ctx)defer cancel()
//有数据处理后既退出go func() {select {case -oldCancel:cancel()case -subCtx.Done():}}()ctx subCtx}...
}常驻后台执行的一些特定任务如监视monitor、观察watch等。其实现通常采用for {…}或for { select{…} }代码段形式并多以定时器timer或事件event驱动执行。
// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {// 每个P都有一个运行在后台的用于标记的Gfor _, p : range allp {if p.gcBgMarkWorker 0 {go gcBgMarkWorker(p) // 为每个P创建一个goroutine以运行gcBgMarkWorkernotetsleepg(work.bgMarkReady, -1)noteclear(work.bgMarkReady)}}
}
func gcBgMarkWorker(_p_ *p) {gp : getg()...for { // 常驻后台处理GC事宜...}
}Join模式
在线程模型中父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。
在Go中我们有时候也有类似的需求goroutine的创建者需要等待新goroutine结束。
等待一个goroutine退出
先看一段实例代码
func worker(args ...interface{}) {if len(args) 0 {return}interval, ok : args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))
}
func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {c : make(chan struct{})go func() {f(args...)c - struct{}{}}()return c
}
func main() {done : spawn(worker, 5)println(spawn a worker goroutine)-doneprintln(worker done)
}这个channel的用途就是在两个goroutine之间建立退出事件的“信号”通信机制。main goroutine在创建完新goroutine后便在该channel上阻塞等待直到新goroutine退出前向该channel发送了一个信号。
运行过后 获取goroutine的退出状态
如果不仅要等goroutine退出还要精准获取其结束状态可以通过自定义类型的channel实现这一需求
var OK errors.New(ok)
func worker(args ...interface{}) error {if len(args) 0 {return errors.New(invalid args)}interval, ok : args[0].(int)if !ok {return errors.New(invalid interval arg)}time.Sleep(time.Second * (time.Duration(interval)))return OK
}
func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {c : make(chan error)go func() {c - f(args...)}()return c
}
func main() {done : spawn(worker, 5)println(spawn worker1)err : -donefmt.Println(worker1 done:, err)done spawn(worker)println(spawn worker2)err -donefmt.Println(worker2 done:, err)
}将channel中承载的类型由struct{}改为了error这样channel承载的信息就不只是一个信号了还携带了有价值的信息新goroutine的结束状态。运行上述示例 等待多个goroutine退出
有时候必须等待全部新goroutine退出可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式
func worker(args ...interface{}) {if len(args) 0 {return}interval, ok : args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))
}
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {c : make(chan struct{})var wg sync.WaitGroupfor i : 0; i n; i {wg.Add(1)go func(i int) {name : fmt.Sprintf(worker-%d:, i)f(args...)println(name, done)wg.Done() // worker done!}(i)}go func() {wg.Wait()c - struct{}{}}()return c
}
func main() {done : spawnGroup(5, worker, 3)println(spawn a group of workers)-doneprintln(group workers done)
}通过sync.WaitGroupspawnGroup每创建一个goroutine都会调用wg.Add(1)新创建的goroutine会在退出前调用wg.Done。
在spawnGroup中还创建了一个用于监视的goroutine该goroutine调用sync.WaitGroup的Wait方法来等待所有goroutine退出。
在所有新创建的goroutine退出后Wait方法返回该监视goroutine会向done这个channel写入一个信号这时main goroutine才会从阻塞在done channel上的状态中恢复继续往下执行。
运行上述示例代码
支持超时机制的等待
设置合理的退出时间如若没有退出则继续执行下一步
func main() {done : spawnGroup(5, worker, 30)println(spawn a group of workers)timer : time.NewTimer(time.Second * 5)defer timer.Stop()select {case -timer.C:println(wait group workers exit timeout!)case -done:println(group workers done)}
}notify-and-wait模式
main goroutine的停止代表着整个程序的停止如果不事先通知退出则容易导致业务数据损坏、不完整
我们可以通过notify-and-wait通知并等待模式来满足这一场景的要求。虽然这一模式也不能完全避免损失但是它给了各个goroutine一个挽救数据的机会从而尽可能减少损失。
通知并等待一个goroutine的退出
func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))
}
func spawn(f func(int)) chan string {quit : make(chan string)go func() {var job chan int // 模拟job channelfor {select {case j : -job:f(j)case -quit:quit - ok}}}()return quit
}
func main() {quit : spawn(worker)println(spawn a worker goroutine)time.Sleep(5 * time.Second)// 通知新创建的goroutine退出println(notify the worker to exit...)quit - exittimer : time.NewTimer(time.Second * 10)defer timer.Stop()select {case status : -quit:println(worker done:, status)case -timer.C:println(wait worker exit timeout)}
}执行
此时spawn函数不仅发送退出信号给创建者还承载创建者发送的退出信号形成了一个双向的数据通道
通知并等待多个goroutine退出
channel存在一个特性当使用close关闭channel时所有阻塞到该channel上的goroutine都会得到通知所以可以利用这一特性实现这一模式
func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))
}
func spawnGroup(n int, f func(int)) chan struct{} {quit : make(chan struct{})job : make(chan int)var wg sync.WaitGroupfor i : 0; i n; i {wg.Add(1)go func(i int) {defer wg.Done() // 保证wg.Done在goroutine退出前被执行name : fmt.Sprintf(worker-%d:, i)for {j, ok : -jobif !ok {println(name, done)return}// 执行这个jobworker(j)}}(i)}go func() {-quitclose(job) // 广播给所有新goroutinewg.Wait()quit - struct{}{}}()return quit
}
func main() {quit : spawnGroup(5, worker)println(spawn a group of workers)time.Sleep(5 * time.Second)// 通知 worker goroutine 组退出println(notify the worker group to exit...)quit - struct{}{}timer : time.NewTimer(time.Second * 5)defer timer.Stop()select {case -timer.C:println(wait group workers exit timeout!)case -quit:println(group workers done)}
}创建者直接利用了worker goroutine接收任务job的channel来广播退出通知而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel当创建者关闭job channel时通过“comma ok”模式获取的ok值为false也就表明该channel已经被关闭于是worker goroutine执行退出逻辑退出前wg.Done()被执行。
执行
退出模式的应用
由于goroutine的运行状态不同因此很难用同种框架全面管理所以我们可以只实现一个“超时等待退出”框架以统一解决各种运行状态
一组goroutine的退出有两种情况第一种情况是并发退出当goroutine的退出先后数据对数据处理无影响时可使用另一种则是串行退出也就是次序错误可能导致程序状态混乱和错误
并发退出串行退出