Many plugins that communicate with cloud services or hardware devices receive data asynchronously. Before asyncio, that meant either polling the device constantly, or using a background thread to maintain a websocket or similar communication channel. In general, doing the same thing with asyncio is actually easier, except for the effort of integrating it into the synchronous plugin architecture. Here's how I'm doing it (this is from the new Harmony Hub plugin).
First, the asyncio event loop needs to be set up in its own thread. Initially I was doing this using runConcurrentThread(), but that didn't work out. The problem is that runConcurrentThread() doesn't get started until after the plugin devices are started, and if you need asyncio to start up the devices, well, you see the problem. So:
```python
import asyncio
import threading

# The methods below belong to the plugin's indigo.PluginBase subclass.

def __init__(self, pluginId, pluginDisplayName, pluginVersion, pluginPrefs):
    indigo.PluginBase.__init__(self, pluginId, pluginDisplayName, pluginVersion, pluginPrefs)
    ###
    self._event_loop = None
    self._async_thread = None

def startup(self):
    # Create the event loop and start the thread that will run it.
    self._event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self._event_loop)
    self._async_thread = threading.Thread(target=self._run_async_thread)
    self._async_thread.start()

def _run_async_thread(self):
    self.logger.debug("_run_async_thread starting")
    self._event_loop.create_task(self._async_start())
    # Runs the loop until _async_stop() returns, then closes it.
    self._event_loop.run_until_complete(self._async_stop())
    self._event_loop.close()

async def _async_start(self):
    self.logger.debug("_async_start")
    # add things you need to do at the start of the plugin here

async def _async_stop(self):
    # Check the plugin's stop flag once a second.
    while True:
        await asyncio.sleep(1.0)
        if self.stopThread:
            break
```
At this point, there's a running asyncio event loop just waiting for things to do. Add code to _async_start() as needed. You can see that the event loop will continue to run until _async_stop() exits, which happens when self.stopThread is set. Cleanup code can be added before the break. Just make sure it'll never block.
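If you want to try the pattern outside of Indigo, here's a minimal standalone sketch of it. The BackgroundLoop class and its attribute names are made up for illustration, and a threading.Event stands in for self.stopThread; nothing here is Indigo API.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical stand-in for the plugin: owns an event loop in its own thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.stop_requested = threading.Event()  # plays the role of self.stopThread
        self.started = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.create_task(self._on_start())
        # The loop runs until _wait_for_stop() returns, then it is closed.
        self.loop.run_until_complete(self._wait_for_stop())
        self.loop.close()

    async def _on_start(self):
        # Startup work would go here.
        self.started.set()

    async def _wait_for_stop(self):
        while not self.stop_requested.is_set():
            await asyncio.sleep(0.05)
        # Non-blocking cleanup would go here.

    def stop(self):
        self.stop_requested.set()
        self.thread.join(timeout=5)


def demo():
    bg = BackgroundLoop()
    bg.start()
    started = bg.started.wait(timeout=5)
    bg.stop()
    return started, bg.thread.is_alive(), bg.loop.is_closed()
```

Calling demo() starts the loop, waits for the startup coroutine to run, then requests a stop and reports whether the thread exited and the loop was closed.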
Additional tasks can be added to the event loop as needed, for example:
```python
from aioharmony.harmonyapi import HarmonyAPI

def deviceStartComm(self, device):
    if device.deviceTypeId == "harmonyHub":
        # This is called from the plugin's thread, not the event loop's thread,
        # so schedule the coroutine with the thread-safe call.
        asyncio.run_coroutine_threadsafe(self._async_start_device(device), self._event_loop)

async def _async_start_device(self, device):
    client = HarmonyAPI(ip_address=device.address, protocol=self.protocol)
    try:
        connected = await client.connect()
    except ConnectionRefusedError as e:
        self.logger.debug(f"{device.name}: connect exception: {e}.")
        return
```
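One thing to keep in mind when scheduling work from the plugin's own thread: asyncio's loop methods are generally not thread-safe, and asyncio.run_coroutine_threadsafe() is the documented way to submit a coroutine from another thread. It returns a concurrent.futures.Future, which is handy for getting the result or the exception back. Here's a rough sketch; the connect() coroutine and the addresses are placeholders, not the Harmony client, and the loop uses run_forever() just to keep the example short.

```python
import asyncio
import threading


def start_loop_in_thread():
    # Simplified setup: run a fresh event loop forever in a daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def connect(address):
    # Hypothetical stand-in for a real client's connect call.
    await asyncio.sleep(0.01)
    if address == "bad":
        raise ConnectionRefusedError(f"cannot reach {address}")
    return f"connected to {address}"


def demo():
    loop, thread = start_loop_in_thread()
    try:
        # Submit coroutines from this (non-loop) thread.
        ok = asyncio.run_coroutine_threadsafe(connect("192.0.2.10"), loop)
        bad = asyncio.run_coroutine_threadsafe(connect("bad"), loop)
        result = ok.result(timeout=5)      # blocks until the coroutine finishes
        error = bad.exception(timeout=5)   # returns the raised exception, if any
        return result, type(error).__name__
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()
```

In a plugin you probably wouldn't block on result(); attaching a callback with add_done_callback() on the returned future is one way to log failures without waiting.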
As written, this does NOT do any cleanup on existing asyncio Tasks. I do some of that in other places, like:
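```python
import aioharmony.exceptions

def deviceStopComm(self, device):
    if device.deviceTypeId == "harmonyHub":
        # Called from the plugin's thread, so use the thread-safe scheduling call.
        asyncio.run_coroutine_threadsafe(self._async_stop_device(device.address), self._event_loop)

async def _async_stop_device(self, ip_address):
    hub_client = self._async_running_clients[ip_address]
    try:
        await asyncio.wait_for(hub_client.close(), timeout=5)
    except (asyncio.TimeoutError, aioharmony.exceptions.TimeOut):
        # asyncio.wait_for() raises asyncio.TimeoutError when the 5 second limit expires.
        self.logger.debug(f"{hub_client.name} HUB: Timeout trying to close connection.")
```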
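For the general task cleanup that the code above doesn't cover, one possible approach is to remember every task you create and cancel whatever is left at shutdown. This is only a sketch of that idea, not what the Harmony Hub plugin does; TaskTracker, spawn(), and cancel_all() are hypothetical names.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper: remembers tasks so they can be cancelled at shutdown."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, coro):
        # Must be called from code running inside the event loop.
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        # Forget the task automatically once it finishes or is cancelled.
        task.add_done_callback(self._tasks.discard)
        return task

    async def cancel_all(self, timeout=5.0):
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True collects CancelledError instead of raising it.
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True), timeout
            )
        return len(tasks)


async def listen_forever():
    # Placeholder for a long-running listener.
    while True:
        await asyncio.sleep(3600)


async def demo():
    tracker = TaskTracker()
    t1 = tracker.spawn(listen_forever())
    t2 = tracker.spawn(listen_forever())
    await asyncio.sleep(0)  # give the tasks a chance to start
    cancelled = await tracker.cancel_all()
    return cancelled, t1.cancelled(), t2.cancelled(), len(tracker._tasks)
```

In the plugin, a call like cancel_all() could go in _async_stop() just before the break, since it awaits with a timeout rather than blocking.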