hat.aio

Async utility functions

 1"""Async utility functions"""
 2
 3from hat.aio.executor import (Executor,
 4                              create_executor)
 5from hat.aio.group import (Resource,
 6                           Group)
 7from hat.aio.misc import (first,
 8                          uncancellable,
 9                          AsyncCallable,
10                          call,
11                          call_on_cancel,
12                          call_on_done,
13                          init_asyncio,
14                          run_asyncio)
15from hat.aio.queue import (QueueClosedError,
16                           QueueEmptyError,
17                           QueueFullError,
18                           Queue)
19from hat.aio.wait import (CancelledWithResultError,
20                          wait_for)
21
22
23__all__ = ['Executor',
24           'create_executor',
25           'Resource',
26           'Group',
27           'first',
28           'uncancellable',
29           'AsyncCallable',
30           'call',
31           'call_on_cancel',
32           'call_on_done',
33           'init_asyncio',
34           'run_asyncio',
35           'QueueClosedError',
36           'QueueEmptyError',
37           'QueueFullError',
38           'Queue',
39           'CancelledWithResultError',
40           'wait_for']
class Executor(hat.aio.Resource):
class Executor(Resource):
    """Resource that runs callables through `asyncio.loop.run_in_executor`.

    The wrapped executor instance is built by calling `executor_cls` with
    the positional `args`. The event loop running at construction time is
    remembered and used for every later call.

    If `wait_futures` is ``True``, each spawned task awaits its executor
    future through `uncancellable`, so closing of this resource completes
    only after all running calls finish.

    `log_exceptions` is passed on to the internal `Group`.

    Args:
        args: executor args
        executor_cls: executor class
        log_exceptions: log exceptions
        wait_futures: wait for running executor calls when closing

    Example::

        executor1 = Executor()
        executor2 = Executor()
        tid1 = await executor1.spawn(threading.get_ident)
        tid2 = await executor2.spawn(threading.get_ident)
        assert tid1 != tid2

    """

    def __init__(self,
                 *args,
                 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor,  # NOQA
                 log_exceptions: bool = True,
                 wait_futures: bool = True):
        self._wait_futures = wait_futures
        self._executor = executor_cls(*args)
        self._loop = asyncio.get_running_loop()
        self._async_group = Group(log_exceptions)

        # this helper task is cancelled when the group closes, which
        # triggers `shutdown(False)` on the wrapped executor
        self.async_group.spawn(call_on_cancel, self._executor.shutdown, False)

    @property
    def async_group(self):
        """Group controlling the lifetime of this executor."""
        return self._async_group

    def spawn(self,
              fn: Callable,
              *args, **kwargs
              ) -> asyncio.Task:
        """Schedule ``fn(*args, **kwargs)`` in the wrapped executor.

        Returns the object produced by spawning the call in `async_group`.

        """
        return self.async_group.spawn(self._spawn, fn, args, kwargs)

    async def _spawn(self, fn, args, kwargs):
        # bind the arguments now; `run_in_executor` accepts only positionals
        bound = functools.partial(fn, *args, **kwargs)
        future = self._loop.run_in_executor(self._executor, bound)

        if not self._wait_futures:
            return await future

        return await uncancellable(future)

Executor wrapping asyncio.loop.run_in_executor.

Wrapped executor is created from executor_cls with provided args.

If wait_futures is True, executor will be closed once all running tasks finish.

log_exceptions is delegated to async_group.

Arguments:
  • args: executor args
  • executor_cls: executor class
  • log_exceptions: log exceptions

Example::

executor1 = Executor()
executor2 = Executor()
tid1 = await executor1.spawn(threading.get_ident)
tid2 = await executor2.spawn(threading.get_ident)
assert tid1 != tid2
Executor( *args, executor_cls: type[concurrent.futures._base.Executor] = <class 'concurrent.futures.thread.ThreadPoolExecutor'>, log_exceptions: bool = True, wait_futures: bool = True)
36    def __init__(self,
37                 *args,
38                 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor,  # NOQA
39                 log_exceptions: bool = True,
40                 wait_futures: bool = True):
41        self._wait_futures = wait_futures
42        self._executor = executor_cls(*args)
43        self._loop = asyncio.get_running_loop()
44        self._async_group = Group(log_exceptions)
45
46        self.async_group.spawn(call_on_cancel, self._executor.shutdown, False)
async_group
48    @property
49    def async_group(self):
50        """Async group"""
51        return self._async_group

Async group

def spawn(self, fn: Callable, *args, **kwargs) -> _asyncio.Task:
53    def spawn(self,
54              fn: Callable,
55              *args, **kwargs
56              ) -> asyncio.Task:
57        """Spawn new task"""
58        return self.async_group.spawn(self._spawn, fn, args, kwargs)

Spawn new task

def create_executor( *args, executor_cls: type[concurrent.futures._base.Executor] = <class 'concurrent.futures.thread.ThreadPoolExecutor'>, loop: asyncio.events.AbstractEventLoop | None = None) -> Callable[..., Awaitable]:
 70def create_executor(*args,
 71                    executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor,  # NOQA
 72                    loop: asyncio.AbstractEventLoop | None = None
 73                    ) -> Callable[..., Awaitable]:
 74    """Create `asyncio.loop.run_in_executor` wrapper.
 75
 76    Wrapped executor is created from `executor_cls` with provided `args`.
 77
 78    This function returns coroutine that takes a function and its arguments,
 79    executes the function in executor and returns the result.
 80
 81    Args:
 82        args: executor args
 83        executor_cls: executor class
 84        loop: asyncio loop
 85
 86    Returns:
 87        executor coroutine
 88
 89    Example::
 90
 91        executor1 = create_executor()
 92        executor2 = create_executor()
 93        tid1 = await executor1(threading.get_ident)
 94        tid2 = await executor2(threading.get_ident)
 95        assert tid1 != tid2
 96
 97    """
 98    executor = executor_cls(*args)
 99
