from __future__ import annotations import asyncio import concurrent.futures import contextlib from collections.abc import Awaitable, Generator from typing import TypeVar T = TypeVar('T') @contextlib.contextmanager def loop_in_thread() -> Generator[asyncio.AbstractEventLoop, None, None]: loop_fut: concurrent.futures.Future[asyncio.AbstractEventLoop] = ( concurrent.futures.Future() ) stop_event = asyncio.Event() async def main() -> None: loop_fut.set_result(asyncio.get_running_loop()) await stop_event.wait() with concurrent.futures.ThreadPoolExecutor(max_workers = 1) as tpe: complete_fut = tpe.submit(asyncio.run, main()) for fut in concurrent.futures.as_completed((loop_fut, complete_fut)): if fut is loop_fut: loop = loop_fut.result() try: yield loop finally: loop.call_soon_threadsafe(stop_event.set) else: fut.result() class AsyncRunner: def __init__(self) -> None: self._cm = loop_in_thread() self._loop = self._cm.__enter__() def call(self, awaitable: Awaitable[T], timeout: float | None = None) -> T: fut = asyncio.run_coroutine_threadsafe(awaitable, self._loop) # type: ignore return fut.result(timeout) def close(self) -> None: self._cm.__exit__(None, None, None) def __enter__(self) -> AsyncRunner: return self def __exit__(self, exc_type, exc, tb) -> None: self.close()