深圳市建设工程监理协会网站,龙岩58同城,网站模板套用湖南岚鸿,可以免费秒玩游戏的网站记录多实例服务定时任务出现运行多次的问题
问题#xff1a;web项目运行多个实例时#xff0c;定时任务会被执行多次的问题
举例来说 我使用库APScheduler排定了一个定时任务taskA在每天的晚上9点需要执行一次#xff0c;我的web服务使用分布式运行了8个实例#xff0c;于…记录多实例服务定时任务出现运行多次的问题
问题web项目运行多个实例时定时任务会被执行多次的问题
举例来说 我使用库APScheduler排定了一个定时任务taskA在每天的晚上9点需要执行一次我的web服务使用分布式运行了8个实例于是在每天晚上的9点我的8个服务都会接收到APScheduler发布的这个定时任务然后每个服务都会执行一次这个定时任务这与我预期的只执行1次是不符的我想要解决这个问题让定时任务只被执行一次
设想的解决方式 使用redis分布式锁的方式redis.setnx(key, value)让实例只能运行一次定时任务从而解决定时任务会被执行多次的问题
setnx(key, value)方法很简单就是当key不存在时setnx会成功否则会失败 def setnx(self, name: KeyT, value: EncodableT) - Awaitable:Set the value of key name to value if key doesnt existreturn self.execute_command(SETNX, name, value)实现方案 setnx(key, value)方法会在key不存在时设置value当多个线程同时接到排期准备运行同一个任务时只有第一个线程setnx会成功返回True于是第一个setnx成功的线程运行了定时任务其他线程在setnx时由于key已经存在会失败返回False从而让它们跳过定时任务的执行 仍然存在的问题定时任务一般会执行多次在其下一次执行时setnx相同key的这条记录应该被删除掉因为这是一次新的任务否则之后的任务执行都会因setnx时key已存在而失败导致任务无法执行 i. 第一种方案在setnx成功的线程1任务执行完成后删除这个key在redis中存储的记录从而让下一次任务第一次运行时又可以成功setnx(key, value)而执行 但这种方案存在一定的风险如果存在线程2因为一些原因阻塞了在线程1执行完任务才开始接收到运行定时任务的指令那么线程2会在key被删除后开始尝试setnx那必然会成功然后重复了运行任务 ii. 基于第一种方案的考虑确定了第二种方案只需要给每次的定时任务添加唯一标识即可避免第一种方案的问题设置此次任务运行的唯一key_x在setnx成功的线程1任务执行完成之后不对这次定时任务的key_x执行删除 此次定时任务唯一key_x的设置很容易想到的方案是在这次定时任务id上添加运行的排期时间这样就可以让这一次的定时任务是唯一且可识别了只要运行了一次其值就永久设置为True不会在执行第二次考虑到资源占用实际应该设置一个较长的过期时间也完全可以避免方案1的风险
设置有过期时间的方法应该使用redis.set(key, value, nxTrue, ex10)方法这里nxTrue表名使用命令SETNX而ex10则是过期的时间单位为秒
第一种方案的可重复运行的小案例
# -*- coding: utf-8 -*-
import asyncio
import timeimport aioredis
from aioredis import Redisloop asyncio.get_event_loop()
redis_coro aioredis.Redis()def redis_distributed_lock(cache_key, cache_valuelocked):def decorator(func):async def wrapper(*args, **kwargs):redis_instance await redis_coro# 这里设置了10小时的过期时间完全可以避免重复运行的风险了locked await redis_instance.set(cache_key, cache_value, nxTrue, ex60 * 60 * 10)if locked: # 第一个线程设置成功值会运行任务否则不会运行任务print(success)return await func(*args, **kwargs)print(ffailed)return wrapperreturn decoratorasync def ntasks():t_time [9点, 10点, 11点] # 模拟任务在三个时间点被执行redis: Redis await redis_corot_id 1async def task_func(tid):print(f{tid}, executing...)return tid# 为了可重复运行这个示例先执行删除之前设置的keyret await redis.delete(str(t_id))for t_t in t_time:redis_key f{t_id} # 这里本来预期是直接放到函数头上装饰但是不方便控制redis_key参数所以使用了原始的方式装饰task_functask_f redis_distributed_lock(redis_key)(task_func)# 假设启动分布式服务8个会执行8次定时任务这里创建了8个任务按照先执行完先返回的顺序处理for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], looploop):res await f# print(f{res})print(f * 80)time.sleep(5) # 模拟一个定时任务在多个时间点执行下一次执行时时间参数t_t会发生变化# breakif __name__ __main__:try:loop.run_until_complete(ntasks())finally:loop.stop()loop.close()运行结果
success
tid1, executing...
failed
failed
failed
failed
failed
failed
failedfailed
failed
failed
failed
failed
failed
failed
failedfailed
failed
failed
failed
failed
failed只有第一个时间点是按照预期执行之后的时间点执行都总是失败因为每个时间点的该任务设置的key都是一样的
第二种方案的可重复运行的小案例
# -*- coding: utf-8 -*-
import asyncio
import timeimport aioredis
from aioredis import Redisloop asyncio.get_event_loop()
redis_coro aioredis.Redis()def redis_distributed_lock(cache_key, cache_valuelocked):def decorator(func):async def wrapper(*args, **kwargs):redis_instance await redis_coro# 这里设置了10小时的过期时间完全可以避免重复运行的风险了locked await redis_instance.set(cache_key, cache_value, nxTrue, ex60 * 60 * 10)if locked: # 第一个线程设置成功值会运行任务否则不会运行任务print(success)return await func(*args, **kwargs)print(ffailed)return wrapperreturn decoratorasync def ntasks():t_time [9点, 10点, 11点] # 模拟任务在三个时间点被执行redis: Redis await redis_corot_id 1async def task_func(tid):print(f{tid}, executing...)return tidfor t_t in t_time:redis_key f{t_id}_{t_t} # set的key用定时任务的id时间点来作为此次定时任务的唯一标识# 为了可重复运行这个示例先执行删除之前设置的keyret await redis.delete(redis_key)print(fkey deleted? {ret})# 这里本来预期是直接放到函数头上装饰但是不方便控制redis_key参数所以使用了原始的方式装饰task_functask_f redis_distributed_lock(redis_key)(task_func)# 假设启动分布式服务8个会执行8次定时任务这里创建了8个任务按照先执行完先返回的顺序处理for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], looploop):res await f# print(f{res})print(f * 80)time.sleep(5) # 模拟一个定时任务在多个时间点执行下一次执行时时间参数t_t会发生变化# breakif __name__ __main__:try:loop.run_until_complete(ntasks())finally:loop.stop()loop.close()
输出
key deleted? 1
success
tid1_9点, executing...
failed
failed
failed
failed
failed
failed
failedkey deleted? 1
success
tid1_10点, executing...
failed
failed
failed
failed
failed
failed
failedkey deleted? 1
success
tid1_11点, executing...
failed
failed
failed
failed
failed
failed
failed可以看到task_func任务的每个时间点的执行都只有一次成功而且不会出现只有第一个时间点执行成功而之后的时间点执行都全是失败的情况
有用的参考 起初总是尝试在协程方法中使用多线程threading.Thread老是碰壁看了这个回答后读了这篇文章感觉豁然开朗