100    async def executor_wrapper(func, /, *args, **kwargs):
101        _loop = loop or asyncio.get_running_loop()
102        fn = functools.partial(func, *args, **kwargs)
103        return await _loop.run_in_executor(executor, fn)
104
105    return executor_wrapper

Create asyncio.loop.run_in_executor wrapper.

Wrapped executor is created from executor_cls with provided args.

This function returns a coroutine function that takes a function and its arguments, executes the function in the executor and returns the result.

Arguments:
  • args: executor args
  • executor_cls: executor class
  • loop: asyncio loop
Returns:

executor coroutine

Example::

executor1 = create_executor()
executor2 = create_executor()
tid1 = await executor1(threading.get_ident)
tid2 = await executor2(threading.get_ident)
assert tid1 != tid2
class Resource(abc.ABC):
class Resource(abc.ABC):
    """Abstract base for objects whose lifetime is driven by a `Group`.

    Subclasses provide `async_group`; every other member delegates to it.
    Instances can be used as async context managers: leaving the context
    awaits `async_close`.

    """

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_details):
        await self.async_close()

    @property
    @abc.abstractmethod
    def async_group(self) -> 'Group':
        """Group that controls the lifetime of this resource."""

    @property
    def is_open(self) -> bool:
        """Value of `is_open` of the underlying group."""
        group = self.async_group
        return group.is_open

    @property
    def is_closing(self) -> bool:
        """Value of `is_closing` of the underlying group."""
        group = self.async_group
        return group.is_closing

    @property
    def is_closed(self) -> bool:
        """Value of `is_closed` of the underlying group."""
        group = self.async_group
        return group.is_closed

    async def wait_closing(self):
        """Wait until the underlying group is closing."""
        group = self.async_group
        await group.wait_closing()

    async def wait_closed(self):
        """Wait until the underlying group is closed."""
        group = self.async_group
        await group.wait_closed()

    def close(self):
        """Close the underlying group without waiting."""
        group = self.async_group
        group.close()

    async def async_close(self):
        """Close the underlying group and wait until it is closed."""
        group = self.async_group
        await group.async_close()

Resource with lifetime control based on Group.

async_group: Group
22    @property
23    @abc.abstractmethod
24    def async_group(self) -> 'Group':
25        """Group controlling resource's lifetime."""

Group controlling resource's lifetime.

is_open: bool
27    @property
28    def is_open(self) -> bool:
29        """``True`` if not closing or closed, ``False`` otherwise."""
30        return self.async_group.is_open

True if not closing or closed, False otherwise.

is_closing: bool
32    @property
33    def is_closing(self) -> bool:
34        """Is resource closing or closed."""
35        return self.async_group.is_closing

Is resource closing or closed.

is_closed: bool
37    @property
38    def is_closed(self) -> bool:
39        """Is resource closed."""
40        return self.async_group.is_closed

Is resource closed.

async def wait_closing(self):
42    async def wait_closing(self):
43        """Wait until closing is ``True``."""
44        await self.async_group.wait_closing()

Wait until closing is True.

async def wait_closed(self):
46    async def wait_closed(self):
47        """Wait until closed is ``True``."""
48        await self.async_group.wait_closed()

Wait until closed is True.

def close(self):
50    def close(self):
51        """Close resource."""
52        self.async_group.close()

Close resource.

async def async_close(self):
54    async def async_close(self):
55        """Close resource and wait until closed is ``True``."""
56        await self.async_group.async_close()

Close resource and wait until closed is True.

class Group(hat.aio.Resource):
 63class Group(Resource):
 64    """Group of asyncio Tasks.
 65
 66    Group enables creation and management of related asyncio Tasks. The
 67    Group ensures uninterrupted execution of Tasks and Task completion upon
 68    Group closing.
 69
 70    Group can contain subgroups, which are independent Groups managed by the
 71    parent Group.
 72
 73    If a Task raises exception, other Tasks continue to execute.
 74
 75    If `log_exceptions` is ``True``, exceptions raised by spawned tasks are
 76    logged with level ERROR.
 77
 78    """
 79
 80    def __init__(self,
 81                 log_exceptions: bool = True,
 82                 *,
 83                 loop: asyncio.AbstractEventLoop | None = None):
 84        self._log_exceptions = log_exceptions
 85        self._loop = loop or asyncio.get_running_loop()
 86        self._closing = self._loop.create_future()
 87        self._closed = self._loop.create_future()
 88        self._tasks = set()
 89        self._parent = None
 90        self._children = set()
 91
 92    @property
 93    def async_group(self):
 94        """Async group"""
 95        return self
 96
 97    @property
 98    def is_open(self) -> bool:
 99        """``True`` if group is not closing or closed, ``False`` otherwise."""
