代码之家  ›  专栏  ›  技术社区  ›  TM. Randy Simon

Django-设置计划作业?

  •  450
  • TM. Randy Simon  · 技术社区  · 15 年前

    有人知道怎么设置吗?

    澄清一下:我知道我可以建立一个 cron

    我曾考虑过通过简单地检查自上次向站点发送请求以来是否应该运行作业来“追溯”触发这些操作,但我希望有更干净的东西。

    22 回复  |  直到 4 年前
        1
  •  391
  •   Aniruddh Agarwal    5 年前

    我采用的一种解决方案是:

    1) 创建一个 custom management command ,例如。

    python manage.py my_cool_command
    

    2) 使用 cron (在Linux上)或 at

    这是一个简单的解决方案,不需要安装沉重的AMQP堆栈。然而,在其他答案中提到,使用芹菜这样的东西有很多好处。特别是,对于芹菜,不必将应用程序逻辑分散到crontab文件中是很好的。然而,cron解决方案非常适合中小型应用程序,并且您不需要太多的外部依赖性。

    编辑:

    Windows 8、Server 2012及更高版本不推荐使用该命令。你可以用 schtasks.exe 同样的用途。

    ****更新**** 这是新的 link 用于编写自定义管理命令的django文档的

        2
  •  163
  •   WhyNotHugo    7 年前

    Celery periodic tasks ).根据你的应用程序,它可能值得一试。

    芹菜很容易和django搭配( docs

        3
  •  54
  •   leonheess    4 年前

    我们已经开源了我认为是结构化的应用程序。上面布赖恩的解决方案也暗示了这一点。我们希望得到任何/所有反馈!

    https://github.com/tivix/django-cron

    它附带一个管理命令:

    ./manage.py runcrons
    

    那就行了。每个cron都被建模为一个类(因此都是面向对象的),每个cron以不同的频率运行,我们确保相同的cron类型不会并行运行(以防cron自身运行的时间比其频率长!)

        4
  •  38
  •   xuhdev    8 年前

    如果您使用的是标准POSIX操作系统,那么您可以使用 cron .

    at .

    1. 弄清楚他们在哪个平台上。

    2. 为用户执行适当的“AT”命令, 为您的用户更新crontab。

        5
  •  23
  •   Austin Adams    7 年前

    有趣的新可插拔Django应用程序: django-chronograph

    您只需添加一个充当计时器的cron条目,就可以在脚本中运行一个非常好的Django管理界面。

        6
  •  16
  •   user41767    15 年前

    看看Django Poor Man's Cron,这是一款Django应用程序,它利用Spambot、搜索引擎索引机器人等工具,以大约固定的时间间隔运行预定任务

    见: http://code.google.com/p/django-poormanscron/

        7
  •  12
  •   PhoenixDev    4 年前

    不久前,我有完全相同的需求,并最终使用 APScheduler ( User Guide )

    from apscheduler.schedulers.background import BackgroundScheduler
    
    scheduler = BackgroundScheduler()
    job = None
    
    def tick():
        print('One tick!')\
    
    def start_job():
        global job
        job = scheduler.add_job(tick, 'interval', seconds=3600)
        try:
            scheduler.start()
        except:
            pass
    

    希望这对别人有帮助!

        8
  •  10
  •   Johannes Gorset    12 年前

    Brian Neal关于通过cron运行管理命令的建议效果很好,但是如果你想找一个更健壮(但没有芹菜那么精致)的东西,我会找一个类似的库 Kronos :

    # app/cron.py
    
    import kronos
    
    @kronos.register('0 * * * *')
    def task():
        pass
    
        9
  •  9
  •   sleblanc    9 年前

    芹菜; AMQP 将允许您处理中断的任务,并且它将由另一个工作人员再次执行(芹菜工作人员侦听下一个要处理的任务),直到任务完成 max_retries 已经到达。

        10
  •  9
  •   Jean-François Fabre Darshan Ambre    5 年前

    用于调度程序作业的Django APScheduler。Advanced Python Scheduler(APScheduler)是一个Python库,允许您计划稍后执行的Python代码,可以是一次执行,也可以是定期执行。您可以随时添加新工作或删除旧工作。

    注:我是这个图书馆的作者

    安装APScheduler

    pip install apscheduler
    

    查看要调用的文件函数

    文件名:scheduler_jobs.py

    def FirstCronTest():
        print("")
        print("I am executed..!")
    

    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    

    您在这里编写的函数,调度程序函数是在调度程序作业中编写的

    import scheduler_jobs 
    
    scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
    scheduler.start()
    

    链接要执行的文件

    import execute
    
        11
  •  8
  •   chhantyal    11 年前

    我个人使用cron,但是 Jobs Scheduling 部分 django-extensions 看起来很有趣。

        12
  •  7
  •   Alexander    8 年前

    虽然不是Django的一部分, Airflow 是一个对任务管理有用的较新项目(截至2016年)。

    Airflow是一个工作流自动化和调度系统,可用于编写和管理数据管道。基于web的UI为开发人员提供了一系列用于管理和查看这些管道的选项。

    气流是用Python编写的,并使用Flask构建。

    气流由Airbnb的Maxime Beauchemin创建,并于2015年春季开源。2016年冬天,它加入了Apache软件基金会孵化计划。这是你的电话号码 Git project page 还有一些补充 background information .

        13
  •  6
  •   Matt McCormick    14 年前

    #!/usr/bin/python
    import os, sys
    sys.path.append('/path/to/') # the parent directory of the project
    sys.path.append('/path/to/project') # these lines only needed if not on path
    os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'
    
    # imports and code below
    
        14
  •  6
  •   Michael    12 年前

    我只是想到了一个相当简单的解决方案:

    1. 定义一个视图函数 与任何其他视图一样,使用URL映射,返回HttpResponse等等。
    2. 使用运行的计时首选项(或使用AT或Windows中的计划任务)设置cron作业 http://localhost/your/mapped/url?param=value .

    您可以添加参数,但只需将参数添加到URL即可。

    告诉我你们的想法。

    我现在使用的是来自的runjob命令 django-extensions 而不是卷曲。

    我的cron看起来像这样:

    @hourly python /path/to/project/manage.py runjobs hourly
    

    ... 每天、每月等等。您还可以将其设置为运行特定作业。

        15
  •  4
  •   xiaohei    13 年前

    完成部分代码后,我可以编写任何类似于我的视图的内容。py:)

    #######################################
    import os,sys
    sys.path.append('/home/administrator/development/store')
    os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
    from django.core.management impor setup_environ
    from store import settings
    setup_environ(settings)
    #######################################
    

    从…起 http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/

        16
  •  4
  •   saran3h    6 年前

    你绝对应该看看django-q!

    它正在积极开发,并与django、django ORM、mongo、redis进行了很好的集成。以下是我的配置:

    # django-q
    # -------------------------------------------------------------------------
    # See: http://django-q.readthedocs.io/en/latest/configure.html
    Q_CLUSTER = {
        # Match recommended settings from docs.
        'name': 'DjangoORM',
        'workers': 4,
        'queue_limit': 50,
        'bulk': 10,
        'orm': 'default',
    
    # Custom Settings
    # ---------------
    # Limit the amount of successful tasks saved to Django.
    'save_limit': 10000,
    
    # See https://github.com/Koed00/django-q/issues/110.
    'catch_up': False,
    
    # Number of seconds a worker can spend on a task before it's terminated.
    'timeout': 60 * 5,
    
    # Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
    # longer than `timeout`, otherwise the same task will be processed multiple times.
    'retry': 60 * 6,
    
    # Whether to force all async() calls to be run with sync=True (making them synchronous).
    'sync': False,
    
    # Redirect worker exceptions directly to Sentry error reporter.
    'error_reporter': {
        'sentry': RAVEN_CONFIG,
    },
    }
    
        17
  •  3
  •   devdrc    7 年前

    https://django-q.readthedocs.io/en/latest/index.html

    它有很好的文档,很容易浏览。缺少Windows支持,因为Windows不支持进程分叉。但是,如果您使用Windows for Linux子系统创建开发环境,它就可以正常工作。

        18
  •  2
  •   Fabricio Buzeto abhi    13 年前

    我今天和你的问题有点类似。

    因此,我创建了一个调度模块并将其附加到 .

    这不是最好的方法,但它帮助我将所有代码放在一个地方,并使其执行与主应用程序相关。

        19
  •  2
  •   Ni Xiaoni    10 年前

        from threading import Timer
    
        def sync():
    
            do something...
    
            sync_timer = Timer(self.interval, sync, ())
            sync_timer.start()
    

    就像 递归的

    好的,我希望这种方法能满足您的要求。:)

        20
  •  1
  •   Peter Brittain    9 年前

    我用芹菜做我的定期任务。首先,您需要按如下方式安装它:

    pip install django-celery
    

    别忘了登记 django-celery 在您的设置中,然后您可以执行以下操作:

    from celery import task
    from celery.decorators import periodic_task
    from celery.task.schedules import crontab
    from celery.utils.log import get_task_logger
    @periodic_task(run_every=crontab(minute="0", hour="23"))
    def do_every_midnight():
     #your code
    
        21
  •  1
  •   just10minutes    7 年前
        22
  •  1
  •   Tanveer    3 年前

    另一个选择是 simple-scheduler here .

    烧瓶示例:

    if __name__ == "__main__":
        try:
            scheduled_jobs.run()
            application.run(host='0.0.0.0', port=5000)
        except Exception as e:
            # log them
    
        23
  •  0
  •   Sri    6 年前

    可信赖的 尝试 它建在 AWS SQS/SNS .

    参考: http://taskhawk.readthedocs.io

        24
  •  0
  •   yspreen    5 年前

    对于简单的停靠项目,我真的看不到任何现有的答案。

    因此,我编写了一个非常简单的解决方案,不需要外部库或触发器,它可以自己运行。无需外部操作系统cron,应可在所有环境中工作。

    它通过添加一个中间件来工作: middleware.py

    import threading
    
    def should_run(name, seconds_interval):
        from application.models import CronJob
        from django.utils.timezone import now
    
        try:
            c = CronJob.objects.get(name=name)
        except CronJob.DoesNotExist:
            CronJob(name=name, last_ran=now()).save()
            return True
    
        if (now() - c.last_ran).total_seconds() >= seconds_interval:
            c.last_ran = now()
            c.save()
            return True
    
        return False
    
    
    class CronTask:
        def __init__(self, name, seconds_interval, function):
            self.name = name
            self.seconds_interval = seconds_interval
            self.function = function
    
    
    def cron_worker(*_):
        if not should_run("main", 60):
            return
    
        # customize this part:
        from application.models import Event
        tasks = [
            CronTask("events", 60 * 30, Event.clean_stale_objects),
            # ...
        ]
    
        for task in tasks:
            if should_run(task.name, task.seconds_interval):
                task.function()
    
    
    def cron_middleware(get_response):
    
        def middleware(request):
            response = get_response(request)
            threading.Thread(target=cron_worker).start()
            return response
    
        return middleware
    

    models/cron.py :

    from django.db import models
    
    
    class CronJob(models.Model):
        name = models.CharField(max_length=10, primary_key=True)
        last_ran = models.DateTimeField()
    

    settings.py :

    MIDDLEWARE = [
        ...
        'application.middleware.cron_middleware',
        ...
    ]
    
        25
  •  0
  •   Hamfri    5 年前

    简单的方法是编写自定义shell命令,请参见 Django Documentation Tutorial