From ef40e068f3cb52ffee6798f4f436d11705a03563 Mon Sep 17 00:00:00 2001 From: Shlomo Zalman Rabinowitz <106286969+SZRabinowitz@users.noreply.github.com> Date: Wed, 22 Jan 2025 23:13:03 -0500 Subject: [PATCH] Adjustments for pr #313 - Creates executor to run async slots - All async slots run in dedicated thread --- TermTk/TTkCore/signal.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/TermTk/TTkCore/signal.py b/TermTk/TTkCore/signal.py index dacfaa66..3f540cb1 100644 --- a/TermTk/TTkCore/signal.py +++ b/TermTk/TTkCore/signal.py @@ -59,6 +59,7 @@ __all__ = ['pyTTkSlot', 'pyTTkSignal'] # from typing import TypeVar, TypeVarTuple, Generic, List from inspect import getfullargspec, iscoroutinefunction +from concurrent.futures import ThreadPoolExecutor from types import LambdaType from threading import Lock import asyncio @@ -75,7 +76,7 @@ def pyTTkSlot(*args): class pyTTkSignal(): _asyncio_event_loop = asyncio.new_event_loop() _signals = [] - __slots__ = ('_types', '_connected_slots', '_connected_async_slots', '_mutex') + __slots__ = ('_types', '_connected_slots', '_connected_async_slots', '_mutex', '_async_executor') def __init__(self, *args, **kwargs) -> None: # ref: http://pyqt.sourceforge.net/Docs/PyQt5/signals_slots.html#PyQt5.QtCore.pyqtSignal @@ -91,6 +92,7 @@ class pyTTkSignal(): self._connected_slots = {} self._connected_async_slots = {} self._mutex = Lock() + self._async_executor = None pyTTkSignal._signals.append(self) def connect(self, slot): @@ -137,6 +139,14 @@ class pyTTkSignal(): if slot in self._connected_slots: del self._connected_slots[slot] + def async_runner(self, coros): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(asyncio.gather(*coros)) + loop.close() + + + def emit(self, *args, **kwargs) -> None: if not self._mutex.acquire(False): return if len(args) != len(self._types): @@ -145,12 +155,10 @@ class pyTTkSignal(): for slot,sl in self._connected_slots.copy().items(): slot(*args[sl], **kwargs) if self._connected_async_slots: - asyncio.set_event_loop(_loop:=pyTTkSignal._asyncio_event_loop) - # asyncio.set_event_loop(_loop:=asyncio.new_event_loop()) - for slot,sl in self._connected_async_slots.copy().items(): - asyncio.run_coroutine_threadsafe(slot(*args[sl], **kwargs), _loop) - # should I call the future results? - + if self._async_executor is None: + self._async_executor = ThreadPoolExecutor(max_workers=1) # async dont need so many workers + coros = [slot(*args[sl], **kwargs) for slot,sl in self._connected_async_slots.copy().items()] + self._async_executor.submit(self.async_runner, coros) self._mutex.release() def clear(self):