100        return not self._closing.done()
101
102    @property
103    def is_closing(self) -> bool:
104        """Is group closing or closed."""
105        return self._closing.done()
106
107    @property
108    def is_closed(self) -> bool:
109        """Is group closed."""
110        return self._closed.done()
111
112    async def wait_closing(self):
113        """Wait until closing is ``True``."""
114        await asyncio.shield(self._closing)
115
116    async def wait_closed(self):
117        """Wait until closed is ``True``."""
118        await asyncio.shield(self._closed)
119
120    def create_subgroup(self,
121                        log_exceptions: bool | None = None
122                        ) -> 'Group':
123        """Create new Group as a child of this Group. Return the new Group.
124
125        When a parent Group gets closed, all of its children are closed.
126        Closing of a subgroup has no effect on the parent Group.
127
128        If `log_exceptions` is ``None``, subgroup inherits `log_exceptions`
129        from its parent.
130
131        """
132        if self._closing.done():
133            raise GroupClosedError("can't create subgroup of closed group")
134
135        child = Group(
136            log_exceptions=(self._log_exceptions if log_exceptions is None
137                            else log_exceptions),
138            loop=self._loop)
139        child._parent = self
140        self._children.add(child)
141        return child
142
143    def wrap(self,
144             obj: Awaitable
145             ) -> asyncio.Task:
146        """Wrap the awaitable object into a Task and schedule its execution.
147        Return the Task object.
148
149        Resulting task is shielded and can be canceled only with
150        `Group.close`.
151
152        """
153        if self._closing.done():
154            raise GroupClosedError("can't wrap object in closed group")
155
156        if asyncio.iscoroutine(obj):
157            task = self._loop.create_task(obj)
158
159        else:
160            task = asyncio.ensure_future(obj, loop=self._loop)
161
162        self._tasks.add(task)
163        task.add_done_callback(self._on_task_done)
164
165        return asyncio.shield(task)
166
167    def spawn(self,
168              fn: Callable[..., Awaitable],
169              *args, **kwargs
170              ) -> asyncio.Task:
171        """Wrap the result of a `fn` into a Task and schedule its execution.
172        Return the Task object.
173
174        Function `fn` is called with provided `args` and `kwargs`.
175        Resulting Task is shielded and can be canceled only with
176        `Group.close`.
177
178        """
179        if self._closing.done():
180            raise GroupClosedError("can't spawn task in closed group")
181
182        future = fn(*args, **kwargs)
183        return self.wrap(future)
184
185    def close(self):
186        """Schedule Group closing.
187
188        Closing Future is set immediately. All subgroups are closed, and all
189        running tasks are canceled. Once closing of all subgroups
190        and execution of all tasks is completed, closed Future is set.
191
192        """
193        if self._closing.done():
194            return
195
196        self._closing.set_result(True)
197
198        for child in list(self._children):
199            child.close()
200
201        for task in self._tasks:
202            self._loop.call_soon(task.cancel)
203
204        futures = [*self._tasks,
205                   *(child._closed for child in self._children)]
206        if futures:
207            waiting_task = self._loop.create_task(asyncio.wait(futures))
208            waiting_task.add_done_callback(lambda _: self._on_closed())
209
210        else:
211            self._on_closed()
212
213    async def async_close(self):
214        """Close Group and wait until closed is ``True``."""
215        self.close()
216        await self.wait_closed()
217
218    async def __aenter__(self):
219        return self
220
221    async def __aexit__(self, *args):
222        await self.async_close()
223
224    def _on_closed(self):
225        if self._parent is not None:
226            self._parent._children.remove(self)
227            self._parent = None
228
229        self._closed.set_result(True)
230
231    def _on_task_done(self, task):
232        self._tasks.remove(task)
233
234        if task.cancelled():
235            return
236
237        e = task.exception()
238        if e and self._log_exceptions:
239            mlog.error('unhandled exception in async group: %s', e, exc_info=e)
240            warnings.warn('unhandled exception in async group')

Group of asyncio Tasks.

Group enables creation and management of related asyncio Tasks. The Group ensures uninterrupted execution of Tasks and Task completion upon Group closing.

Group can contain subgroups, which are independent Groups managed by the parent Group.

If a Task raises exception, other Tasks continue to execute.

If log_exceptions is True, exceptions raised by spawned tasks are logged with level ERROR.

Group( log_exceptions: bool = True, *, loop: asyncio.events.AbstractEventLoop | None = None)
80    def __init__(self,
81                 log_exceptions: bool = True,
82                 *,
83                 loop: asyncio.AbstractEventLoop | None = None):
84        self._log_exceptions = log_exceptions
85        self._loop = loop or asyncio.get_running_loop()
86        self._closing = self._loop.create_future()
87        self._closed = self._loop.create_future()
88        self._tasks = set()
89        self._parent = None
90        self._children = set()
async_group
92    @property
93    def async_group(self):
94        """Async group"""
95        return self

Async group

is_open: bool
 97    @property
 98    def is_open(self) -> bool:
 99        """``True`` if group is not closing or closed, ``False`` otherwise."""
100        return not self._closing.done()

True if group is not closing or closed, False otherwise.

is_closing: bool
102    @property
103    def is_closing(self) -> bool:
104        """Is group closing or closed."""
105        return self._closing.done()

Is group closing or closed.

is_closed: bool
107    @property
108    def is_closed(self) -> bool:
109        """Is group closed."""
110        return self._closed.done()

Is group closed.

async def wait_closing(self):
112    async def wait_closing(self):
113        """Wait until closing is ``True``."""
114        await asyncio.shield(self._closing)

Wait until closing is True.

async def wait_closed(self):
116    async def wait_closed(self):
117        """Wait until closed is ``True``."""
118        await asyncio.shield(self._closed)

Wait until closed is True.

def create_subgroup(self, log_exceptions: bool | None = None) -> Group:
120    def create_subgroup(self,
121                        log_exceptions: bool | None = None
122                        ) -> 'Group':
123        """Create new Group as a child of this Group. Return the new Group.
124
125        When a parent Group gets closed, all of its children are closed.
126        Closing of a subgroup has no effect on the parent Group.
127
128        If `log_exceptions` is ``None``, subgroup inherits `log_exceptions`
129        from its parent.
130
131        """
132        if self._closing.done():
133            raise GroupClosedError("can't create subgroup of closed group")
134
135        child = Group(
136            log_exceptions=(self._log_exceptions if log_exceptions is None
137                            else log_exceptions),
138            loop=self._loop)
139        child._parent = self
140        self._children.add(child)
141        return child

Create new Group as a child of this Group. Return the new Group.

When a parent Group gets closed, all of its children are closed. Closing of a subgroup has no effect on the parent Group.

If log_exceptions is None, subgroup inherits log_exceptions from its parent.

def wrap(self, obj: Awaitable) -> _asyncio.Task:
143    def wrap(self,
144             obj: Awaitable
145             ) -> asyncio.Task:
146        """Wrap the awaitable object into a Task and schedule its execution.
147        Return the Task object.
148
149        Resulting task is shielded and can be canceled only with
150        `Group.close`.
151
152        """
153        if self._closing.done():
154            raise GroupClosedError("can't wrap object in closed group")
155
156        if asyncio.iscoroutine(obj):
157            task = self._loop.create_task(obj)
158
159        else:
160            task = asyncio.ensure_future(obj, loop=self._loop)
161
162        self._tasks.add(task)
163        task.add_done_callback(self._on_task_done)
164
165        return asyncio.shield(task)

