下面的脚本是
open leader project
使用
iohttp
和
asyncio
新手问:有没有可能用异步代码嵌入同步代码?例如,这里的代码嵌入到
main.py
我知道下面是
同步。它的BACnet通信库
BAC0
.
bacnet = BAC0.lite()
async def collect_report_value():
#read temp sensor from BACnet device
T1 = bacnet.read('12345:2 analogInput 2 presentValue')
return T1
这个
主.py
脚本工程(如下所示)我可以
#read temp sensor from BACnet device
从异步函数
collect_report_value
我只是担心我可能会使整个过程同步。有人有什么建议吗?
import asyncio
from datetime import timedelta
from openleadr import OpenADRClient, enable_default_logging
import BAC0
bacnet = BAC0.lite()
async def collect_report_value():
#read temp sensor from BACnet device
T1 = bacnet.read('12345:2 analogInput 2 presentValue')
return T1
async def handle_event(event):
# This callback receives an Event dict.
# You should include code here that sends control signals to your resources.
return 'optIn'
# Create the client object
client = OpenADRClient(ven_name='ven123',
vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
# Add the report capability to the client
client.add_report(callback=collect_report_value,
resource_id='device001',
measurement='temperature',
sampling_rate=timedelta(seconds=60))
# Add event handling capability to the client
client.add_handler('on_event', handle_event)
# Run the client in the Python AsyncIO Event Loop
loop = asyncio.get_event_loop()
loop.create_task(client.run())
loop.run_forever()
编辑
最后的脚本,似乎工作
import asyncio
from datetime import timedelta
from openleadr import OpenADRClient, enable_default_logging
import BAC0
import concurrent.futures
enable_default_logging()
#apprently runs on its own thread
bacnet = BAC0.lite()
def blocking_io():
#read temp sensor from BACnet device
try:
sensor_value = bacnet.read('12345:2 analogInput 2 presentValue')
return sensor_value
except:
return np.nan
async def collect_report_value():
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
return result
async def handle_event(event):
# This callback receives an Event dict.
# You should include code here that sends control signals to your resources.
return 'optIn'
# Create the client object
client = OpenADRClient(ven_name='ven123',
vtn_url='http://localhost:8080/OpenADR2/Simple/2.0b')
# Add the report capability to the client
client.add_report(callback=collect_report_value,
resource_id='device001',
measurement='temperature',
sampling_rate=timedelta(seconds=60))
# Add event handling capability to the client
client.add_handler('on_event', handle_event)
# Run the client in the Python AsyncIO Event Loop
loop = asyncio.get_event_loop()
loop.create_task(client.run())
loop.run_forever()