代码之家  ›  专栏  ›  技术社区  ›  UEFI

如何运行Luigi管道的并行实例:Pid集已在运行

  •  1
  • UEFI  · 技术社区  · 8 年前

    我想用Id 2381启动它一次,然后在第一个作业运行时,我想用Id 231启动第二次运行。第一次运行按预期完成。

    Pid(s) set([10362]) already running
    Process finished with exit code 0
    

    我是这样开始跑步的

    运行一个:

    luigi.run(
        cmdline_args=["--id='newId13822'", "--TaskTwo-id=2381"],
        main_task_cls=TaskTwo()
    )
    

    运行二:

    luigi.run(
        cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
        main_task_cls=TaskTwo()
    )
    

    每个任务都有一个由luigi的task\u ID\u str(…)方法生成的唯一ID。为什么luigi认为当luigi运行时任务已经在运行。paramater、TaskTwo id和MockTarget文件都不同?

    import time
    import uuid
    from luigi.mock import MockTarget
    import luigi
    
    
    class TaskOne(luigi.Task):
        run_id = luigi.Parameter()
    
        def output(self):
            return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)
    
        def run(self):
            _out = self.output().open('w')
            time.sleep(10)
            _out.write(u"Hello World!\n")
            _out.close()
    
    
    class TaskTwo(luigi.Task):
        id = luigi.Parameter(default=uuid.uuid4().__str__())
    
        def output(self):
            return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)
    
        def requires(self):
            return TaskOne(self.id)
    
        def run(self):
            _out = self.output().open('w')
            time.sleep(10)
            _out.write(u"Hello World!\n")
            _out.close()
    
    1 回复  |  直到 8 年前
        1
  •  2
  •   MattMcKnight    8 年前

    看起来这可能是因为您没有连接到调度程序服务器,所以它尝试两次启动调度程序进程。你在运行luigid吗?

    luigid --background --pidfile ./luigid.pid --logdir . --state-path .
    

    然后我在同一个目录中打开了第二个终端。在第一次跑步中:

    PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13822 --TaskTwo-id 2381 --local-scheduler
    

    PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13823 --TaskTwo-id 2382 --local-scheduler