Wrap the awaitable object into a Task and schedule its execution. Return the Task object.

Resulting task is shielded and can be canceled only with Group.close.

def spawn(self, fn: Callable[..., Awaitable], *args, **kwargs) -> _asyncio.Task:
167    def spawn(self,
168              fn: Callable[..., Awaitable],
169              *args, **kwargs
170              ) -> asyncio.Task:
171        """Wrap the result of a `fn` into a Task and schedule its execution.
172        Return the Task object.
173
174        Function `fn` is called with provided `args` and `kwargs`.
175        Resulting Task is shielded and can be canceled only with
176        `Group.close`.
177
178        """
179        if self._closing.done():
180            raise GroupClosedError("can't spawn task in closed group")
181
182        future = fn(*args, **kwargs)
183        return self.wrap(future)

Wrap the result of a fn into a Task and schedule its execution. Return the Task object.

Function fn is called with provided args and kwargs. Resulting Task is shielded and can be canceled only with Group.close.

def close(self):
185    def close(self):
186        """Schedule Group closing.
187
188        Closing Future is set immediately. All subgroups are closed, and all
189        running tasks are canceled. Once closing of all subgroups
190        and execution of all tasks is completed, closed Future is set.
191
192        """
193        if self._closing.done():
194            return
195
196        self._closing.set_result(True)
197
198        for child in list(self._children):
199            child.close()
200
201        for task in self._tasks:
202            self._loop.call_soon(task.cancel)
203
204        futures = [*self._tasks,
205                   *(child._closed for child in self._children)]
206        if futures:
207            waiting_task = self._loop.create_task(asyncio.wait(futures))
208            waiting_task.add_done_callback(lambda _: self._on_closed())
209
210        else:
211            self._on_closed()

Schedule Group closing.

Closing Future is set immediately. All subgroups are closed, and all running tasks are canceled. Once closing of all subgroups and execution of all tasks is completed, closed Future is set.

async def async_close(self):
213    async def async_close(self):
214        """Close Group and wait until closed is ``True``."""
215        self.close()
216        await self.wait_closed()

Close Group and wait until closed is True.

async def first( xs: AsyncIterable[~T], fn: Callable[[~T], typing.Any] = <function <lambda>>, default: Optional[~T] = None) -> Optional[~T]:
async def first(xs: AsyncIterable[T],
                fn: Callable[[T], typing.Any] = lambda _: True,
                default: T | None = None
                ) -> T | None:
    """Return the first element of an async iterable accepted by `fn`.

    Elements are consumed in order until `fn` returns a truthy value for
    one of them; that element is returned. If the iterable is exhausted
    without a match, `default` is returned.

    The predicate may return a value of any type - only its truthiness
    is used.

    Args:
        xs: async collection
        fn: predicate
        default: default value

    Example::

        async def async_range(x):
            for i in range(x):
                await asyncio.sleep(0)
                yield i

        assert await first(async_range(3)) == 0
        assert await first(async_range(3), lambda x: x > 1) == 2
        assert await first(async_range(3), lambda x: x > 2) is None
        assert await first(async_range(3), lambda x: x > 2, 123) == 123

    """
    async for candidate in xs:
        if not fn(candidate):
            continue

        return candidate

    return default

Return the first element from async iterable that satisfies predicate fn, or default if no such element exists.

Result of predicate fn can be of any type. Predicate is satisfied if its return value is truthy.

Arguments:
  • xs: async collection
  • fn: predicate
  • default: default value

Example::

async def async_range(x):
    for i in range(x):
        await asyncio.sleep(0)
        yield i

assert await first(async_range(3)) == 0
assert await first(async_range(3), lambda x: x > 1) == 2
assert await first(async_range(3), lambda x: x > 2) is None
assert await first(async_range(3), lambda x: x > 2, 123) == 123
async def uncancellable(obj: Awaitable[~T], raise_cancel: bool = True) -> ~T:
async def uncancellable(obj: Awaitable[T],
                        raise_cancel: bool = True
                        ) -> T:
    """Run an awaitable object to completion regardless of cancellation.

    The object is scheduled as a task on the running loop and awaited
    through `asyncio.shield` in a loop until the task is done, so
    cancelling this coroutine does not interrupt the task.

    If `raise_cancel` is ``True``, an `asyncio.CancelledError` caught
    while waiting is remembered and raised again once the task is done.

    Warning:
        If `raise_cancel` is ``False``, this coroutine suppresses
        `asyncio.CancelledError` and stops its propagation. Use with
        caution.

    Args:
        obj: awaitable object
        raise_cancel: raise CancelledError flag

    Returns:
        object execution result

    """
    loop = asyncio.get_running_loop()
    task = (loop.create_task(obj) if asyncio.iscoroutine(obj)
            else asyncio.ensure_future(obj, loop=loop))

    pending_cancel = None

    while not task.done():
        try:
            await asyncio.shield(task)

        except asyncio.CancelledError as cancel:
            if raise_cancel:
                pending_cancel = cancel

        except Exception:
            # failure of the task itself surfaces via `task.result()` below
            pass

    if pending_cancel:
        raise pending_cancel

    return task.result()

Uncancellable execution of an awaitable object.

Object is scheduled as task, shielded and its execution cannot be interrupted.

If raise_cancel is True and the object gets canceled, asyncio.CancelledError is reraised after the Future finishes.

Warning:

If raise_cancel is False, this method suppresses asyncio.CancelledError and stops its propagation. Use with caution.

Arguments:
  • obj: awaitable object
  • raise_cancel: raise CancelledError flag
Returns:

object execution result

