代码之家  ›  专栏  ›  技术社区  ›  Bob

python-如何实现可等待的C函数(coroutine)

  •  12
  • Bob  · 技术社区  · 6 年前

    环境:协同操作系统(RTOS)是C和微星虚拟机的任务之一。

    为了使虚拟机不阻塞其他RTOS任务,我插入 RTOS_sleep() 在里面 vm.c:DISPATCH() 因此,在执行每个字节码之后,VM将控制权交给下一个RTOS任务。

    我创建了一个UPY接口,使用生产者-消费者设计模式从物理数据总线(可能是CAN、SPI、以太网)异步获取数据。

    在UPY中的用法:

    can_q = CANbus.queue()
    message = can_q.get()
    

    C中的实现是这样的 can_q.get() 不阻塞RTOS:它轮询C队列,如果未收到消息,则调用 RTOS睡眠( 让另一个任务有机会填满队列。因为C队列只由另一个RTOS任务更新,而RTOS任务仅在 RTOS睡眠( 合作的

    C-实施基本上是:

    // gives chance for c-queue to be filled by other RTOS task
    while(c_queue_empty() == true) RTOS_sleep(); 
    return c_queue_get_message();
    

    尽管python语句 可以得到 不会阻塞RTOS,它会阻塞UPY脚本。 我想重写一下,这样我可以用它 async def 连体衣 并且不要阻止upy脚本。

    不确定 the syntax 但是像这样:

    can_q = CANbus.queue()
    message = await can_q.get()
    

    问题

    如何编写C函数以便 await 在上面?

    我更喜欢cpython和micropyhon的答案,但我只接受cpython的答案。

    1 回复  |  直到 5 年前
        1
  •  11
  •   user4815162342    6 年前

    注意:这个答案包括cpython和asyncio框架。然而,这些概念应该适用于其他Python实现以及其他异步框架。

    如何编写C函数以便 await 在上面?

    编写可以等待结果的C函数的最简单方法是让它返回一个已经生成的可等待对象,例如 asyncio.Future .在返回之前 Future 代码必须安排将来的结果由某个异步机制设置。所有这些基于协程的方法都假定您的程序正在某个知道如何调度协程的事件循环下运行。

    但是返回一个未来并不总是足够的——也许我们想定义一个具有任意数量悬挂点的对象。返回一个未来只会暂停一次(如果返回的未来不完整),在未来完成后恢复,就是这样。相当于 async def 包含多个 等待 不能通过返回未来来实现,它必须实现协同程序通常实现的协议。这有点像一个迭代器实现一个自定义 __next__ 而不是发电机。

    定义可等待的自定义

    为了定义我们自己的等待类型,我们可以求助于政治公众人物492, specifies 确切地说,哪些对象可以传递给 等待 .不是用定义的python函数 异步定义 ,用户定义的类型可以通过定义 __await__ python/c映射到 tp_as_async.am_await 部分 PyTypeObject 结构。

    这意味着在python/c中,必须执行以下操作:

    • 为指定一个非空值 tp_as_async 扩展类型的字段。
    • 拥有它 am_await 成员指向接受类型的实例并返回实现 iterator protocol ,即定义 tp_iter (一般定义为 PyIter_Self )和 tp_iternext .
    • 迭代器 下一步 必须推进协同程序的状态机。每个非特殊回报 下一步 相当于暂停,最后 StopIteration 异常表示coroutine的最终返回。返回值存储在 value 的属性 停止迭代 .

    为了使协同程序有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在挂起后何时恢复。Asyncio定义的大多数协程都期望在Asyncio事件循环下运行,并在内部使用 asyncio.get_event_loop() (和/或接受明确的 loop 参数)获取其服务。

    协程示例

    为了说明python/c代码需要实现什么,让我们考虑用python表示的简单协程。 异步定义 例如,这相当于 asyncio.sleep() 以下内容:

    async def my_sleep(n):
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        loop.call_later(n, future.set_result, None)
        await future
        # we get back here after the timeout has elapsed, and
        # immediately return
    

    my_sleep 创建一个 Future ,安排它在 N号 秒,并将自身挂起,直到将来完成。最后一部分使用 等待 ,在哪里 await x 表示“允许” x 决定我们现在是暂停还是继续执行”。不完整的未来总是决定挂起,而异步 Task 协程驱动特殊案例产生了无限期中止它们的期货,并将它们的完成与恢复任务联系起来。其他事件循环(curio等)的暂停机制在细节上可能有所不同,但其基本思想是相同的: 等待 是可选的暂停执行。

    __await__() 返回一个生成器

    要把这个翻译成C,我们必须去掉魔法 异步定义 函数定义,以及 等待 悬挂点。移除 异步定义 相当简单:等价的普通函数只需要返回一个实现 _等待__ 以下内容:

    def my_sleep(n):
        return _MySleep(n)
    
    class _MySleep:
        def __init__(self, n):
            self.n = n
    
        def __await__(self):
            return _MySleepIter(self.n)
    

    这个 _等待__ 方法 _MySleep 对象返回者 my_sleep() 将由自动调用 等待 要转换的运算符 等待 对象(传递给 等待 )到迭代器。这个迭代器将用于询问等待的对象它是选择挂起还是提供一个值。这很像 for o in x 语句调用 x.__iter__() 转换 可迭代的 X 混凝土 迭代器 .

    当返回的迭代器选择挂起时,它只需要生成一个值。值的含义(如果有)将由协同程序驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从返回时 等待 ,它需要停止迭代。使用生成器作为方便的迭代器实现, _MySleepIter 如下所示:

    def _MySleepIter(n):
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        loop.call_later(n, future.set_result, None)
        # yield from future.__await__()
        for x in future.__await__():
            yield x
    

    作为 等待X 映射到 yield from x.__await__() ,我们的生成器必须耗尽返回的迭代器 future.__await__() .由返回的迭代器 Future.__await__ 如果未来是不完整的,将会产生收益,并返回未来的结果(我们在这里忽略了这一点,但是 yield from 实际上提供)否则。

    _等待 返回自定义迭代器

    C实现的最后一个障碍 我的睡眠 在C中是发电机的使用 _我的爱人 .幸运的是,任何生成器都可以转换为状态迭代器, _下一个__ 执行一段代码直到下一个等待或返回。 _下一个__ 实现生成器代码的状态机版本,其中 yield 通过返回值来表示,并且 return 通过提高 停止迭代 .例如:

    class _MySleepIter:
        def __init__(self, n):
            self.n = n
            self.state = 0
    
        def __iter__(self):  # an iterator has to define __iter__
            return self
    
        def __next__(self):
            if self.state == 0:
                loop = asyncio.get_event_loop()
                self.future = loop.create_future()
                loop.call_later(self.n, self.future.set_result, None)
                self.state = 1
            if self.state == 1:
                if not self.future.done():
                    return next(iter(self.future))
                self.state = 2
            if self.state == 2:
                raise StopIteration
            raise AssertionError("invalid state")
    

    转换为C

    上面是相当多的类型,但是它可以工作,并且只使用可以用本机python/c函数定义的构造。

    实际上,将这两个类转换为C非常简单,但超出了这个答案的范围。