当前位置: 首页 > news >正文

中国最大的销售网站兰州网络推广优化服务

中国最大的销售网站,兰州网络推广优化服务,沈阳网站推广优化公司哪家好,网站免费做链接需求场景#xff1a; 实现一个flask服务#xff0c;通过接口控制一个定时任务任务#xff08;对酒店订房情况进行检查#xff09;的开启和停止。要求定时任务完成后#xff0c;可以通过飞书机器人推送任务完成的消息。 展现效果#xff1a; 启动定时任务 关闭定时任务…需求场景 实现一个flask服务通过接口控制一个定时任务任务对酒店订房情况进行检查的开启和停止。要求定时任务完成后可以通过飞书机器人推送任务完成的消息。 展现效果 启动定时任务 关闭定时任务 飞书推送消息 代码实现 项目结构 业务代码 定时任务。先通过schedule模块实现基础的定时任务业务代码。 # -*- coding:UTF-8 -*-ProjectName : FileName : schedule_monitor_taskDescription : Time : 2023/9/18 9:21Author : Qredsun import os import time from datetime import datetime from datetime import timedeltaimport schedule import functools from utils.webhook import send_messagedef read_check_list_from_excel(file):try:logger.info(业务代码)except Exception as e:logger.error(f程序异常{e})finally:prompt_message f**酒店订房检查已完成**\n \f检查时间{begin_time} - {end_time} \n \f检查报告已生成 {save_path}send_message(Environment.WEBHOOK_URL, Environment.WEBHOOK_SECRET, prompt_message)# todo 定时任务配置 def catch_exceptions(cancel_on_failure False):def catch_exceptions_decorator(job_func):functools.wraps(job_func)def wrapper(*args, **kwargs):try:return job_func(*args, **kwargs)except:import tracebacktraceback.format_exc()if cancel_on_failure:return schedule.CancelJobreturn wrapperreturn catch_exceptions_decorator# 异常捕获方法的使用 catch_exceptions(cancel_on_failureFalse) def task_job(file_path):# 定时任务read_check_list_from_excel(file_path)file_path r../data/订房检查任务.xlsx schedule.every().day.at(09:00).do(task_job, file_path) schedule.run_all()while True:schedule.run_pending() # 运行所有可以运行的任务time.sleep(60 * 30)飞书消息推送功能实现 # -*- coding:UTF-8 -*-ProjectName : FileName : webhookDescription : 飞书消息推送Time : 2023/9/17 13:36Author : Qredsun import base64 import hashlib import hmac import json from datetime import datetimeimport requestsfrom utils.env_manager import Environment, loggerWEBHOOK_URL 机器人推送地址 WEBHOOK_SECRET 机器人密码def gen_sign(secret, timestamp):# 拼接时间戳以及签名校验string_to_sign {}\n{}.format(timestamp, secret)# 使用 HMAC-SHA256 进行加密hmac_code hmac.new(string_to_sign.encode(utf-8), digestmodhashlib.sha256).digest()# 对结果进行 base64 编码sign base64.b64encode(hmac_code).decode(utf-8)return signdef send_message(WEBHOOK_URL, WEBHOOK_SECRET, MSG):timestamp int(datetime.now().timestamp())sign gen_sign(WEBHOOK_SECRET, timestamp)params {timestamp: timestamp,sign : sign,msg_type : interactive,card : {config : {wide_screen_mode: True},elements : [{tag : markdown,content: fat idall/at \n f{MSG}},{tag : action,actions: [{tag : button,text : {tag : plain_text,content: 跳转至订房检查},type : primary,multi_url: {url : Environment.CALL_BACK_URL,android_url: ,ios_url : ,pc_url : }}]}],header : {template: blue,title : {content: 订房检查异常提示,tag : plain_text}},card_link: {url : ,pc_url : ,android_url: ,ios_url : }},}resp requests.post(WEBHOOK_URL, jsonparams)resp.raise_for_status()result resp.json()if result.get(code) and result.get(code) ! 0:logger.error(f飞书机器人消息 {MSG} 发送失败{resp.text})else:logger.debug(f飞书机器人消息 {MSG} 发送成功)flask_apscheduler替换schedule app.schedule_job.add_job(idcheck_room, funcs_app:read_check_list_from_excel, **{args : (Environment.SRC_FILE,),# trigger: interval, # 指定 定时任务的类型# seconds: 5 # 运行的间隔时间# 每天九点开始执行trigger: cron,day : *,hour : 09,minute : 00,second : 00 }) app.schedule_job.start() # 启动任务列表flask服务中启动、停止接口实现 from flask import Flaskfrom utils.env_manager import Environment, logger from utils.schedule_monitor_task import read_check_list_from_excel from flask_apscheduler import APSchedulerapp Flask(__name__) app.schedule_job APScheduler()app.route(/, methods[get]) def hello_world():return 酒店订房检查服务!app.route(/stop, methods[get]) def stop_job():if not app.schedule_job.get_jobs():return 没有正在执行的订房检查任务else:app.schedule_job.remove_all_jobs()return 终止订房检查任务class Config(object):DEBUG True # flask 调试模式flask_apscheduler 配置SCHEDULER_API_ENABLED True # 开放APISCHEDULER_TIMEZONE Asia/Shanghai # 使用上海时间app.route(/start, methods[get]) def start_job():if app.schedule_job.get_jobs():return 订房检查任务已启动else:app.schedule_job.add_job(idcheck_room, funcs_app:read_check_list_from_excel, **{args : (Environment.SRC_FILE,)# 每天九点开始执行trigger: cron,day : *,hour : 09,minute : 00,second : 00})return 开始订房检查任务app.config.from_object(Config) app.schedule_job.init_app(app) # 把任务列表放入 flask app.schedule_job.start() # 启动任务列表 app.run() ps 机器人推送接口的配置 在flask的配置中将SCHEDULER_API_ENABLED设置为True服务启动后自动加载flask_apscheduler提供的API接口 1. /scheduler [GET] 获取服务基本信息 2. /scheduler/jobs [POST json job data] 添加新的任务 3. /scheduler/jobs/job_id [GET] 根据job_id返回任务的详细信息 4. /scheduler/jobs [GET] 返回所有任务的信息 5. /scheduler/jobs/job_id [DELETE] 删除任务 6. /scheduler/jobs/job_id [PATCH json job data] 更新一个已经存在的任务 7. /scheduler/jobs/job_id/pause [POST] 暂停一个任务并返回任务的信息 8. /scheduler/jobs/job_id/resume [POST] 重新启动一个任务并返回任务信息 9. /scheduler/jobs/job_id/run [POST] 启动一个任务并返回任务的信息 在实现定时任务的启动和关闭时并没有直接flask_apscheduler使用自带的接口而是通过flask_apscheduler提供的定时任务管理方法实现。还有下面一些方法可参考使用 scheduler.start() 开始任务scheduler.shutdown() 停止任务scheduler.pause() 暂停所有任务scheduler.resume() 开启任务scheduler.add_listener(callback function,event) 添加监听事件scheduler.remove_listener(callback function) 去除监听事件scheduler.add_job(id,function, **kwargs) 添加jobscheduler.remove_job(id, **jobstore) 删除jobscheduler.remove_all_jobs(**jobstore) 删除所有定时任务scheduler.get_job(id,**jobstore) 获取job信息scheduler.modify_job(id,**jobstore, **kwargs) 修改jobscheduler.pause_job(id, **jobstore) 暂停jobscheduler.resume_job(id, **jobstore) 恢复jobscheduler.run_job(id, **jobstore) 启动jobscheduler.authenticate(function) 验证
http://www.sadfv.cn/news/84549/