AsyncCallable = typing.Callable
async def call(fn: Callable[..., Union[~T, Awaitable[~T]]], *args, **kwargs) -> ~T:
async def call(fn: AsyncCallable[..., T], *args, **kwargs) -> T:
    """Call a regular function, a coroutine function or another callable.

    `fn` is invoked with `args` and `kwargs`. An awaitable result is
    awaited and the awaited value is returned; any other result is
    returned as is.

    Args:
        fn: callable object
        args: additional positional arguments
        kwargs: additional keyword arguments

    Returns:
        awaited result or result

    Example::

        def f1(x):
            return x

        def f2(x):
            f = asyncio.Future()
            f.set_result(x)
            return f

        async def f3(x):
            return x

        assert 'f1' == await hat.aio.call(f1, 'f1')
        assert 'f2' == await hat.aio.call(f2, 'f2')
        assert 'f3' == await hat.aio.call(f3, 'f3')

    """
    outcome = fn(*args, **kwargs)

    if not inspect.isawaitable(outcome):
        return outcome

    return await outcome

Call a function or a coroutine (or other callable object).

Call the fn with args and kwargs. If result of this call is awaitable, it is awaited and returned. Otherwise, result is immediately returned.

Arguments:
  • fn: callable object
  • args: additional positional arguments
  • kwargs: additional keyword arguments
Returns:

awaited result or result

Example::

def f1(x):
    return x

def f2(x):
    f = asyncio.Future()
    f.set_result(x)
    return f

async def f3(x):
    return x

assert 'f1' == await hat.aio.call(f1, 'f1')
assert 'f2' == await hat.aio.call(f2, 'f2')
assert 'f3' == await hat.aio.call(f3, 'f3')

async def call_on_cancel(fn: Callable[..., Union[~T, Awaitable[~T]]], *args, **kwargs) -> ~T:
async def call_on_cancel(fn: AsyncCallable[..., T], *args, **kwargs) -> T:
    """Call a function or a coroutine when canceled.

    This coroutine waits on a future that is never completed. Once the
    wait is canceled, `fn` is called with `args` and `kwargs` by using
    `call` coroutine.

    Args:
        fn: function or coroutine
        args: additional function arguments
        kwargs: additional function keyword arguments

    Returns:
        function result

    Example::

        f = asyncio.Future()
        group = Group()
        group.spawn(call_on_cancel, f.set_result, 123)
        assert not f.done()
        await group.async_close()
        assert f.result() == 123

    """
    # nothing ever completes this future, so the wait can only end through
    # cancellation, which is swallowed here
    with contextlib.suppress(asyncio.CancelledError):
        await asyncio.get_running_loop().create_future()

    # keyword arguments must be forwarded with `**`; a single `*` would
    # pass only the dict keys as extra positional arguments
    return await call(fn, *args, **kwargs)

Call a function or a coroutine when canceled.

When canceled, fn is called with args and kwargs by using call coroutine.

Arguments:
  • fn: function or coroutine
  • args: additional function arguments
  • kwargs: additional function keyword arguments
Returns:

function result

Example::

f = asyncio.Future()
group = Group()
group.spawn(call_on_cancel, f.set_result, 123)
assert not f.done()
await group.async_close()
assert f.result() == 123
async def call_on_done( f: Awaitable, fn: Callable[..., Union[~T, Awaitable[~T]]], *args, **kwargs) -> ~T:
async def call_on_done(f: Awaitable,
                       fn: AsyncCallable[..., T],
                       *args, **kwargs
                       ) -> T:
    """Call a function or a coroutine when awaitable is done.

    When `f` is done, `fn` is called with `args` and `kwargs` by using
    `call` coroutine. An exception (subclass of `Exception`) raised by `f`
    is suppressed and `fn` is still called.

    If this coroutine is canceled before `f` is done, `f` is canceled and `fn`
    is not called.

    If this coroutine is canceled after `f` is done, `fn` call is canceled.

    Args:
        f: awaitable future
        fn: function or coroutine
        args: additional function arguments
        kwargs: additional function keyword arguments

    Returns:
        function result

    Example::

        f = asyncio.Future()
        group = Group()
        group.spawn(call_on_done, f, group.close)
        assert group.is_open
        f.set_result(None)
        await group.wait_closed()
        assert group.is_closed

    """
    # only the outcome "done" matters here - failures of `f` are ignored
    with contextlib.suppress(Exception):
        await f

    # keyword arguments must be forwarded with `**`; a single `*` would
    # pass only the dict keys as extra positional arguments
    return await call(fn, *args, **kwargs)

Call a function or a coroutine when awaitable is done.

When f is done, fn is called with args and kwargs by using call coroutine.

If this coroutine is canceled before f is done, f is canceled and fn is not called.

If this coroutine is canceled after f is done, fn call is canceled.

Arguments:
  • f: awaitable future
  • fn: function or coroutine
  • args: additional function arguments
  • kwargs: additional function keyword arguments
Returns:

function result

Example::

f = asyncio.Future()
group = Group()
group.spawn(call_on_done, f, group.close)
assert group.is_open
f.set_result(None)
await group.wait_closed()
assert group.is_closed
def init_asyncio(policy: asyncio.events.AbstractEventLoopPolicy | None = None):
def init_asyncio(policy: asyncio.AbstractEventLoopPolicy | None = None):
    """Initialize asyncio by installing an event loop policy.

    If `policy` is provided, it is installed as is. Otherwise a default
    policy is created: `asyncio.WindowsProactorEventLoopPolicy` on Windows
    and `asyncio.DefaultEventLoopPolicy` on all other platforms.

    Args:
        policy: event loop policy

    """
    if not policy:
        # TODO: evaluate usage of uvloop (uvloop.EventLoopPolicy) as the
        #       default policy on non-Windows platforms
        if sys.platform == 'win32':
            policy = asyncio.WindowsProactorEventLoopPolicy()

        else:
            policy = asyncio.DefaultEventLoopPolicy()

    asyncio.set_event_loop_policy(policy)

Initialize asyncio.

Sets event loop policy (if None, instance of asyncio.DefaultEventLoopPolicy is used).

On Windows, asyncio.WindowsProactorEventLoopPolicy is used as default policy.

