代码之家  ›  专栏  ›  技术社区  ›  NeoVe

在Django_cron上运行特定的views.py方法

  •  0
  • NeoVe  · 技术社区  · 5 年前

    我有这个方法 views.py 文件:

    def getHistoricRates():
        """ Here we have the function that will retrieve the historical rates from fixer.io, since last month """
        rates = {}
        response = urlopen('http://data.fixer.io/api/2018-12-31?access_key=c2f5070ad78b0748111281f6475c0bdd')
        data = response.read()
        rdata = json.loads(data.decode(), parse_float=float) 
        rates_from_rdata = rdata.get('rates', {})
        for rate_symbol in ['USD', 'GBP', 'HKD', 'AUD', 'JPY', 'SEK', 'NOK', 'EUR']:
            try:
                rates[rate_symbol] = rates_from_rdata[rate_symbol]
            except KeyError:
                logging.warning('rate for {} not found in rdata'.format(rate_symbol)) 
                pass
    
        return rates
    
    @require_http_methods(['GET', 'POST'])
    def historical(request):
        date_str = "2018-12-31"
        if datetime.datetime.strptime(date_str,"%Y-%m-%d").weekday()<5:
            rates = getHistoricRates()
            fixerio_rates = [Fixerio_rates(currency=currency, rate=rate)
                         for currency, rate in rates.items()]
            Fixerio_rates.objects.bulk_create(fixerio_rates) 
            return render(request, 'historical.html') 
    

    我想跑 historical 每天上午9点,周末除外。

    现在,我找不到任何关于如何运行现有方法或如何从 cron.py 任何文件。

    我已经把所有的东西都配置好了 django_cron 但我无法从cron文件中找出如何“使用”这个方法在特定时间运行它。

    这是我的 克朗 文件到目前为止:

    from django_cron import CronJobBase, Schedule
    from .views import historical
    
    class MyCronJob(CronJobBase):
        RUN_AT_TIMES = ['9:00']
    
        schedule = Schedule(run_at_times=RUN_AT_TIMES)
        code = 'fixerio.my_cron_job'    # a unique code
    
        def do(self):
            pass    # do your thing here
    

    名字 fixerio 是我的应用程序名。

    对此有什么想法吗?

    1 回复  |  直到 5 年前
        1
  •  1
  •   Will Keeling    5 年前

    因为你想打电话给 getHistoricRates() 以及 bulk_create() 你的逻辑 historical() 视图和cron作业,最好先将视图中的公共代码重构为单独的模块,例如 helpers.py 那就住在 views.py cron.py .

    Help.Py

    from .models import Fixerio_rates
    
    def create_rates():
        rates = getHistoricRates()
        fixerio_rates = [Fixerio_rates(currency=currency, rate=rate)
                         for currency, rate in rates.items()]
        Fixerio_rates.objects.bulk_create(fixerio_rates) 
    
    def getHistoricRates():
        ...
    

    然后您可以从cron作业调用它:

    克朗

    from .helpers import create_rates
    
    class MyCronJob(CronJobBase):
        RUN_AT_TIMES = ['9:00']
    
        schedule = Schedule(run_at_times=RUN_AT_TIMES)
        code = 'fixerio.my_cron_job'    # a unique code
    
        def do(self):
            create_rates()
    

    从你的角度来看:

    VIEW

    from .helpers import create_rates
    
    @require_http_methods(['GET', 'POST'])
    def historical(request):
        date_str = "2018-12-31"
        if datetime.datetime.strptime(date_str,"%Y-%m-%d").weekday()<5:
            create_rates()
            return render(request, 'historical.html') 
    
    推荐文章