广州网站排名优化价格,西安有啥好玩的地方,杭州网站制作工具,wordpress主题安装后找不到这是我的第 193 期分享作者 | 程序员内点事来源 | 程序员内点事#xff08;ID#xff1a;chegnxy-nds#xff09; 分享 | Java中文社群#xff08;ID#xff1a;javacn666#xff09;五一期间原计划是写两篇文章#xff0c;看一本技术类书籍#xff0c;结果这五天由于自… 这是我的第 193 期分享作者 | 程序员内点事来源 | 程序员内点事IDchegnxy-nds 分享 | Java中文社群IDjavacn666五一期间原计划是写两篇文章看一本技术类书籍结果这五天由于自律性过于差禁不住各种诱惑我连电脑都没打开过计划完美宣告失败。所以在这能看出和大佬之间的差距人家没白没夜的更文比你优秀的人比你更努力难以望其项背真是让我自愧不如。知耻而后勇这不逼着自己又学起来了个人比较喜欢一些实践类的东西既学习到知识又能让技术落地能搞出个demo最好本来不知道该分享什么主题好在最近项目紧急招人中而我有幸做了回面试官就给大家整理分享一道面试题“如何实现延时队列”。下边会介绍多种实现延时队列的思路文末提供有几种实现方式的 github地址。其实哪种方式都没有绝对的好与坏只是看把它用在什么业务场景中技术这东西没有最好的只有最合适的。一、延时队列的应用什么是延时队列顾名思义首先它要具有队列的特性再给它附加一个延迟消费队列消息的功能也就是说可以指定队列中的消息在哪个时间点被消费。延时队列在项目中的应用还是比较多的尤其像电商类平台1、订单成功后在30分钟内没有支付自动取消订单2、外卖平台发送订餐通知下单成功后60s给用户推送短信。3、如果订单一直处于某一个未完结状态时及时处理关单并退还库存4、淘宝新建商户一个月内还没上传商品信息将冻结商铺等。。。。上边的这些场景都可以应用延时队列解决。二、延时队列的实现我个人一直秉承的观点工作上能用JDK自带API实现的功能就不要轻易自己重复造轮子或者引入三方中间件。一方面自己封装很容易出问题大佬除外再加上调试验证产生许多不必要的工作量另一方面一旦接入三方的中间件就会让系统复杂度成倍的增加维护成本也大大的增加。1、DelayQueue 延时队列JDK 中提供了一组实现延迟队列的API位于Java.util.concurrent包下DelayQueue。DelayQueue是一个BlockingQueue无界阻塞队列它本质就是封装了一个PriorityQueue优先队列PriorityQueue内部使用完全二叉堆不知道的自行了解哈来实现队列元素排序我们在向DelayQueue队列中添加元素时会给元素一个Delay延迟时间作为排序条件队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类在存放基本数据类型时优先队列中元素默认升序排列自定义实体类就需要我们根据类属性值比较计算了。先简单实现一下看看效果添加三个order入队DelayQueue分别设置订单在当前时间的5秒、10秒、15秒后取消。要实现DelayQueue延时队列队中元素要implements Delayed 接口这哥接口里只有一个getDelay方法用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。public class Order implements Delayed {/*** 延迟时间*/JsonFormat(locale zh, timezone GMT8, pattern yyyy-MM-dd HH:mm:ss)private long time;String name;public Order(String name, long time, TimeUnit unit) {this.name name;this.time System.currentTimeMillis() (time 0 ? unit.toMillis(time) : 0);}Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}Overridepublic int compareTo(Delayed o) {Order Order (Order) o;long diff this.time - Order.time;if (diff 0) {return -1;} else {return 1;}}
}
DelayQueue的put方法是线程安全的因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法 poll() 和 take() poll() 为非阻塞获取没有到期的元素直接返回nulltake() 阻塞方式获取没有到期的元素线程将会等待。public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {Order Order1 new Order(Order1, 5, TimeUnit.SECONDS);Order Order2 new Order(Order2, 10, TimeUnit.SECONDS);Order Order3 new Order(Order3, 15, TimeUnit.SECONDS);DelayQueueOrder delayQueue new DelayQueue();delayQueue.put(Order1);delayQueue.put(Order2);delayQueue.put(Order3);System.out.println(订单延迟队列开始时间: LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)));while (delayQueue.size() ! 0) {/*** 取队列头部元素是否过期*/Order task delayQueue.poll();if (task ! null) {System.out.format(订单:{%s}被取消, 取消时间:{%s}\n, task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss)));}Thread.sleep(1000);}}
}
上边只是简单的实现入队与出队的操作实际开发中会有专门的线程负责消息的入队与消费。执行后看到结果如下Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行至此就用DelayQueue实现了延时队列。订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}
订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}
订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}
2、Quartz 定时任务Quartz一款非常经典任务调度框架在Redis、RabbitMQ还未广泛应用时超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性可能很多单子已经超时但还没到达触发执行的时间点那么就会造成订单处理的不够及时。引入quartz框架依赖包dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-quartz/artifactId
/dependency
在启动类中使用EnableScheduling注解开启定时任务功能。EnableScheduling
SpringBootApplication
public class DelayqueueApplication {public static void main(String[] args) {SpringApplication.run(DelayqueueApplication.class, args);}
}
编写一个定时任务每个5秒执行一次。Component
public class QuartzDemo {//每隔五秒Scheduled(cron 0/5 * * * * ? )public void process(){System.out.println(我是定时任务);}
}
3、Redis sorted setRedis的数据结构Zset同样可以实现延迟队列的效果主要利用它的score属性redis通过score来为集合中的成员进行从小到大的排序。通过zadd命令向队列delayqueue 中添加元素并设置score值表示元素过期的时间向delayqueue 添加三个order1、order2、order3分别是10秒、20秒、30秒后过期。 zadd delayqueue 3 order3
消费端轮询队列delayqueue 将元素排序后取最小时间与当前时间比对如小于当前时间代表已经过期移除key。 /*** 消费消息*/public void pollOrderQueue() {while (true) {SetTuple set jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);String value ((Tuple) set.toArray()[0]).getElement();int score (int) ((Tuple) set.toArray()[0]).getScore();Calendar cal Calendar.getInstance();int nowSecond (int) (cal.getTimeInMillis() / 1000);if (nowSecond score) {jedis.zrem(DELAY_QUEUE, value);System.out.println(sdf.format(new Date()) removed key: value);}if (jedis.zcard(DELAY_QUEUE) 0) {System.out.println(sdf.format(new Date()) zset empty );return;}Thread.sleep(1000);}}
我们看到执行结果符合预期2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty
4、Redis 过期回调Redis 的key过期回调事件也能达到延迟队列的效果简单来说我们开启监听key是否过期的事件一旦key过期会触发一个callback事件。修改redis.conf文件开启notify-keyspace-events Exnotify-keyspace-events Ex
Redis监听配置注入Bean RedisMessageListenerContainerConfiguration
public class RedisListenerConfig {BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}
编写Redis过期回调监听方法必须继承KeyExpirationEventMessageListener 有点类似于MQ的消息监听。Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}Overridepublic void onMessage(Message message, byte[] pattern) {String expiredKey message.toString();System.out.println(监听到key expiredKey 已过期);}
}
到这代码就编写完成非常的简单接下来测试一下效果在redis-cli客户端添加一个key 并给定3s的过期时间。 set xiaofu 123 ex 3
在控制台成功监听到了这个过期的key。监听到过期的key为xiaofu
5、RabbitMQ 延时队列利用 RabbitMQ 做延时队列是比较常见的一种方式而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。先来认识一下 TTL和 DXL两个概念Time To Live(TTL) TTL 顾名思义指的是消息的存活时间RabbitMQ可以通过x-message-tt参数来设置指定Queue队列和 Message消息上消息的存活时间它的值是一个非负整数单位为微秒。RabbitMQ 可以从两种维度设置消息过期时间分别是队列和消息本身设置队列过期时间那么队列中所有消息都具有相同的过期时间。设置消息过期时间对队列中的某一条消息设置过期时间每条消息TTL都可以不同。如果同时设置队列和队列中消息的TTL则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间一旦超过TTL过期时间则成为Dead Letter死信。Dead Letter ExchangesDLXDLX即死信交换机绑定在死信交换机上的即死信队列。RabbitMQ的 Queue队列可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key可选一旦队列内出现了Dead Letter死信则按照这两个参数可以将消息重新路由到另一个Exchange交换机让消息重新被消费。x-dead-letter-exchange队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange交换机。x-dead-letter-routing-key指定routing-key发送一般为要指定转发的队列。队列出现Dead Letter的情况有消息或者队列的TTL过期队列达到最大长度消息被消费端拒绝basic.reject or basic.nack下边结合一张图看看如何实现超30分钟未支付关单功能我们将订单消息A0001发送到延迟队列order.delay.queue并设置x-message-tt消息存活时间为30分钟当到达30分钟后订单消息A0001成为了Dead Letter死信延迟队列检测到有死信通过配置x-dead-letter-exchange将死信重新转发到能正常消费的关单队列直接监听关单队列处理关单逻辑即可。发送消息时指定消息延迟的时间public void send(String delayTimes) {amqpTemplate.convertAndSend(order.pay.exchange, order.pay.queue,大家好我是延迟数据, message - {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;});}
}
设置延迟队列出现死信后的转发规则/*** 延时队列*/Bean(name order.delay.queue)public Queue getMessageQueue() {return QueueBuilder.durable(RabbitConstant.DEAD_LETTER_QUEUE)// 配置到期后转发的交换.withArgument(x-dead-letter-exchange, order.close.exchange)// 配置到期后转发的路由键.withArgument(x-dead-letter-routing-key, order.close.queue).build();}
6、时间轮前边几种延时队列的实现方法相对简单比较容易理解时间轮算法就稍微有点抽象了。kafka、netty都有基于时间轮算法实现延时队列下边主要实践Netty的延时队列讲一下时间轮是什么原理。先来看一张时间轮的原理图解读一下时间轮的几个基本概念wheel 时间轮图中的圆盘可以看作是钟表的刻度。比如一圈round 长度为24秒刻度数为 8那么每一个刻度表示 3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大精度越大。当添加一个定时、延时任务A假如会延迟25秒后才会执行可时间轮一圈round 的长度才24秒那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index也是就任务A会绕一圈指向0格子上此时时间轮会记录该任务的round和 index信息。当round0index0 指针指向0格子 任务A并不会执行因为 round0不满足要求。所以每一个格子代表的是一些时间比如1秒和25秒 都会指向0格子上而任务则放在每个格子对应的链表中这点和HashMap的数据有些类似。Netty构建延时队列主要用HashedWheelTimerHashedWheelTimer底层数据结构依然是使用DelayedQueue只是采用时间轮的算法来实现。下面我们用Netty 简单实现延时队列HashedWheelTimer构造函数比较多解释一下各参数的含义。ThreadFactory 表示用于生成工作线程一般采用线程池tickDuration和unit每格的时间间隔默认100msticksPerWheel一圈下来有几格默认512而如果传入数值的不是2的N次方则会调整为大于等于该参数的一个2的N次方数值有利于优化hash值的计算。public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}
TimerTask一个定时任务的实现接口其中run方法包装了定时任务的逻辑。Timeout一个定时任务提交到Timer之后返回的句柄通过这个句柄外部可以取消这个定时任务并对定时任务的状态进行一些基本的判断。Timer是HashedWheelTimer实现的父接口仅定义了如何提交定时任务和如何停止整个定时机制。public class NettyDelayQueue {public static void main(String[] args) {final Timer timer new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);//定时任务TimerTask task1 new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println(order1 5s 后执行 );timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册}};timer.newTimeout(task1, 5, TimeUnit.SECONDS);TimerTask task2 new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println(order2 10s 后执行);timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册}};timer.newTimeout(task2, 10, TimeUnit.SECONDS);//延迟任务timer.newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {System.out.println(order3 15s 后执行一次);}}, 15, TimeUnit.SECONDS);}
}
从执行的结果看order3、order3延时任务只执行了一次而order2、order1为定时任务按照不同的周期重复执行。order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行
总结为了让大家更容易理解上边的代码写的都比较简单粗糙几种实现方式的demo已经都提交到github 地址https://github.com/chengxy-nds/delayqueue感兴趣的小伙伴可以下载跑一跑。这篇文章肝了挺长时间写作一点也不比上班干活轻松查证资料反复验证demo的可行性搭建各种RabbitMQ、Redis环境只想说我太难了可能写的有不够完善的地方如哪里有错误或者不明了的欢迎大家踊跃指正最后原创不易码字不易点个再看吧~史上最全的延迟任务实现方式汇总附代码强烈推荐《大厂内部资料》Redis 性能优化的 13 条军规全网首发关注公众号发送”进群“老王拉你进读者群。