def run_asyncio( future: Awaitable[~T], *, handle_signals: bool = True, loop: asyncio.events.AbstractEventLoop | None = None) -> ~T:
253def run_asyncio(future: Awaitable[T], *,
254                handle_signals: bool = True,
255                loop: asyncio.AbstractEventLoop | None = None
256                ) -> T:
257    """Run asyncio loop until the `future` is completed and return the result.
258
259    If `handle_signals` is ``True``, SIGINT and SIGTERM handlers are
260    temporarily overridden. Instead of raising ``KeyboardInterrupt`` on every
261    signal reception, Future is canceled only once. Additional signals are
262    ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also
263    overridden.
264
265    If `loop` is set to ``None``, new event loop is created and set
266    as thread's default event loop. Newly created loop is closed when this
267    coroutine returns. Running tasks or async generators, other than provided
268    `future`, are not canceled prior to loop closing.
269
270    On Windows, asyncio loop gets periodically woken up (every 0.5 seconds).
271
272    Args:
273        future: future or coroutine
274        handle_signals: handle signals flag
275        loop: event loop
276
277    Returns:
278        future's result
279
280    Example::
281
282        async def run():
283            await asyncio.sleep(0)
284            return 123
285
286        result = run_asyncio(run())
287        assert result == 123
288
289    """
290    close_loop = loop is None
291    if loop is None:
292        loop = asyncio.new_event_loop()
293        asyncio.set_event_loop(loop)
294
295    task = asyncio.ensure_future(future, loop=loop)
296
297    if sys.platform == 'win32':
298
299        async def task_wrapper(task):
300            try:
301                while not task.done():
302                    await asyncio.wait([task], timeout=0.5)
303            except asyncio.CancelledError:
304                task.cancel()
305            return await task
306
307        task = asyncio.ensure_future(task_wrapper(task), loop=loop)
308
309    if not handle_signals:
310        return loop.run_until_complete(task)
311
312    canceled = False
313    signalnums = [signal.SIGINT, signal.SIGTERM]
314    if sys.platform == 'win32':
315        signalnums += [signal.SIGBREAK]
316
317    def signal_handler(*args):
318        nonlocal canceled
319        if canceled:
320            return
321        loop.call_soon_threadsafe(task.cancel)
322        canceled = True
323
324    handlers = {signalnum: signal.getsignal(signalnum) or signal.SIG_DFL
325                for signalnum in signalnums}
326    for signalnum in signalnums:
327        signal.signal(signalnum, signal_handler)
328
329    try:
330        return loop.run_until_complete(task)
331
332    finally:
333        for signalnum, handler in handlers.items():
334            signal.signal(signalnum, handler)
335        if close_loop:
336            loop.close()

Run asyncio loop until the future is completed and return the result.

If handle_signals is True, SIGINT and SIGTERM handlers are temporarily overridden. Instead of raising KeyboardInterrupt on every signal reception, Future is canceled only once. Additional signals are ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also overridden.

If loop is set to None, a new event loop is created and set as the thread's default event loop. The newly created loop is closed when this function returns. Running tasks or async generators, other than the provided future, are not canceled prior to loop closing.

On Windows, asyncio loop gets periodically woken up (every 0.5 seconds).

Arguments:
  • future: future or coroutine
  • handle_signals: handle signals flag
  • loop: event loop
Returns:

future's result

Example::

async def run():
    await asyncio.sleep(0)
    return 123

result = run_asyncio(run())
assert result == 123
class QueueClosedError(builtins.Exception):
class QueueClosedError(Exception):
    """Error signaling an operation attempted on a closed queue."""

Raised when trying to use a closed queue.

class QueueEmptyError(builtins.Exception):
class QueueEmptyError(Exception):
    """Error signaling that the queue holds no items."""

Raised if queue is empty.

class QueueFullError(builtins.Exception):
class QueueFullError(Exception):
    """Error signaling that the queue has no free slot."""

Raised if queue is full.

class Queue(typing.Generic[~T]):
 23class Queue(typing.Generic[T]):
 24    """Asyncio queue which implements AsyncIterable and can be closed.
 25
 26    Interface and implementation are based on `asyncio.Queue`.
 27
 28    If `maxsize` is less than or equal to zero, the queue size is infinite.
 29
 30    Args:
 31        maxsize: maximum number of items in the queue
 32
 33    Example::
 34
 35        queue = Queue(maxsize=1)
 36
 37        async def producer():
 38            for i in range(4):
 39                await queue.put(i)
 40            queue.close()
 41
 42        async def consumer():
 43            result = 0
 44            async for i in queue:
 45                result += i
 46            return result
 47
 48        asyncio.ensure_future(producer())
 49        result = await consumer()
 50        assert result == 6
 51
 52    """
 53
 54    def __init__(self,
 55                 maxsize: int = 0):
 56        self._maxsize = maxsize
 57        self._queue = collections.deque()
 58        self._getters = collections.deque()
 59        self._putters = collections.deque()
 60        self._closed = False
 61
 62    def __aiter__(self):
 63        return self
 64
 65    async def __anext__(self):
 66        try:
 67            return await self.get()
 68
 69        except QueueClosedError:
 70            raise StopAsyncIteration
 71
 72    def __str__(self):
 73        return (f'<{type(self).__name__}'
 74                f' _closed={self._closed} '
 75                f' _queue={list(self._queue)}>')
 76
 77    def __len__(self):
 78        return len(self._queue)
 79
 80    @property
 81    def maxsize(self) -> int:
 82        """Maximum number of items in the queue."""
 83        return self._maxsize
 84
 85    @property
 86    def is_closed(self) -> bool:
 87        """Is queue closed."""
 88        return self._closed
 89
 90    def empty(self) -> bool:
 91        """``True`` if queue is empty, ``False`` otherwise."""
 92        return not self._queue
 93
 94    def full(self) -> bool:
 95        """``True`` if queue is full, ``False`` otherwise."""
 96        if self._maxsize > 0:
 97            return len(self._queue) >= self._maxsize
 98
 99        return False
