代码之家  ›  专栏  ›  技术社区  ›  bbartling

用Asyncio嵌入同步python代码

  •  0
  • bbartling  · 技术社区  · 4 年前

    下面的脚本是 open leader project 使用 iohttp asyncio

    新手问:有没有可能用异步代码嵌入同步代码?例如,这里的代码嵌入到 main.py 我知道下面是 同步。它的BACnet通信库 BAC0 .

    bacnet = BAC0.lite()
    
    
    async def collect_report_value():
        #read temp sensor from BACnet device
        T1 = bacnet.read('12345:2 analogInput 2 presentValue')
        return T1
    

    这个 主.py 脚本工程(如下所示)我可以 #read temp sensor from BACnet device 从异步函数 collect_report_value 我只是担心我可能会使整个过程同步。有人有什么建议吗?

    import asyncio
    from datetime import timedelta
    from openleadr import OpenADRClient, enable_default_logging
    import BAC0
    
    
    
    bacnet = BAC0.lite()
    
    
    async def collect_report_value():
        #read temp sensor from BACnet device
        T1 = bacnet.read('12345:2 analogInput 2 presentValue')
        return T1
    
    async def handle_event(event):
        # This callback receives an Event dict.
        # You should include code here that sends control signals to your resources.
        return 'optIn'
    
    # Create the client object
    client = OpenADRClient(ven_name='ven123',
                           vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
    
    # Add the report capability to the client
    client.add_report(callback=collect_report_value,
                      resource_id='device001',
                      measurement='temperature',
                      sampling_rate=timedelta(seconds=60))
    
    # Add event handling capability to the client
    client.add_handler('on_event', handle_event)
    
    # Run the client in the Python AsyncIO Event Loop
    loop = asyncio.get_event_loop()
    loop.create_task(client.run())
    loop.run_forever()
    

    编辑 最后的脚本,似乎工作

    import asyncio
    from datetime import timedelta
    from openleadr import OpenADRClient, enable_default_logging
    import BAC0
    import concurrent.futures
    
    enable_default_logging()
    
    
    #apprently runs on its own thread
    bacnet = BAC0.lite()
    
    
    def blocking_io():
        #read temp sensor from BACnet device
        try:
             sensor_value = bacnet.read('12345:2 analogInput 2 presentValue')
             return sensor_value
        except:
             return np.nan
    
    
    async def collect_report_value():
        # 1. Run in the default loop's executor:
        result = await loop.run_in_executor(None, blocking_io)
        return result
    
    
    
    async def handle_event(event):
        # This callback receives an Event dict.
        # You should include code here that sends control signals to your resources.
        return 'optIn'
    
    # Create the client object
    client = OpenADRClient(ven_name='ven123',
                           vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
    
    # Add the report capability to the client
    client.add_report(callback=collect_report_value,
                      resource_id='device001',
                      measurement='temperature',
                      sampling_rate=timedelta(seconds=60))
    
    # Add event handling capability to the client
    client.add_handler('on_event', handle_event)
    
    # Run the client in the Python AsyncIO Event Loop
    loop = asyncio.get_event_loop()
    loop.create_task(client.run())
    loop.run_forever()
    
    0 回复  |  直到 4 年前
        1
  •  0
  •   bbartling    4 年前

    这起作用了

    import asyncio
    from datetime import timedelta
    from openleadr import OpenADRClient, enable_default_logging
    import BAC0
    import concurrent.futures
    
    enable_default_logging()
    
    
    #apprently runs on its own thread
    bacnet = BAC0.lite()
    
    
    def blocking_io():
        #read temp sensor from BACnet device
        sensor_value = bacnet.read('12345:2 analogInput 2 presentValue')
        return sensor_value
    
    
    async def collect_report_value():
        # 1. Run in the default loop's executor:
        result = await loop.run_in_executor(None, blocking_io)
        return result
    
    
    
    async def handle_event(event):
        # This callback receives an Event dict.
        # You should include code here that sends control signals to your resources.
        return 'optIn'
    
    # Create the client object
    client = OpenADRClient(ven_name='ven123',
                           vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')