【python小随笔】celery周期任务(简单原理)

1:目录结构

|--celery_task
    |--celery.py             # 执行任务的main函数
    |--task_one              # 第一个任务
    |--task_two              # 第2个任务
            .                    .
            .                    .
    |--task_.                # 第n个任务

2:celery.py

from celery import Celery         # 导入celery模块
from celery.schedules import crontab   # 周期定义工具包

# 配置任务
celery_task = Celery(
    "task",
    broker="redis://127.0.0.1:6379",
    backend="redis://127.0.0.1:6379",
    include=["Celery_task.task_one",]    # 任务文件夹名称.任务文件,多个往后面添加
)


# crontab(minute=‘*/720‘) # 12小时执行一次
# "schedule": 10,  # 每10秒钟执行一次

# 周期时间定义
celery_task.conf.beat_schedule = {
    "each1d_task": {
        "task": "Celery_task.task_keyword.monitored_ranking", # 要执行的函数名
        "schedule": crontab(minute=‘*/720‘),   # 12小时执行一次
        # "args": (10, 10)
    },
}

3:任务文件配置

from Celery_task.celery import celery_task     # 导入执行主函数文件

from keywords.views.celery_monitored_ranking import KeywordRun # 这里是直接导入执行的文件的执行函数

import traceback  # 日志错误详细显示模块
from logging_files import logging_main  # 导入日志对象

@celery_task.task       # 配置文件中的名称要一样
def monitored_ranking():
    try:
        KeywordRun()      # 执行函数
    except Exception as e:
        msg = traceback.format_exc()
        logging_main.monitored_ranking_error.error(msg)

相关推荐