100
101    def qsize(self) -> int:
102        """Number of items currently in the queue."""
103        return len(self._queue)
104
105    def close(self):
106        """Close the queue."""
107        if self._closed:
108            return
109
110        self._closed = True
111        self._wakeup_all(self._putters)
112        self._wakeup_next(self._getters)
113
114    def get_nowait(self) -> T:
115        """Return an item if one is immediately available, else raise
116        `QueueEmptyError`.
117
118        Raises:
119            QueueEmptyError
120
121        """
122        if self.empty():
123            raise QueueEmptyError()
124
125        item = self._queue.popleft()
126        self._wakeup_next(self._putters)
127        return item
128
129    def put_nowait(self, item: T):
130        """Put an item into the queue without blocking.
131
132        If no free slot is immediately available, raise `QueueFullError`.
133
134        Raises:
135            QueueFullError
136
137        """
138        if self._closed:
139            raise QueueClosedError()
140
141        if self.full():
142            raise QueueFullError()
143
144        self._queue.append(item)
145        self._wakeup_next(self._getters)
146
147    async def get(self) -> T:
148        """Remove and return an item from the queue.
149
150        If queue is empty, wait until an item is available.
151
152        Raises:
153            QueueClosedError
154
155        """
156        loop = asyncio.get_running_loop()
157
158        while self.empty():
159            if self._closed:
160                self._wakeup_all(self._getters)
161                raise QueueClosedError()
162
163            getter = loop.create_future()
164            self._getters.append(getter)
165
166            try:
167                await getter
168
169            except BaseException:
170                getter.cancel()
171
172                with contextlib.suppress(ValueError):
173                    self._getters.remove(getter)
174
175                if not getter.cancelled():
176                    if not self.empty() or self._closed:
177                        self._wakeup_next(self._getters)
178
179                raise
180
181        return self.get_nowait()
182
183    async def put(self, item: T):
184        """Put an item into the queue.
185
186        If the queue is full, wait until a free slot is available before adding
187        the item.
188
189        Raises:
190            QueueClosedError
191
192        """
193        loop = asyncio.get_running_loop()
194
195        while not self._closed and self.full():
196            putter = loop.create_future()
197            self._putters.append(putter)
198
199            try:
200                await putter
201
202            except BaseException:
203                putter.cancel()
204
205                with contextlib.suppress(ValueError):
206                    self._putters.remove(putter)
207
208                if not self.full() and not putter.cancelled():
209                    self._wakeup_next(self._putters)
210
211                raise
212
213        return self.put_nowait(item)
214
215    async def get_until_empty(self) -> T:
216        """Empty the queue and return the last item.
217
218        If queue is empty, wait until at least one item is available.
219
220        Raises:
221            QueueClosedError
222
223        """
224        item = await self.get()
225
226        while not self.empty():
227            item = self.get_nowait()
228
229        return item
230
231    def get_nowait_until_empty(self) -> T:
232        """Empty the queue and return the last item if at least one
233        item is immediately available, else raise `QueueEmptyError`.
234
235        Raises:
236            QueueEmptyError
237
238        """
239        item = self.get_nowait()
240
241        while not self.empty():
242            item = self.get_nowait()
243
244        return item
245
246    def _wakeup_next(self, waiters):
247        while waiters:
248            waiter = waiters.popleft()
249
250            if not waiter.done():
251                waiter.set_result(None)
252                break
253
254    def _wakeup_all(self, waiters):
255        while waiters:
256            waiter = waiters.popleft()
257
258            if not waiter.done():
259                waiter.set_result(None)

Asyncio queue which implements AsyncIterable and can be closed.

Interface and implementation are based on asyncio.Queue.

If maxsize is less than or equal to zero, the queue size is infinite.

Arguments:
  • maxsize: maximum number of items in the queue

Example::

queue = Queue(maxsize=1)

async def producer():
    for i in range(4):
        await queue.put(i)
    queue.close()

async def consumer():
    result = 0
    async for i in queue:
        result += i
    return result

asyncio.ensure_future(producer())
result = await consumer()
assert result == 6
Queue(maxsize: int = 0)
54    def __init__(self,
55                 maxsize: int = 0):
56        self._maxsize = maxsize
57        self._queue = collections.deque()
58        self._getters = collections.deque()
59        self._putters = collections.deque()
60        self._closed = False
maxsize: int
80    @property
81    def maxsize(self) -> int:
82        """Maximum number of items in the queue."""
83        return self._maxsize

Maximum number of items in the queue.

is_closed: bool
85    @property
86    def is_closed(self) -> bool:
87        """Is queue closed."""
88        return self._closed

Is queue closed.

def empty(self) -> bool:
90    def empty(self) -> bool:
91        """``True`` if queue is empty, ``False`` otherwise."""
92        return not self._queue

True if queue is empty, False otherwise.

def full(self) -> bool:
94    def full(self) -> bool:
95        """``True`` if queue is full, ``False`` otherwise."""
96        if self._maxsize > 0:
97            return len(self._queue) >= self._maxsize
98
99        return False

True if queue is full, False otherwise.

def qsize(self) -> int:
101    def qsize(self) -> int:
102        """Number of items currently in the queue."""
103        return len(self._queue)

Number of items currently in the queue.

def close(self):
105    def close(self):
106        """Close the queue."""
107        if self._closed:
108            return
109
110        self._closed = True
111        self._wakeup_all(self._putters)
112        self._wakeup_next(self._getters)

Close the queue.

def get_nowait(self) -> ~T:
114    def get_nowait(self) -> T:
115        """Return an item if one is immediately available, else raise
116        `QueueEmptyError`.
117
118        Raises:
119            QueueEmptyError
120
121        """
122        if self.empty():
123            raise QueueEmptyError()
124
125        item = self._queue.popleft()
126        self._wakeup_next(self._putters)
127        return item

Return an item if one is immediately available, else raise QueueEmptyError.

Raises:
  • QueueEmptyError