相关文章:

  • 企业网站建设需求调研表早那么做商城网站
  • 医院网站建设基本功能WordPress会员卡插件
  • 浙江网站建设公司名单网站怎么做百度商桥
  • dedecms网站开发环境wordpress免插件图床
  • 海南行指三亚网站开发wordpress 父分类名称
  • 广东住房和建设局网站专门做网站的公司 南阳
  • 网站首页收录突然没有了建设企业外贸网站
  • 建筑企业招聘网站织梦者网站模板
  • 做网站需要写代码吗在哪注册域名
  • 包头教育平台网站建设婺源做网站
  • 东莞网站制作哪里好如何给wordpress导航添加图标
  • 超市网站建设策划书潍坊建设网站的公司
  • 做网站需要icp吗中企动力企业邮箱下载
  • 长沙市网站制作起名最好的网站排名
  • 南宁网站设计公司排名做一个网站的建设流程
  • 建网站支持设备是什么意思百度网盟推广 网站
  • 网站建设方案报告一个专门做视频配音的网站
  • 上海网站建设官方网站网站开发有哪些参考文献
  • 兖州中材建设有限公司网站网页版传奇大全
  • 深圳网站设计网站制作小程序商城装修
  • 展示型手机网站网站建设行业增长率
  • 效果图网站猪八戒网站的建设思路
  • 优化网站关键词的技巧建网站的论坛
  • 威海网站建设公司排名百度网页入口官网
  • 省级住房城乡建设主管部门网站我的网站怎么不能搜索
  • 网站建设开票内容是什么意思施工企业安全生产管理规范
  • 吉安网站推广徽hyhyk1wordpress加帝国cms
  • 何炅做的网站广告百度识图查另一半情头网页版
  • 买网站做设计参考属于什么费用免费技能培训在哪里报名
  • 求网站资源懂的2021ui网页设计是什么