用什么做网站后台的,宁夏建设造价网站,合肥网站制作哪家有名,长春网络营销前面一篇博客 nsq中diskqueue详解 - 第二篇_YZF_Kevin的博客-CSDN博客 我们讲了diskqueue的两种文件存储格式#xff0c;diskqueue的启动入口#xff0c;元数据文件的读取和写入#xff0c;如果你还没了解过#xff0c;强烈建议先看一下 这篇博客#xff0c;我们重点讲dis…前面一篇博客 nsq中diskqueue详解 - 第二篇_YZF_Kevin的博客-CSDN博客 我们讲了diskqueue的两种文件存储格式diskqueue的启动入口元数据文件的读取和写入如果你还没了解过强烈建议先看一下 这篇博客我们重点讲diskqueue的定义以及最核心的ioloop循环写入消息的处理读取消息的处理等等 1. diskqueue的定义
先看下diskqueue的源码定义如下(已加详细注释)
// FIFO的持久化队列
type diskQueue struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsreadPos int64 // 读文件的读取点writePos int64 // 写文件的写入点也是当前文件的总写入字节数readFileNum int64 // 当前要读取的文件编号writeFileNum int64 // 当前要写入的文件编号depth int64 // 总的消息数注意不是当前文件的写入消息数而是全部的sync.RWMutex // 读写锁对exitFlag等标记的读写时需加锁name string // 队列实例名字(只是打印时用用来跟其他队列有所区别nsq中使用topicchannel来标识一个队列)dataPath string // 文件所在路径maxBytesPerFile int64 // 一个文件的最大字节数外部传入maxBytesPerFileRead int64 // 当前文件最大可读字节数跟maxBytesPerFile不同因为一个文件限定100M可能实际只写了99.97M读取时只能以99.97M为准minMsgSize int32 // msg的最小字节数外部传入maxMsgSize int32 // msg的最大字节数外部传入syncEvery int64 // 读写多少次触发刷到磁盘syncTimeout time.Duration // 同步到文件的间隔触发时还会判断是否有变化有变化才会真的刷exitFlag int32 // 退出标记1表正在退出不再接收新数据写入ioloop()协程也退出needSync bool // 标记需同步到磁盘nextReadPos int64 // 下次应该读的位置不直接更新读位置是因为消息还没投递给外部不算真正读完成如果外部没接收下次还得从readPos读。// 只有外部接收完成才算消息投递完成才可以把 readPos 更新为 nextReadPosnextReadFileNum int64 // 下次应该读的文件号原因同上readFile *os.File // 读文件的文件指针writeFile *os.File // 写文件的文件指针reader *bufio.Reader // 读文件对象的readerwriteBuf bytes.Buffer // 写文件对象的bufferreadChan chan []byte // 只读的通道通过ReadChan()返回给外面使用无缓冲压入数据后只能等外部使用者取走后才能继续peekChan chan []byte // 查看的通道通过PeekChan()返回给外面使用// 内部使用的通道depthChan chan int64 // 存放当前总消息数的通道writeChan chan []byte // 接收外部数据的通道无缓冲所以压入后阻塞等待ioloop()循环的处理writeResponseChan chan error // 接收外部数据后回应的通道无缓冲emptyChan chan int // 清空队列信号的通道当外部调用Empty()时会往该通道写1ioloop()循环中读取后会执行清空操作emptyResponseChan chan error // 清空队列信号结果的通道exitChan chan int // 退出通道exitSyncChan chan int // 退出结果的通道logf AppLogFunc // 日志函数外部传入
}
都是一些变量定义已经加了详细的注释这里不再一一介绍只是总结下
1. 前面的5个变量 readPos, writePos, readFileNum, writeFileNum, depth不用多介绍上一篇博客已经讲过了都是元数据的字段描述队列的整体信息
2. 至于name, dataPath,maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimieout这7个变量都是diskqueue启动时外部传入的主要是设置队列属性
3. maxBytesPerFileRead 不同于 maxBytesPerFile maxBytesPerFile 是所有消息数据文件的最大写入字节数是个限制值比如100M maxBytesPerFileRead 是当前读文件的最大可读字节数这个是实际值每当读一个新文件时都会获取文件的实际大小实际只能读这么多比如文件实际是99.97M那只能读这么多
4. needSync是标记是否需同步磁盘读写次数超过限定的syncEvery时或需新建写文件时都要置为trueioloop()循环就会执行一次强制刷新到磁盘
5. nextReadPos和nextReadFileNum我们一般会认为数据读取后不就可以后移读位置了么其实不是这样的因为这里读取完毕后仅仅是尝试压入readChan不代表外部真的接收了。举例这里读取成功后尝试压入到readChanreadChan是个无缓冲的通道但外部一直没接收最后还关闭了自己下次再从这里读消息时我们还要投递这个消息也就是说读取位置不能变nextReadFileNum同理只有上一个文件全部读取且外部接收成功了这里才能读新文件
6. readChan和peekChan对外提供的ReadChan()是获取一个只读通道这里读取一个消息后压入readChan等待外部接收后才返回后移读位置再读取下一个消息对外提供的PeekChan()也是获取一个只读通道等待外部接收后才返回但是不后移读位置也不会触发读下一个消息也就是说PeekChan()正如其名仅仅提供查看功能 2. diskqueue的核心ioloop()
我们先说下ioloop()函数的运作机制这样大家先做到心里有数再看源码就很轻松了
1. 建立一个定时器触发时间为外部传入的syncTimeout定时器触发时检测有没有读写发生有则同步一次磁盘
2. writeChan是接收外部写入的通道外部写入后本函数会在select中读取到调用writeOne()写入到当前的写文件(如果发现文件即将超上限就新开一个文件写)count值1
3. readChan是给外部读取的通道本函数总是会读取一个消息往readChan中压入等待外部接收外部接收成功后本函数会在select中返回就再读取下一个消息如果发现当前读文件已读完就读下一个文件count值1
4. 每读取一个消息每写入一个消息count都会加1cout值达到外部传入的syncEvery就同步一次磁盘
5. emptyChan 用来接收外部的清空信号本函数的select中接收到就进行清空操作
6. exitChan 用来接收外部的退出信号本函数的select中接收到就进行退出操作 好了上面的6个操作就是ioloop()函数的核心了现在贴代码已添加详细注释
// 独立协程运行
func (d *diskQueue) ioLoop() {var dataRead []byte // 存储每次读取的数据var err errorvar count int64 // 每readwrite一次count值1满syncEvery则往磁盘同步一次var r chan []byte // 对外的读消息通道为nil时说明没有数据可读var p chan []byte // 对外的查看消息通道为nil时说明没有数据可读// 同步定时器syncTicker : time.NewTicker(d.syncTimeout)for {// count值够了就标记需同步if count d.syncEvery {d.needSync true}// 发现需同步if d.needSync {err d.sync() // 同步到磁盘该函数在同步磁盘后会把needSync置为falseif err ! nil {d.logf(ERROR, DISKQUEUE(%s) failed to sync - %s, d.name, err)}count 0 // count重新从0开始计}// 有消息可读的条件读的是旧文件 或者 读的位置比写的位置小if (d.readFileNum d.writeFileNum) || (d.readPos d.writePos) {// 读位置已经移动(说明外部取走了数据)才可读下一个消息if d.nextReadPos d.readPos {dataRead, err d.readOne()if err ! nil {d.logf(ERROR, DISKQUEUE(%s) reading at %d of %s - %s, d.name, d.readPos, d.fileName(d.readFileNum), err)// 读出错的处理d.handleReadError()continue}}// 赋值通道r d.readChanp d.peekChan} else { // 不可读的时候把r,p置为nil确保下面的select会直接跳过外部使用者在select中判断时也会跳过r nilp nil}// go中通道的特性决定select中对为nil的chan的读/写操作会直接跳过// 所以只有有数据可读的时候p,r才有值(p指向peekChanr指向readChan)select {case p - dataRead:// 注意这里什么都没做因为p仅仅是查看通道消息不算取走case r - dataRead: // 把读出来的这个消息压入到readChan外部从readChan取走以后这里会立即返回// 每取走一个消息count值1count// 重新计算下次读位置读文件号d.moveForward()case d.depthChan - d.depth: // 把最新的未读消息数压入对外通道// 什么都不做外部取走未处理消息数跟本协程没关系case -d.emptyChan: // 收到清空队列的信号d.emptyResponseChan - d.deleteAllFiles() // 删除所有的文件(清空队列)并把结果压入返回通道因为外部调用者还在等结果count 0 // 重新从0计数case dataWrite : -d.writeChan: // 新接收到消息count // 每收到一个消息count值1d.writeResponseChan - d.writeOne(dataWrite) // 消息写入到文件缓冲区结果压入返回通道case -syncTicker.C:// 同步定时器if count 0 {continue // 虽然时间到了但数据没有变化也不用同步}d.needSync true // 这里仅标记下次for循环会进行同步操作case -d.exitChan: // 退出信号goto exit}}exit:d.logf(INFO, DISKQUEUE(%s): closing ... ioLoop, d.name)syncTicker.Stop()// 退出完成往exitSyncChan发信号因为主协程还在等d.exitSyncChan - 1
}
上面已经讲了ioloop()函数的运作机制代码也添加了详细的注释相信大家都能轻松看懂
我再啰嗦下几个值得注意的点
1. diskqueue的运作流程是一边新写入消息一边读取处理像生产者消费者机制一样
如果读慢写快结果就是消息文件会一直新增这倒没什么反正消息已保存进文件了后面再处理就是了
如果读快写慢那么只要持续的时间够长读的位置一定会追上写的位置造成无消息可读等于发生了读写追尾这个时候读操作就要停止
所以总结下可读操作的条件
要么读的文件号较小写的文件号较大读写不是同一个文件
要么读写的是同一个文件但读的位置比写的位置小
2. 函数内r表可读的通道p表查看的通道当不可读的时候r,p均赋值为nil本函数的select会直接跳过对值为nil通道的写入外部接受者的select也会跳过对值为nil通道的读取这是golang通道的特性之一大家注意
3. diskqueue在退出/删除时会调用exit()函数该函数会关闭通道exitChan此时ioloop()的select就会返回关闭ioloop的协程。注意关闭通道时所有监听该通道的select都会收到消息这也是golang通道的特性之一 3. 新写入消息时的处理
diskqueue对外提供的写入消息接口为 Put([]byte) error 这个接口的实现不复杂就是把对应的数据写入到当前在写文件如果发现写入数据后当前在写文件会超上限那就不写了关闭当前的写文件新开一个写入文件从0位置开始写
源码如下(已添加详细注释)
// 把指定数据压入接收队列
func (d *diskQueue) Put(data []byte) error {d.RLock()defer d.RUnlock()// 如果队列正在退出返回吧if d.exitFlag 1 {return errors.New(exiting)}d.writeChan - data // 压入接收队列因为无缓冲所以会阻塞等待ioloop()循环中取走才会返回return -d.writeResponseChan// 阻塞等待结果ioloop()从writeChan读取执行后会立马出结果
}
可以看到函数操作很简单核心操作就是往d.writeChan中压入数据由于d.writeChan是无缓冲通道所以会阻塞等待ioloop()函数中select的取走
ioloop()函数中select对此的处理如下 case dataWrite : -d.writeChan: // 新接收到消息count // 每收到一个消息count值1d.writeResponseChan - d.writeOne(dataWrite) // 消息写入到文件缓冲区结果压入返回通道 select对此的处理也很简单count值加1后调用了d.writeOne(dataWrite)然后把writeOne()的结果写入到writeResponseChan
我们看下writeOne()函数的处理源码如下(已添加详细注释)
// 写入一个消息到缓冲区函数内部会自动创建新文件
func (d *diskQueue) writeOne(data []byte) error {var err errordataLen : int32(len(data)) // 数据长度totalBytes : int64(4 dataLen) // 本次写入的总字节数4字节的数据长度 真正数据部分// 数据大小检测if dataLen d.minMsgSize || dataLen d.maxMsgSize {return fmt.Errorf(invalid message write size (%d) minMsgSize%d maxMsgSize%d, dataLen, d.minMsgSize, d.maxMsgSize)}// 如果加上本次写入量会超过文件最大限制就关闭当前文件创建新的if d.writePos 0 d.writePostotalBytes d.maxBytesPerFile {// 如果当前已经在读这个文件if d.readFileNum d.writeFileNum {d.maxBytesPerFileRead d.writePos // 标识最大可读字节数即writePos因为不再写入这个文件了下面会往新文件里面写了}d.writeFileNum // 新文件编号d.writePos 0 // 新文件的写入起始点// 当前文件的内容刷到磁盘err d.sync()if err ! nil {d.logf(ERROR, DISKQUEUE(%s) failed to sync - %s, d.name, err)}// 关闭当前文件if d.writeFile ! nil {d.writeFile.Close()d.writeFile nil}}// 要写的文件还不存在新建if d.writeFile nil {// 格式化文件名curFileName : d.fileName(d.writeFileNum)// 创建文件d.writeFile, err os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)if err ! nil {return err}d.logf(INFO, DISKQUEUE(%s): writeOne() opened %s, d.name, curFileName)// 如果已有写入位置if d.writePos 0 {_, err d.writeFile.Seek(d.writePos, 0) // 偏移文件游标0表从文件开头进行偏移if err ! nil {d.writeFile.Close()d.writeFile nilreturn err}}}d.writeBuf.Reset()// 先把数据长度(4字节)写入buferr binary.Write(d.writeBuf, binary.BigEndian, dataLen)if err ! nil {return err}// 再把数据写入buf_, err d.writeBuf.Write(data)if err ! nil {return err}// 把buf写入注意这里其实是写入到文件的缓冲区并没有刷到磁盘中只有调用writeFile.fsync()才是真正刷到磁盘_, err d.writeFile.Write(d.writeBuf.Bytes())if err ! nil {d.writeFile.Close()d.writeFile nilreturn err}d.writePos totalBytes // 更新写入位置d.depth 1 // 更新消息数return err
}