def put_nowait(self, item: ~T):
129    def put_nowait(self, item: T):
130        """Put an item into the queue without blocking.
131
132        If no free slot is immediately available, raise `QueueFullError`.
133
134        Raises:
135            QueueFullError
136
137        """
138        if self._closed:
139            raise QueueClosedError()
140
141        if self.full():
142            raise QueueFullError()
143
144        self._queue.append(item)
145        self._wakeup_next(self._getters)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFullError.

Raises:
  • QueueFullError
async def get(self) -> ~T:
147    async def get(self) -> T:
148        """Remove and return an item from the queue.
149
150        If queue is empty, wait until an item is available.
151
152        Raises:
153            QueueClosedError
154
155        """
156        loop = asyncio.get_running_loop()
157
158        while self.empty():
159            if self._closed:
160                self._wakeup_all(self._getters)
161                raise QueueClosedError()
162
163            getter = loop.create_future()
164            self._getters.append(getter)
165
166            try:
167                await getter
168
169            except BaseException:
170                getter.cancel()
171
172                with contextlib.suppress(ValueError):
173                    self._getters.remove(getter)
174
175                if not getter.cancelled():
176                    if not self.empty() or self._closed:
177                        self._wakeup_next(self._getters)
178
179                raise
180
181        return self.get_nowait()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Raises:
  • QueueClosedError
async def put(self, item: ~T):
183    async def put(self, item: T):
184        """Put an item into the queue.
185
186        If the queue is full, wait until a free slot is available before adding
187        the item.
188
189        Raises:
190            QueueClosedError
191
192        """
193        loop = asyncio.get_running_loop()
194
195        while not self._closed and self.full():
196            putter = loop.create_future()
197            self._putters.append(putter)
198
199            try:
200                await putter
201
202            except BaseException:
203                putter.cancel()
204
205                with contextlib.suppress(ValueError):
206                    self._putters.remove(putter)
207
208                if not self.full() and not putter.cancelled():
209                    self._wakeup_next(self._putters)
210
211                raise
212
213        return self.put_nowait(item)

Put an item into the queue.

If the queue is full, wait until a free slot is available before adding the item.

Raises:
  • QueueClosedError
async def get_until_empty(self) -> ~T:
215    async def get_until_empty(self) -> T:
216        """Empty the queue and return the last item.
217
218        If queue is empty, wait until at least one item is available.
219
220        Raises:
221            QueueClosedError
222
223        """
224        item = await self.get()
225
226        while not self.empty():
227            item = self.get_nowait()
228
229        return item

Empty the queue and return the last item.

If queue is empty, wait until at least one item is available.

Raises:
  • QueueClosedError
def get_nowait_until_empty(self) -> ~T:
231    def get_nowait_until_empty(self) -> T:
232        """Empty the queue and return the last item if at least one
233        item is immediately available, else raise `QueueEmptyError`.
234
235        Raises:
236            QueueEmptyError
237
238        """
239        item = self.get_nowait()
240
241        while not self.empty():
242            item = self.get_nowait()
243
244        return item

Empty the queue and return the last item if at least one item is immediately available, else raise QueueEmptyError.

Raises:
  • QueueEmptyError
class CancelledWithResultError(asyncio.exceptions.CancelledError):
13class CancelledWithResultError(asyncio.CancelledError):
14    """CancelledError with associated result or exception"""
15
16    def __init__(self,
17                 result: typing.Any | None,
18                 exception: BaseException | None):
19        super().__init__()
20        self.__result = result
21        self.__exception = exception
22
23    @property
24    def result(self) -> typing.Any | None:
25        """Result"""
26        return self.__result
27
28    @property
29    def exception(self) -> BaseException | None:
30        return self.__exception

CancelledError with associated result or exception

CancelledWithResultError(result: Optional[Any], exception: BaseException | None)
16    def __init__(self,
17                 result: typing.Any | None,
18                 exception: BaseException | None):
19        super().__init__()
20        self.__result = result
21        self.__exception = exception
result: Optional[Any]
23    @property
24    def result(self) -> typing.Any | None:
25        """Result"""
26        return self.__result

Result

exception: BaseException | None
28    @property
29    def exception(self) -> BaseException | None:
30        return self.__exception
async def wait_for(obj: Awaitable[~T], timeout: float) -> ~T:
async def wait_for(obj: Awaitable[T],
                   timeout: float
                   ) -> T:
    """Wait for the awaitable object to complete, with timeout.

    Implementation of `asyncio.wait_for` that ensures propagation of
    CancelledError.

    If this coroutine is cancelled while object's result (or exception) is
    already available, instead of returning result, this implementation
    raises `CancelledWithResultError`.

    Args:
        obj: awaitable object
        timeout: timeout in seconds

    Returns:
        object's result

    Raises:
        asyncio.TimeoutError: object's task ended up cancelled without this
            coroutine being cancelled
        CancelledWithResultError: this coroutine was cancelled and object's
            task finished with result or exception
        asyncio.CancelledError: this coroutine was cancelled and object's
            task was cancelled

    """
    group = Group(log_exceptions=False)
    # whichever finishes first - timeout sleep or wrapped object - closes
    # the group
    group.spawn(call_on_done, asyncio.sleep(timeout), group.close)

    task = group.wrap(obj)
    group.spawn(call_on_done, asyncio.shield(task), group.close)

    # cancellation of this coroutine, remembered until cleanup is done
    exc = None

    try:
        await group.wait_closing()

    except asyncio.CancelledError as e:
        exc = e

    try:
        await uncancellable(group.async_close())

    except asyncio.CancelledError as e:
        exc = e

    if exc:
        if task.cancelled():
            raise exc

        # task finished despite cancellation - hand its outcome to the caller
        result = None if task.exception() else task.result()
        raise CancelledWithResultError(result, task.exception()) from exc

    if task.cancelled():
        raise asyncio.TimeoutError()

    return task.result()

Wait for the awaitable object to complete, with timeout.

Implementation of asyncio.wait_for that ensures propagation of CancelledError.

If task is cancelled with object's result available, instead of returning result, this implementation raises CancelledWithResultError.