小时的网站建设,温州做网站建设,哪个做h5的网站好用,没有网站也可以做推广吗一、设计思路
1、设计描述
启动服务之时先初始化一个 Goroutine Pool 池#xff0c;这个 Pool 维护了一个类似栈的 LIFO 队列#xff0c;里面存放负责处理任务的 Worker然后在 client 端提交 task 到 Pool 中之后#xff0c;在 Pool 内部#xff0c;接收 task 之后的核心…一、设计思路
1、设计描述
启动服务之时先初始化一个 Goroutine Pool 池这个 Pool 维护了一个类似栈的 LIFO 队列里面存放负责处理任务的 Worker然后在 client 端提交 task 到 Pool 中之后在 Pool 内部接收 task 之后的核心操作是 检查当前 Worker 队列中是否有可用的 Worker如果有取出执行当前的 task没有可用的 Worker判断当前在运行的 Worker 是否已超过该 Pool 的容量每个 Worker 执行完任务之后放回 Pool 的队列中等待 2、Pool struct
type sig struct{}type f func() error// pool从客户端获取任务它限制 goroutines 总数并且回收再使用
type Pool struct {capacity int32 // 协程池容量running int32 // 正在运行的goroutine数量expiryDuration time.Duration // 为每个worker设置一个过期时间workers []*Worker // 存放空闲 worker,请求进入 Pool先检查workers若有则取出绑定任务执行release chan sig // 当关闭该 Pool 支持通知所有 worker 退出运行以防 goroutine 泄露lock sync.Mutex // 同步操作锁once sync.Once // 确保 Pool 关闭操作只会执行一次
}
3、初始化 Pool 并启动定期清理过期 worker 任务
//新建一个线程池实例
func NewPool(size int) (*Pool,error) {return NewTimingPool(size,DefaultCleanIntervalTime)
}//产生一个带有自定义定时器的线程池实例
func NewTimingPool(size, expiry int) (*Pool,error) {if size 0{return nil,ErrInvalidPoolSize}if expiry {return nil,ErrInvalidPoolExpiry}p : Pool{capacity: int32(size),freeSignal: make(chan sig, math.MaxInt32),release: make(chan sig, 1),expiryDuration: time.Duration(expiry) * time.Second,}// 启动定期清理过期worker任务独立goroutine运行进一步节省系统资源p.monitorAndClear()return p, nil
}
4、提交任务到 Pool
第一个 if 判断当前 Pool 是否已被关闭若是则不再接受新任务否则获取一个 Pool 中可用的 worker绑定该 task 执行
func (p *Pool) Submit(task f) error {if len(p.release) 0 {return ErrPoolClosed}w : p.getWorker()w.task - taskreturn nil
}
5、获取可用 worker核心
p.getWorker()源码
//返回一个可用的 worker 来运行这些任务
func (p *Pool) getWorker() *Worker {var w *Workerwaiting :false //标志变量判断当前正在运行的 worker 数量是否已到达 Pool 的容量上限p.lock。Lock //加锁检测队列中是否有可用 worker并进行相应操作idleWorkers : p.workersn : len(idleWorkers) - 1if n 0 { // 当前队列中无可用worker// 判断运行worker数目已达到该Pool的容量上限置等待标志waiting p.Running() p.Cap()} else { // 当前队列有可用worker从队列尾部取出一个使用w idleWorkers[n]idleWorkers[n] nilp.workers idleWorkers[:n]}p.lock.Unlock() // 检测完成解锁if waiting { // Pool容量已满新请求等待for { // 利用锁阻塞等待直到有空闲workerp.lock.Lock()idleWorkers p.workersl : len(idleWorkers) - 1if l 0 {p.lock.Unlock()continue}w idleWorkers[l]idleWorkers[l] nilp.workers idleWorkers[:l]p.lock.Unlock()break}// 当前无空闲worker但是Pool还没有满则可以直接新开一个worker执行任务} else if w nil {w Worker{pool: p,task: make(chan f, 1),}w.run()// 运行worker数加一p.incRunning()}return w
}
6、执行任务
结合前面的 p.Submit(task f) 和 p.getWorker() 提交任务到 Pool 之后获取一个可用 worker每新建一个 worker 实例之时都需要调用 w.run() 启动一个 goroutine 监听 worker 的任务列表 task 一有任务提交进来就执行所以当调用 worker 的 sendTask(task f) 方法提交任务到 worker 的任务队列之后马上就可以被接收并执行当任务执行完之后会调用 w.pool.putWorker(w *Worker) 方法将这个已经执行完任务的 worker 从当前任务解绑放回 Pool 中以供下个任务可以使用至此一个任务从提交到完成的过程就此结束Pool 调度将进入下一个循环。
// Worker是运行任务的实际执行者它启动一个接受任务并执行函数调用的goroutine
type Worker struct {pool *Pool // 每个pool对应一个workertask chan f // 任务是一项应该完成的工作recycleTime time.Time // 当将一个worker放回队列时recycleTime将被更新。
}// Run启动一个goroutine以重复执行函数调用的过程
func (w *Worker) run() {go func() {// 循环监听任务列表一旦有任务立马取出运行for f : range w.task {if f nil {// 退出goroutine运行worker数减一w.pool.decRunning()return}f()// worker回收复用w.pool.putWorker(w)}}()
}7、worker回收(goroutine 复用)
// putWorker将一个worker放回空闲池回收goroutines
func (p *Pool) putWorker(worker *Worker) {// 写入回收时间亦即该worker的最后一次结束运行的时间worker.recycleTime time.Now()p.lock.Lock()p.workers append(p.workers, worker)p.lock.Unlock()
}8、动态扩容或者缩小池容量
// ReSize更改此池的容量
func (p *Pool) ReSize(size int) {if size p.Cap() {return}atomic.StoreInt32(p.capacity, int32(size))diff : p.Running() - sizeif diff 0 {for i : 0; i diff; i {p.getWorker().task - nil}}
}9、定期清理过期 Worker
定期检查空闲 worker 队列中是否有已过期的 worker 并清理因为采用了 LIFO 后进先出队列存放空闲 worker所以该队列默认已经是按照 worker 的最后运行时间由远及近排序可以方便地按顺序取出空闲队列中的每个 worker 并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长若是则清理掉该 goroutine释放该 worker并且将剩下的未过期 worker 重新分配到当前 Pool 的空闲 worker 队列中进一步节省系统资源
// 定期清理过期 Worker
func (p *Pool) periodicallyPurge() {heartbeat : time.NewTicker(p.expiryDuration)for range heartbeat.C {currentTime : time.Now()p.lock.Lock()idleWorkers : p.workersif len(idleWorkers) 0 p.Running() 0 len(p.release) 0 {p.lock.Unlock()return}n : 0for i, w : range idleWorkers {if currentTime.Sub(w.recycleTime) p.expiryDuration {break}n iw.task - nilidleWorkers[i] nil}nif n len(idleWorkers) {p.workers idleWorkers[:0]} else {p.workers idleWorkers[n:]}p.lock.Unlock()}
}二、pool 使用
1、公共池
package mainimport (fmtsynctimegithub.com/panjf2000/ants/v2
)func demoFunc() {time.Sleep(10 * time.Millisecond)fmt.Println(Hello World!)
}func main() {// 在retrieveWorker()中可能有一些调用者在等待因此我们需要唤醒它们来防止那些无限阻塞的调用者defer ants.Release()var wg sync.WaitGroupsyncCalculateSum : func() {demoFunc()wg.Done()}for i : 0; i 1000; i {wg.Add(1)_ ants.Submit(syncCalculateSum)}wg.Wait()fmt.Printf(running goroutines: %d\n, ants.Running())fmt.Printf(finish all tasks.\n)
}/*Hello World!Hello World!running goroutines: 1000finish all tasks.*/2、方法绑定池
package mainimport (fmtgithub.com/panjf2000/ants/v2sync
)func myFunc(i interface{}) {fmt.Printf(run with %d\n, i)
}func main() {defer ants.Release()var wg sync.WaitGroup// 使用池和函数设置goroutine pool的容量为10超时时间为1秒。p, _ : ants.NewPoolWithFunc(10, func(i interface{}) {myFunc(i)wg.Done()})defer p.Release()// 逐个提交任务for i : 0; i 1000; i {wg.Add(1)_ p.Invoke(int32(i))}wg.Wait()fmt.Printf(running goroutines: %d\n, p.Running())
}/*
run with 976
run with 990
run with 971
running goroutines: 10*/