hat.aio

Async utility functions

 1"""Async utility functions"""
 2
 3from hat.aio.executor import (Executor,
 4                              create_executor)
 5from hat.aio.group import (Resource,
 6                           Group)
 7from hat.aio.misc import (first,
 8                          uncancellable,
 9                          AsyncCallable,
10                          call,
11                          call_on_cancel,
12                          call_on_done,
13                          init_asyncio,
14                          run_asyncio)
15from hat.aio.queue import (QueueClosedError,
16                           QueueEmptyError,
17                           QueueFullError,
18                           Queue)
19from hat.aio.wait import (CancelledWithResultError,
20                          wait_for)
21
22
23__all__ = ['Executor',
24           'create_executor',
25           'Resource',
26           'Group',
27           'first',
28           'uncancellable',
29           'AsyncCallable',
30           'call',
31           'call_on_cancel',
32           'call_on_done',
33           'init_asyncio',
34           'run_asyncio',
35           'QueueClosedError',
36           'QueueEmptyError',
37           'QueueFullError',
38           'Queue',
39           'CancelledWithResultError',
40           'wait_for']
class Executor(hat.aio.Resource):
11class Executor(Resource):
12    """Executor wrapping `asyncio.loop.run_in_executor`.
13
14    Wrapped executor is created from `executor_cls` with provided `args`.
15
16    If `wait_futures` is ``True``, executor will be closed once all running
17    tasks finishes.
18
19    `log_exceptions` is delegated to `async_group`.
20
21    Args:
22        args: executor args
23        executor_cls: executor class
24        log_exceptions: log exceptions
25
26    Example::
27
28        executor1 = Executor()
29        executor2 = Executor()
30        tid1 = await executor1.spawn(threading.get_ident)
31        tid2 = await executor2.spawn(threading.get_ident)
32        assert tid1 != tid2
33
34    """
35
36    def __init__(self,
37                 *args,
38                 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor,  # NOQA
39                 log_exceptions: bool = True,
40                 wait_futures: bool = True):
41        self._wait_futures = wait_futures
42        self._executor = executor_cls(*args)
43        self._loop = asyncio.get_running_loop()
44        self._async_group = Group(log_exceptions)
45
46        self.async_group.spawn(call_on_cancel, self._executor.shutdown, False)
47
48    @property
49    def async_group(self):
50        """Async group"""
51        return self._async_group
52
53    def spawn(self,
54              fn: Callable,
55              *args, **kwargs
56              ) -> asyncio.Task:
57        """Spawn new task"""
58        return self.async_group.spawn(self._spawn, fn, args, kwargs)
59
60    async def _spawn(self, fn, args, kwargs):
61        func = functools.partial(fn, *args, **kwargs)
62        coro = self._loop.run_in_executor(self._executor, func)
63
64        if self._wait_futures:
65            coro = uncancellable(coro)
66
67        return await coro

Executor wrapping asyncio.loop.run_in_executor.

Wrapped executor is created from executor_cls with provided args.

If wait_futures is True, executor will be closed once all running tasks finishes.

log_exceptions is delegated to async_group.

Arguments:
  • args: executor args
  • executor_cls: executor class
  • log_exceptions: log exceptions

Example::

executor1 = Executor()
executor2 = Executor()
tid1 = await executor1.spawn(threading.get_ident)
tid2 = await executor2.spawn(threading.get_ident)
assert tid1 != tid2
Executor( *args, executor_cls: type[concurrent.futures._base.Executor] = <class 'concurrent.futures.thread.ThreadPoolExecutor'>, log_exceptions: bool = True, wait_futures: bool = True)
36    def __init__(self,
37                 *args,
38                 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor,  # NOQA
39                 log_exceptions: bool = True,
40                 wait_futures: bool = True):
41        self._wait_futures = wait_futures
42        self._executor = executor_cls(*args)
43        self._loop = asyncio.get_running_loop()
44        self._async_group = Group(log_exceptions)
45
46        self.async_group.spawn(call_on_cancel, self._executor.shutdown, False)
async_group
48    @property
49    def async_group(self):
50        """Async group"""
51        return self._async_group

Async group

def spawn(self, fn: Callable, *args, **kwargs) -> _asyncio.Task:
53    def spawn(self,
54              fn: Callable,
55              *args, **kwargs
56              ) -> asyncio.Task:
57        """Spawn new task"""
58        return self.async_group.spawn(self._spawn, fn, args, kwargs)

Spawn new task

def create_executor( *args, executor_cls: type[concurrent.futures._base.Executor] = <class 'concurrent.futures.thread.ThreadPoolExecutor'>, loop: asyncio.events.AbstractEventLoop | None = None) -> Callable[..., Awaitable]:
 70def create_executor(*args,
 71                    executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor,  # NOQA
 72                    loop: asyncio.AbstractEventLoop | None = None
 73                    ) -> Callable[..., Awaitable]:
 74    """Create `asyncio.loop.run_in_executor` wrapper.
 75
 76    Wrapped executor is created from `executor_cls` with provided `args`.
 77
 78    This function returns coroutine that takes a function and its arguments,
 79    executes the function in executor and returns the result.
 80
 81    Args:
 82        args: executor args
 83        executor_cls: executor class
 84        loop: asyncio loop
 85
 86    Returns:
 87        executor coroutine
 88
 89    Example::
 90
 91        executor1 = create_executor()
 92        executor2 = create_executor()
 93        tid1 = await executor1(threading.get_ident)
 94        tid2 = await executor2(threading.get_ident)
 95        assert tid1 != tid2
 96
 97    """
 98    executor = executor_cls(*args)
 99
100    async def executor_wrapper(func, /, *args, **kwargs):
101        _loop = loop or asyncio.get_running_loop()
102        fn = functools.partial(func, *args, **kwargs)
103        return await _loop.run_in_executor(executor, fn)
104
105    return executor_wrapper

Create asyncio.loop.run_in_executor wrapper.

Wrapped executor is created from executor_cls with provided args.

This function returns coroutine that takes a function and its arguments, executes the function in executor and returns the result.

Arguments:
  • args: executor args
  • executor_cls: executor class
  • loop: asyncio loop
Returns:

executor coroutine

Example::

executor1 = create_executor()
executor2 = create_executor()
tid1 = await executor1(threading.get_ident)
tid2 = await executor2(threading.get_ident)
assert tid1 != tid2
class Resource(abc.ABC):
13class Resource(abc.ABC):
14    """Resource with lifetime control based on `Group`."""
15
16    async def __aenter__(self):
17        return self
18
19    async def __aexit__(self, *args):
20        await self.async_close()
21
22    @property
23    @abc.abstractmethod
24    def async_group(self) -> 'Group':
25        """Group controlling resource's lifetime."""
26
27    @property
28    def is_open(self) -> bool:
29        """``True`` if not closing or closed, ``False`` otherwise."""
30        return self.async_group.is_open
31
32    @property
33    def is_closing(self) -> bool:
34        """Is resource closing or closed."""
35        return self.async_group.is_closing
36
37    @property
38    def is_closed(self) -> bool:
39        """Is resource closed."""
40        return self.async_group.is_closed
41
42    async def wait_closing(self):
43        """Wait until closing is ``True``."""
44        await self.async_group.wait_closing()
45
46    async def wait_closed(self):
47        """Wait until closed is ``True``."""
48        await self.async_group.wait_closed()
49
50    def close(self):
51        """Close resource."""
52        self.async_group.close()
53
54    async def async_close(self):
55        """Close resource and wait until closed is ``True``."""
56        await self.async_group.async_close()

Resource with lifetime control based on Group.

async_group: Group
22    @property
23    @abc.abstractmethod
24    def async_group(self) -> 'Group':
25        """Group controlling resource's lifetime."""

Group controlling resource's lifetime.

is_open: bool
27    @property
28    def is_open(self) -> bool:
29        """``True`` if not closing or closed, ``False`` otherwise."""
30        return self.async_group.is_open

True if not closing or closed, False otherwise.

is_closing: bool
32    @property
33    def is_closing(self) -> bool:
34        """Is resource closing or closed."""
35        return self.async_group.is_closing

Is resource closing or closed.

is_closed: bool
37    @property
38    def is_closed(self) -> bool:
39        """Is resource closed."""
40        return self.async_group.is_closed

Is resource closed.

async def wait_closing(self):
42    async def wait_closing(self):
43        """Wait until closing is ``True``."""
44        await self.async_group.wait_closing()

Wait until closing is True.

async def wait_closed(self):
46    async def wait_closed(self):
47        """Wait until closed is ``True``."""
48        await self.async_group.wait_closed()

Wait until closed is True.

def close(self):
50    def close(self):
51        """Close resource."""
52        self.async_group.close()

Close resource.

async def async_close(self):
54    async def async_close(self):
55        """Close resource and wait until closed is ``True``."""
56        await self.async_group.async_close()

Close resource and wait until closed is True.

class Group(hat.aio.Resource):
 63class Group(Resource):
 64    """Group of asyncio Tasks.
 65
 66    Group enables creation and management of related asyncio Tasks. The
 67    Group ensures uninterrupted execution of Tasks and Task completion upon
 68    Group closing.
 69
 70    Group can contain subgroups, which are independent Groups managed by the
 71    parent Group.
 72
 73    If a Task raises exception, other Tasks continue to execute.
 74
 75    If `log_exceptions` is ``True``, exceptions raised by spawned tasks are
 76    logged with level ERROR.
 77
 78    """
 79
 80    def __init__(self,
 81                 log_exceptions: bool = True,
 82                 *,
 83                 loop: asyncio.AbstractEventLoop | None = None):
 84        self._log_exceptions = log_exceptions
 85        self._loop = loop or asyncio.get_running_loop()
 86        self._closing = self._loop.create_future()
 87        self._closed = self._loop.create_future()
 88        self._tasks = set()
 89        self._parent = None
 90        self._children = set()
 91
 92    @property
 93    def async_group(self):
 94        """Async group"""
 95        return self
 96
 97    @property
 98    def is_open(self) -> bool:
 99        """``True`` if group is not closing or closed, ``False`` otherwise."""
100        return not self._closing.done()
101
102    @property
103    def is_closing(self) -> bool:
104        """Is group closing or closed."""
105        return self._closing.done()
106
107    @property
108    def is_closed(self) -> bool:
109        """Is group closed."""
110        return self._closed.done()
111
112    async def wait_closing(self):
113        """Wait until closing is ``True``."""
114        await asyncio.shield(self._closing)
115
116    async def wait_closed(self):
117        """Wait until closed is ``True``."""
118        await asyncio.shield(self._closed)
119
120    def create_subgroup(self,
121                        log_exceptions: bool | None = None
122                        ) -> 'Group':
123        """Create new Group as a child of this Group. Return the new Group.
124
125        When a parent Group gets closed, all of its children are closed.
126        Closing of a subgroup has no effect on the parent Group.
127
128        If `log_exceptions` is ``None``, subgroup inherits `log_exceptions`
129        from its parent.
130
131        """
132        if self._closing.done():
133            raise GroupClosedError("can't create subgroup of closed group")
134
135        child = Group(
136            log_exceptions=(self._log_exceptions if log_exceptions is None
137                            else log_exceptions),
138            loop=self._loop)
139        child._parent = self
140        self._children.add(child)
141        return child
142
143    def wrap(self,
144             obj: Awaitable
145             ) -> asyncio.Task:
146        """Wrap the awaitable object into a Task and schedule its execution.
147        Return the Task object.
148
149        Resulting task is shielded and can be canceled only with
150        `Group.close`.
151
152        """
153        if self._closing.done():
154            raise GroupClosedError("can't wrap object in closed group")
155
156        if asyncio.iscoroutine(obj):
157            task = self._loop.create_task(obj)
158
159        else:
160            task = asyncio.ensure_future(obj, loop=self._loop)
161
162        self._tasks.add(task)
163        task.add_done_callback(self._on_task_done)
164
165        return asyncio.shield(task)
166
167    def spawn(self,
168              fn: Callable[..., Awaitable],
169              *args, **kwargs
170              ) -> asyncio.Task:
171        """Wrap the result of a `fn` into a Task and schedule its execution.
172        Return the Task object.
173
174        Function `fn` is called with provided `args` and `kwargs`.
175        Resulting Task is shielded and can be canceled only with
176        `Group.close`.
177
178        """
179        if self._closing.done():
180            raise GroupClosedError("can't spawn task in closed group")
181
182        future = fn(*args, **kwargs)
183        return self.wrap(future)
184
185    def close(self):
186        """Schedule Group closing.
187
188        Closing Future is set immediately. All subgroups are closed, and all
189        running tasks are canceled. Once closing of all subgroups
190        and execution of all tasks is completed, closed Future is set.
191
192        """
193        if self._closing.done():
194            return
195
196        self._closing.set_result(True)
197
198        for child in list(self._children):
199            child.close()
200
201        for task in self._tasks:
202            self._loop.call_soon(task.cancel)
203
204        futures = [*self._tasks,
205                   *(child._closed for child in self._children)]
206        if futures:
207            waiting_task = self._loop.create_task(asyncio.wait(futures))
208            waiting_task.add_done_callback(lambda _: self._on_closed())
209
210        else:
211            self._on_closed()
212
213    async def async_close(self):
214        """Close Group and wait until closed is ``True``."""
215        self.close()
216        await self.wait_closed()
217
218    async def __aenter__(self):
219        return self
220
221    async def __aexit__(self, *args):
222        await self.async_close()
223
224    def _on_closed(self):
225        if self._parent is not None:
226            self._parent._children.remove(self)
227            self._parent = None
228
229        self._closed.set_result(True)
230
231    def _on_task_done(self, task):
232        self._tasks.remove(task)
233
234        if task.cancelled():
235            return
236
237        e = task.exception()
238        if e and self._log_exceptions:
239            mlog.error('unhandled exception in async group: %s', e, exc_info=e)
240            warnings.warn('unhandled exception in async group')

Group of asyncio Tasks.

Group enables creation and management of related asyncio Tasks. The Group ensures uninterrupted execution of Tasks and Task completion upon Group closing.

Group can contain subgroups, which are independent Groups managed by the parent Group.

If a Task raises exception, other Tasks continue to execute.

If log_exceptions is True, exceptions raised by spawned tasks are logged with level ERROR.

Group( log_exceptions: bool = True, *, loop: asyncio.events.AbstractEventLoop | None = None)
80    def __init__(self,
81                 log_exceptions: bool = True,
82                 *,
83                 loop: asyncio.AbstractEventLoop | None = None):
84        self._log_exceptions = log_exceptions
85        self._loop = loop or asyncio.get_running_loop()
86        self._closing = self._loop.create_future()
87        self._closed = self._loop.create_future()
88        self._tasks = set()
89        self._parent = None
90        self._children = set()
async_group
92    @property
93    def async_group(self):
94        """Async group"""
95        return self

Async group

is_open: bool
 97    @property
 98    def is_open(self) -> bool:
 99        """``True`` if group is not closing or closed, ``False`` otherwise."""
100        return not self._closing.done()

True if group is not closing or closed, False otherwise.

is_closing: bool
102    @property
103    def is_closing(self) -> bool:
104        """Is group closing or closed."""
105        return self._closing.done()

Is group closing or closed.

is_closed: bool
107    @property
108    def is_closed(self) -> bool:
109        """Is group closed."""
110        return self._closed.done()

Is group closed.

async def wait_closing(self):
112    async def wait_closing(self):
113        """Wait until closing is ``True``."""
114        await asyncio.shield(self._closing)

Wait until closing is True.

async def wait_closed(self):
116    async def wait_closed(self):
117        """Wait until closed is ``True``."""
118        await asyncio.shield(self._closed)

Wait until closed is True.

def create_subgroup(self, log_exceptions: bool | None = None) -> Group:
120    def create_subgroup(self,
121                        log_exceptions: bool | None = None
122                        ) -> 'Group':
123        """Create new Group as a child of this Group. Return the new Group.
124
125        When a parent Group gets closed, all of its children are closed.
126        Closing of a subgroup has no effect on the parent Group.
127
128        If `log_exceptions` is ``None``, subgroup inherits `log_exceptions`
129        from its parent.
130
131        """
132        if self._closing.done():
133            raise GroupClosedError("can't create subgroup of closed group")
134
135        child = Group(
136            log_exceptions=(self._log_exceptions if log_exceptions is None
137                            else log_exceptions),
138            loop=self._loop)
139        child._parent = self
140        self._children.add(child)
141        return child

Create new Group as a child of this Group. Return the new Group.

When a parent Group gets closed, all of its children are closed. Closing of a subgroup has no effect on the parent Group.

If log_exceptions is None, subgroup inherits log_exceptions from its parent.

def wrap(self, obj: Awaitable) -> _asyncio.Task:
143    def wrap(self,
144             obj: Awaitable
145             ) -> asyncio.Task:
146        """Wrap the awaitable object into a Task and schedule its execution.
147        Return the Task object.
148
149        Resulting task is shielded and can be canceled only with
150        `Group.close`.
151
152        """
153        if self._closing.done():
154            raise GroupClosedError("can't wrap object in closed group")
155
156        if asyncio.iscoroutine(obj):
157            task = self._loop.create_task(obj)
158
159        else:
160            task = asyncio.ensure_future(obj, loop=self._loop)
161
162        self._tasks.add(task)
163        task.add_done_callback(self._on_task_done)
164
165        return asyncio.shield(task)

Wrap the awaitable object into a Task and schedule its execution. Return the Task object.

Resulting task is shielded and can be canceled only with Group.close.

def spawn(self, fn: Callable[..., Awaitable], *args, **kwargs) -> _asyncio.Task:
167    def spawn(self,
168              fn: Callable[..., Awaitable],
169              *args, **kwargs
170              ) -> asyncio.Task:
171        """Wrap the result of a `fn` into a Task and schedule its execution.
172        Return the Task object.
173
174        Function `fn` is called with provided `args` and `kwargs`.
175        Resulting Task is shielded and can be canceled only with
176        `Group.close`.
177
178        """
179        if self._closing.done():
180            raise GroupClosedError("can't spawn task in closed group")
181
182        future = fn(*args, **kwargs)
183        return self.wrap(future)

Wrap the result of a fn into a Task and schedule its execution. Return the Task object.

Function fn is called with provided args and kwargs. Resulting Task is shielded and can be canceled only with Group.close.

def close(self):
185    def close(self):
186        """Schedule Group closing.
187
188        Closing Future is set immediately. All subgroups are closed, and all
189        running tasks are canceled. Once closing of all subgroups
190        and execution of all tasks is completed, closed Future is set.
191
192        """
193        if self._closing.done():
194            return
195
196        self._closing.set_result(True)
197
198        for child in list(self._children):
199            child.close()
200
201        for task in self._tasks:
202            self._loop.call_soon(task.cancel)
203
204        futures = [*self._tasks,
205                   *(child._closed for child in self._children)]
206        if futures:
207            waiting_task = self._loop.create_task(asyncio.wait(futures))
208            waiting_task.add_done_callback(lambda _: self._on_closed())
209
210        else:
211            self._on_closed()

Schedule Group closing.

Closing Future is set immediately. All subgroups are closed, and all running tasks are canceled. Once closing of all subgroups and execution of all tasks is completed, closed Future is set.

async def async_close(self):
213    async def async_close(self):
214        """Close Group and wait until closed is ``True``."""
215        self.close()
216        await self.wait_closed()

Close Group and wait until closed is True.

async def first( xs: AsyncIterable[~T], fn: Callable[[~T], typing.Any] = <function <lambda>>, default: Optional[~T] = None) -> Optional[~T]:
14async def first(xs: AsyncIterable[T],
15                fn: Callable[[T], typing.Any] = lambda _: True,
16                default: T | None = None
17                ) -> T | None:
18    """Return the first element from async iterable that satisfies
19    predicate `fn`, or `default` if no such element exists.
20
21    Result of predicate `fn` can be of any type. Predicate is satisfied if it's
22    return value is truthy.
23
24    Args:
25        xs: async collection
26        fn: predicate
27        default: default value
28
29    Example::
30
31        async def async_range(x):
32            for i in range(x):
33                await asyncio.sleep(0)
34                yield i
35
36        assert await first(async_range(3)) == 0
37        assert await first(async_range(3), lambda x: x > 1) == 2
38        assert await first(async_range(3), lambda x: x > 2) is None
39        assert await first(async_range(3), lambda x: x > 2, 123) == 123
40
41    """
42    async for i in xs:
43        if fn(i):
44            return i
45
46    return default

Return the first element from async iterable that satisfies predicate fn, or default if no such element exists.

Result of predicate fn can be of any type. Predicate is satisfied if it's return value is truthy.

Arguments:
  • xs: async collection
  • fn: predicate
  • default: default value

Example::

async def async_range(x):
    for i in range(x):
        await asyncio.sleep(0)
        yield i

assert await first(async_range(3)) == 0
assert await first(async_range(3), lambda x: x > 1) == 2
assert await first(async_range(3), lambda x: x > 2) is None
assert await first(async_range(3), lambda x: x > 2, 123) == 123
async def uncancellable(obj: Awaitable[~T], raise_cancel: bool = True) -> ~T:
49async def uncancellable(obj: Awaitable[T],
50                        raise_cancel: bool = True
51                        ) -> T:
52    """Uncancellable execution of a awaitable object.
53
54    Object is scheduled as task, shielded and its execution cannot be
55    interrupted.
56
57    If `raise_cancel` is `True` and the object gets canceled,
58    `asyncio.CancelledError` is reraised after the Future finishes.
59
60    Warning:
61        If `raise_cancel` is `False`, this method suppresses
62        `asyncio.CancelledError` and stops its propagation. Use with
63        caution.
64
65    Args:
66        obj: awaitable object
67        raise_cancel: raise CancelledError flag
68
69    Returns:
70        object execution result
71
72    """
73    exception = None
74    loop = asyncio.get_running_loop()
75
76    if asyncio.iscoroutine(obj):
77        task = loop.create_task(obj)
78
79    else:
80        task = asyncio.ensure_future(obj, loop=loop)
81
82    while not task.done():
83        try:
84            await asyncio.shield(task)
85
86        except asyncio.CancelledError as e:
87            if raise_cancel:
88                exception = e
89
90        except Exception:
91            pass
92
93    if exception:
94        raise exception
95
96    return task.result()

Uncancellable execution of a awaitable object.

Object is scheduled as task, shielded and its execution cannot be interrupted.

If raise_cancel is True and the object gets canceled, asyncio.CancelledError is reraised after the Future finishes.

Warning:

If raise_cancel is False, this method suppresses asyncio.CancelledError and stops its propagation. Use with caution.

Arguments:
  • obj: awaitable object
  • raise_cancel: raise CancelledError flag
Returns:

object execution result

AsyncCallable = typing.Callable
async def call(fn: Callable[..., Union[~T, Awaitable[~T]]], *args, **kwargs) -> ~T:
113async def call(fn: AsyncCallable[..., T], *args, **kwargs) -> T:
114    """Call a function or a coroutine (or other callable object).
115
116    Call the `fn` with `args` and `kwargs`. If result of this call is
117    awaitable, it is awaited and returned. Otherwise, result is immediately
118    returned.
119
120    Args:
121        fn: callable object
122        args: additional positional arguments
123        kwargs: additional keyword arguments
124
125    Returns:
126        awaited result or result
127
128    Example:
129
130        def f1(x):
131            return x
132
133        def f2(x):
134            f = asyncio.Future()
135            f.set_result(x)
136            return f
137
138        async def f3(x):
139            return x
140
141        assert 'f1' == await hat.aio.call(f1, 'f1')
142        assert 'f2' == await hat.aio.call(f2, 'f2')
143        assert 'f3' == await hat.aio.call(f3, 'f3')
144
145    """
146    result = fn(*args, **kwargs)
147
148    if inspect.isawaitable(result):
149        result = await result
150
151    return result

Call a function or a coroutine (or other callable object).

Call the fn with args and kwargs. If result of this call is awaitable, it is awaited and returned. Otherwise, result is immediately returned.

Arguments:
  • fn: callable object
  • args: additional positional arguments
  • kwargs: additional keyword arguments
Returns:

awaited result or result

Example:

def f1(x): return x

def f2(x): f = asyncio.Future() f.set_result(x) return f

async def f3(x): return x

assert 'f1' == await hat.aio.call(f1, 'f1') assert 'f2' == await hat.aio.call(f2, 'f2') assert 'f3' == await hat.aio.call(f3, 'f3')

async def call_on_cancel(fn: Callable[..., Union[~T, Awaitable[~T]]], *args, **kwargs) -> ~T:
154async def call_on_cancel(fn: AsyncCallable[..., T], *args, **kwargs) -> T:
155    """Call a function or a coroutine when canceled.
156
157    When canceled, `fn` is called with `args` and `kwargs` by using
158    `call` coroutine.
159
160    Args:
161        fn: function or coroutine
162        args: additional function arguments
163        kwargs: additional function keyword arguments
164
165    Returns:
166        function result
167
168    Example::
169
170        f = asyncio.Future()
171        group = Group()
172        group.spawn(call_on_cancel, f.set_result, 123)
173        assert not f.done()
174        await group.async_close()
175        assert f.result() == 123
176
177    """
178    with contextlib.suppress(asyncio.CancelledError):
179        await asyncio.get_running_loop().create_future()
180
181    return await call(fn, *args, *kwargs)

Call a function or a coroutine when canceled.

When canceled, fn is called with args and kwargs by using call coroutine.

Arguments:
  • fn: function or coroutine
  • args: additional function arguments
  • kwargs: additional function keyword arguments
Returns:

function result

Example::

f = asyncio.Future()
group = Group()
group.spawn(call_on_cancel, f.set_result, 123)
assert not f.done()
await group.async_close()
assert f.result() == 123
async def call_on_done( f: Awaitable, fn: Callable[..., Union[~T, Awaitable[~T]]], *args, **kwargs) -> ~T:
184async def call_on_done(f: Awaitable,
185                       fn: AsyncCallable[..., T],
186                       *args, **kwargs
187                       ) -> T:
188    """Call a function or a coroutine when awaitable is done.
189
190    When `f` is done, `fn` is called with `args` and `kwargs` by using
191    `call` coroutine.
192
193    If this coroutine is canceled before `f` is done, `f` is canceled and `fn`
194    is not called.
195
196    If this coroutine is canceled after `f` is done, `fn` call is canceled.
197
198    Args:
199        f: awaitable future
200        fn: function or coroutine
201        args: additional function arguments
202        kwargs: additional function keyword arguments
203
204    Returns:
205        function result
206
207    Example::
208
209        f = asyncio.Future()
210        group = Group()
211        group.spawn(call_on_done, f, group.close)
212        assert group.is_open
213        f.set_result(None)
214        await group.wait_closed()
215        assert group.is_closed
216
217    """
218    with contextlib.suppress(Exception):
219        await f
220
221    return await call(fn, *args, *kwargs)

Call a function or a coroutine when awaitable is done.

When f is done, fn is called with args and kwargs by using call coroutine.

If this coroutine is canceled before f is done, f is canceled and fn is not called.

If this coroutine is canceled after f is done, fn call is canceled.

Arguments:
  • f: awaitable future
  • fn: function or coroutine
  • args: additional function arguments
  • kwargs: additional function keyword arguments
Returns:

function result

Example::

f = asyncio.Future()
group = Group()
group.spawn(call_on_done, f, group.close)
assert group.is_open
f.set_result(None)
await group.wait_closed()
assert group.is_closed
def init_asyncio(policy: asyncio.events.AbstractEventLoopPolicy | None = None):
224def init_asyncio(policy: asyncio.AbstractEventLoopPolicy | None = None):
225    """Initialize asyncio.
226
227    Sets event loop policy (if ``None``, instance of
228    `asyncio.DefaultEventLoopPolicy` is used).
229
230    On Windows, `asyncio.WindowsProactorEventLoopPolicy` is used as default
231    policy.
232
233    """
234
235    def get_default_policy():
236        if sys.platform == 'win32':
237            return asyncio.WindowsProactorEventLoopPolicy()
238
239        # TODO: evaluate usage of uvloop
240        # with contextlib.suppress(ModuleNotFoundError):
241        #     import uvloop
242        #     return uvloop.EventLoopPolicy()
243
244        return asyncio.DefaultEventLoopPolicy()
245
246    asyncio.set_event_loop_policy(policy or get_default_policy())

Initialize asyncio.

Sets event loop policy (if None, instance of asyncio.DefaultEventLoopPolicy is used).

On Windows, asyncio.WindowsProactorEventLoopPolicy is used as default policy.

def run_asyncio( future: Awaitable[~T], *, handle_signals: bool = True, loop: asyncio.events.AbstractEventLoop | None = None) -> ~T:
249def run_asyncio(future: Awaitable[T], *,
250                handle_signals: bool = True,
251                loop: asyncio.AbstractEventLoop | None = None
252                ) -> T:
253    """Run asyncio loop until the `future` is completed and return the result.
254
255    If `handle_signals` is ``True``, SIGINT and SIGTERM handlers are
256    temporarily overridden. Instead of raising ``KeyboardInterrupt`` on every
257    signal reception, Future is canceled only once. Additional signals are
258    ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also
259    overridden.
260
261    If `loop` is set to ``None``, new event loop is created and set
262    as thread's default event loop. Newly created loop is closed when this
263    coroutine returns. Running tasks or async generators, other than provided
264    `future`, are not canceled prior to loop closing.
265
266    On Windows, asyncio loop gets periodically woken up (every 0.5 seconds).
267
268    Args:
269        future: future or coroutine
270        handle_signals: handle signals flag
271        loop: event loop
272
273    Returns:
274        future's result
275
276    Example::
277
278        async def run():
279            await asyncio.sleep(0)
280            return 123
281
282        result = run_asyncio(run())
283        assert result == 123
284
285    """
286    close_loop = loop is None
287    if loop is None:
288        loop = asyncio.new_event_loop()
289        asyncio.set_event_loop(loop)
290
291    task = asyncio.ensure_future(future, loop=loop)
292
293    if sys.platform == 'win32':
294
295        async def task_wrapper(task):
296            try:
297                while not task.done():
298                    await asyncio.wait([task], timeout=0.5)
299            except asyncio.CancelledError:
300                task.cancel()
301            return await task
302
303        task = asyncio.ensure_future(task_wrapper(task), loop=loop)
304
305    if not handle_signals:
306        return loop.run_until_complete(task)
307
308    canceled = False
309    signalnums = [signal.SIGINT, signal.SIGTERM]
310    if sys.platform == 'win32':
311        signalnums += [signal.SIGBREAK]
312
313    def signal_handler(*args):
314        nonlocal canceled
315        if canceled:
316            return
317        loop.call_soon_threadsafe(task.cancel)
318        canceled = True
319
320    handlers = {signalnum: signal.getsignal(signalnum) or signal.SIG_DFL
321                for signalnum in signalnums}
322    for signalnum in signalnums:
323        signal.signal(signalnum, signal_handler)
324
325    try:
326        return loop.run_until_complete(task)
327
328    finally:
329        for signalnum, handler in handlers.items():
330            signal.signal(signalnum, handler)
331        if close_loop:
332            loop.close()

Run asyncio loop until the future is completed and return the result.

If handle_signals is True, SIGINT and SIGTERM handlers are temporarily overridden. Instead of raising KeyboardInterrupt on every signal reception, Future is canceled only once. Additional signals are ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also overridden.

If loop is set to None, new event loop is created and set as thread's default event loop. Newly created loop is closed when this coroutine returns. Running tasks or async generators, other than provided future, are not canceled prior to loop closing.

On Windows, asyncio loop gets periodically woken up (every 0.5 seconds).

Arguments:
  • future: future or coroutine
  • handle_signals: handle signals flag
  • loop: event loop
Returns:

future's result

Example::

async def run():
    await asyncio.sleep(0)
    return 123

result = run_asyncio(run())
assert result == 123
class QueueClosedError(builtins.Exception):
11class QueueClosedError(Exception):
12    """Raised when trying to use a closed queue."""

Raised when trying to use a closed queue.

class QueueEmptyError(builtins.Exception):
15class QueueEmptyError(Exception):
16    """Raised if queue is empty."""

Raised if queue is empty.

class QueueFullError(builtins.Exception):
19class QueueFullError(Exception):
20    """Raised if queue is full."""

Raised if queue is full.

class Queue(typing.Generic[~T]):
 23class Queue(typing.Generic[T]):
 24    """Asyncio queue which implements AsyncIterable and can be closed.
 25
 26    Interface and implementation are based on `asyncio.Queue`.
 27
 28    If `maxsize` is less than or equal to zero, the queue size is infinite.
 29
 30    Args:
 31        maxsize: maximum number of items in the queue
 32
 33    Example::
 34
 35        queue = Queue(maxsize=1)
 36
 37        async def producer():
 38            for i in range(4):
 39                await queue.put(i)
 40            queue.close()
 41
 42        async def consumer():
 43            result = 0
 44            async for i in queue:
 45                result += i
 46            return result
 47
 48        asyncio.ensure_future(producer())
 49        result = await consumer()
 50        assert result == 6
 51
 52    """
 53
 54    def __init__(self,
 55                 maxsize: int = 0):
 56        self._maxsize = maxsize
 57        self._queue = collections.deque()
 58        self._getters = collections.deque()
 59        self._putters = collections.deque()
 60        self._closed = False
 61
 62    def __aiter__(self):
 63        return self
 64
 65    async def __anext__(self):
 66        try:
 67            return await self.get()
 68
 69        except QueueClosedError:
 70            raise StopAsyncIteration
 71
 72    def __str__(self):
 73        return (f'<{type(self).__name__}'
 74                f' _closed={self._closed} '
 75                f' _queue={list(self._queue)}>')
 76
 77    def __len__(self):
 78        return len(self._queue)
 79
 80    @property
 81    def maxsize(self) -> int:
 82        """Maximum number of items in the queue."""
 83        return self._maxsize
 84
 85    @property
 86    def is_closed(self) -> bool:
 87        """Is queue closed."""
 88        return self._closed
 89
 90    def empty(self) -> bool:
 91        """``True`` if queue is empty, ``False`` otherwise."""
 92        return not self._queue
 93
 94    def full(self) -> bool:
 95        """``True`` if queue is full, ``False`` otherwise."""
 96        if self._maxsize > 0:
 97            return len(self._queue) >= self._maxsize
 98
 99        return False
100
101    def qsize(self) -> int:
102        """Number of items currently in the queue."""
103        return len(self._queue)
104
105    def close(self):
106        """Close the queue."""
107        if self._closed:
108            return
109
110        self._closed = True
111        self._wakeup_all(self._putters)
112        self._wakeup_next(self._getters)
113
114    def get_nowait(self) -> T:
115        """Return an item if one is immediately available, else raise
116        `QueueEmptyError`.
117
118        Raises:
119            QueueEmptyError
120
121        """
122        if self.empty():
123            raise QueueEmptyError()
124
125        item = self._queue.popleft()
126        self._wakeup_next(self._putters)
127        return item
128
129    def put_nowait(self, item: T):
130        """Put an item into the queue without blocking.
131
132        If no free slot is immediately available, raise `QueueFullError`.
133
134        Raises:
135            QueueFullError
136
137        """
138        if self._closed:
139            raise QueueClosedError()
140
141        if self.full():
142            raise QueueFullError()
143
144        self._queue.append(item)
145        self._wakeup_next(self._getters)
146
147    async def get(self) -> T:
148        """Remove and return an item from the queue.
149
150        If queue is empty, wait until an item is available.
151
152        Raises:
153            QueueClosedError
154
155        """
156        loop = asyncio.get_running_loop()
157
158        while self.empty():
159            if self._closed:
160                self._wakeup_all(self._getters)
161                raise QueueClosedError()
162
163            getter = loop.create_future()
164            self._getters.append(getter)
165
166            try:
167                await getter
168
169            except BaseException:
170                getter.cancel()
171
172                with contextlib.suppress(ValueError):
173                    self._getters.remove(getter)
174
175                if not getter.cancelled():
176                    if not self.empty() or self._closed:
177                        self._wakeup_next(self._getters)
178
179                raise
180
181        return self.get_nowait()
182
183    async def put(self, item: T):
184        """Put an item into the queue.
185
186        If the queue is full, wait until a free slot is available before adding
187        the item.
188
189        Raises:
190            QueueClosedError
191
192        """
193        loop = asyncio.get_running_loop()
194
195        while not self._closed and self.full():
196            putter = loop.create_future()
197            self._putters.append(putter)
198
199            try:
200                await putter
201
202            except BaseException:
203                putter.cancel()
204
205                with contextlib.suppress(ValueError):
206                    self._putters.remove(putter)
207
208                if not self.full() and not putter.cancelled():
209                    self._wakeup_next(self._putters)
210
211                raise
212
213        return self.put_nowait(item)
214
215    async def get_until_empty(self) -> T:
216        """Empty the queue and return the last item.
217
218        If queue is empty, wait until at least one item is available.
219
220        Raises:
221            QueueClosedError
222
223        """
224        item = await self.get()
225
226        while not self.empty():
227            item = self.get_nowait()
228
229        return item
230
231    def get_nowait_until_empty(self) -> T:
232        """Empty the queue and return the last item if at least one
233        item is immediately available, else raise `QueueEmptyError`.
234
235        Raises:
236            QueueEmptyError
237
238        """
239        item = self.get_nowait()
240
241        while not self.empty():
242            item = self.get_nowait()
243
244        return item
245
246    def _wakeup_next(self, waiters):
247        while waiters:
248            waiter = waiters.popleft()
249
250            if not waiter.done():
251                waiter.set_result(None)
252                break
253
254    def _wakeup_all(self, waiters):
255        while waiters:
256            waiter = waiters.popleft()
257
258            if not waiter.done():
259                waiter.set_result(None)

Asyncio queue which implements AsyncIterable and can be closed.

Interface and implementation are based on asyncio.Queue.

If maxsize is less than or equal to zero, the queue size is infinite.

Arguments:
  • maxsize: maximum number of items in the queue

Example::

queue = Queue(maxsize=1)

async def producer():
    for i in range(4):
        await queue.put(i)
    queue.close()

async def consumer():
    result = 0
    async for i in queue:
        result += i
    return result

asyncio.ensure_future(producer())
result = await consumer()
assert result == 6
Queue(maxsize: int = 0)
54    def __init__(self,
55                 maxsize: int = 0):
56        self._maxsize = maxsize
57        self._queue = collections.deque()
58        self._getters = collections.deque()
59        self._putters = collections.deque()
60        self._closed = False
maxsize: int
80    @property
81    def maxsize(self) -> int:
82        """Maximum number of items in the queue."""
83        return self._maxsize

Maximum number of items in the queue.

is_closed: bool
85    @property
86    def is_closed(self) -> bool:
87        """Is queue closed."""
88        return self._closed

Is queue closed.

def empty(self) -> bool:
90    def empty(self) -> bool:
91        """``True`` if queue is empty, ``False`` otherwise."""
92        return not self._queue

True if queue is empty, False otherwise.

def full(self) -> bool:
94    def full(self) -> bool:
95        """``True`` if queue is full, ``False`` otherwise."""
96        if self._maxsize > 0:
97            return len(self._queue) >= self._maxsize
98
99        return False

True if queue is full, False otherwise.

def qsize(self) -> int:
101    def qsize(self) -> int:
102        """Number of items currently in the queue."""
103        return len(self._queue)

Number of items currently in the queue.

def close(self):
105    def close(self):
106        """Close the queue."""
107        if self._closed:
108            return
109
110        self._closed = True
111        self._wakeup_all(self._putters)
112        self._wakeup_next(self._getters)

Close the queue.

def get_nowait(self) -> ~T:
114    def get_nowait(self) -> T:
115        """Return an item if one is immediately available, else raise
116        `QueueEmptyError`.
117
118        Raises:
119            QueueEmptyError
120
121        """
122        if self.empty():
123            raise QueueEmptyError()
124
125        item = self._queue.popleft()
126        self._wakeup_next(self._putters)
127        return item

Return an item if one is immediately available, else raise QueueEmptyError.

Raises:
  • QueueEmptyError
def put_nowait(self, item: ~T):
129    def put_nowait(self, item: T):
130        """Put an item into the queue without blocking.
131
132        If no free slot is immediately available, raise `QueueFullError`.
133
134        Raises:
135            QueueFullError
136
137        """
138        if self._closed:
139            raise QueueClosedError()
140
141        if self.full():
142            raise QueueFullError()
143
144        self._queue.append(item)
145        self._wakeup_next(self._getters)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFullError.

Raises:
  • QueueFullError
async def get(self) -> ~T:
147    async def get(self) -> T:
148        """Remove and return an item from the queue.
149
150        If queue is empty, wait until an item is available.
151
152        Raises:
153            QueueClosedError
154
155        """
156        loop = asyncio.get_running_loop()
157
158        while self.empty():
159            if self._closed:
160                self._wakeup_all(self._getters)
161                raise QueueClosedError()
162
163            getter = loop.create_future()
164            self._getters.append(getter)
165
166            try:
167                await getter
168
169            except BaseException:
170                getter.cancel()
171
172                with contextlib.suppress(ValueError):
173                    self._getters.remove(getter)
174
175                if not getter.cancelled():
176                    if not self.empty() or self._closed:
177                        self._wakeup_next(self._getters)
178
179                raise
180
181        return self.get_nowait()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Raises:
  • QueueClosedError
async def put(self, item: ~T):
183    async def put(self, item: T):
184        """Put an item into the queue.
185
186        If the queue is full, wait until a free slot is available before adding
187        the item.
188
189        Raises:
190            QueueClosedError
191
192        """
193        loop = asyncio.get_running_loop()
194
195        while not self._closed and self.full():
196            putter = loop.create_future()
197            self._putters.append(putter)
198
199            try:
200                await putter
201
202            except BaseException:
203                putter.cancel()
204
205                with contextlib.suppress(ValueError):
206                    self._putters.remove(putter)
207
208                if not self.full() and not putter.cancelled():
209                    self._wakeup_next(self._putters)
210
211                raise
212
213        return self.put_nowait(item)

Put an item into the queue.

If the queue is full, wait until a free slot is available before adding the item.

Raises:
  • QueueClosedError
async def get_until_empty(self) -> ~T:
215    async def get_until_empty(self) -> T:
216        """Empty the queue and return the last item.
217
218        If queue is empty, wait until at least one item is available.
219
220        Raises:
221            QueueClosedError
222
223        """
224        item = await self.get()
225
226        while not self.empty():
227            item = self.get_nowait()
228
229        return item

Empty the queue and return the last item.

If queue is empty, wait until at least one item is available.

Raises:
  • QueueClosedError
def get_nowait_until_empty(self) -> ~T:
231    def get_nowait_until_empty(self) -> T:
232        """Empty the queue and return the last item if at least one
233        item is immediately available, else raise `QueueEmptyError`.
234
235        Raises:
236            QueueEmptyError
237
238        """
239        item = self.get_nowait()
240
241        while not self.empty():
242            item = self.get_nowait()
243
244        return item

Empty the queue and return the last item if at least one item is immediately available, else raise QueueEmptyError.

Raises:
  • QueueEmptyError
class CancelledWithResultError(asyncio.exceptions.CancelledError):
13class CancelledWithResultError(asyncio.CancelledError):
14    """CancelledError with associated result or exception"""
15
16    def __init__(self,
17                 result: typing.Any | None,
18                 exception: BaseException | None):
19        super().__init__()
20        self.__result = result
21        self.__exception = exception
22
23    @property
24    def result(self) -> typing.Any | None:
25        """Result"""
26        return self.__result
27
28    @property
29    def exception(self) -> BaseException | None:
30        return self.__exception

CancelledError with associated result or exception

CancelledWithResultError(result: typing.Any | None, exception: BaseException | None)
16    def __init__(self,
17                 result: typing.Any | None,
18                 exception: BaseException | None):
19        super().__init__()
20        self.__result = result
21        self.__exception = exception
result: typing.Any | None
23    @property
24    def result(self) -> typing.Any | None:
25        """Result"""
26        return self.__result

Result

exception: BaseException | None
28    @property
29    def exception(self) -> BaseException | None:
30        return self.__exception
async def wait_for(obj: Awaitable[~T], timeout: float) -> ~T:
33async def wait_for(obj: Awaitable[T],
34                   timeout: float
35                   ) -> T:
36    """"Wait for the awaitable object to complete, with timeout.
37
38    Implementation `asyncio.wait_for` that ensure propagation of
39    CancelledError.
40
41    If task is cancelled with objects's result available, instead of
42    returning result, this implementation raises `CancelledWithResultError`.
43
44    """
45    group = Group(log_exceptions=False)
46    group.spawn(call_on_done, asyncio.sleep(timeout), group.close)
47
48    task = group.wrap(obj)
49    group.spawn(call_on_done, asyncio.shield(task), group.close)
50
51    exc = None
52
53    try:
54        await group.wait_closing()
55
56    except asyncio.CancelledError as e:
57        exc = e
58
59    try:
60        await uncancellable(group.async_close())
61
62    except asyncio.CancelledError as e:
63        exc = e
64
65    if exc:
66        if task.cancelled():
67            raise exc
68
69        result = None if task.exception() else task.result()
70        raise CancelledWithResultError(result, task.exception()) from exc
71
72    if task.cancelled():
73        raise asyncio.TimeoutError()
74
75    return task.result()

"Wait for the awaitable object to complete, with timeout.

Implementation asyncio.wait_for that ensure propagation of CancelledError.

If task is cancelled with objects's result available, instead of returning result, this implementation raises CancelledWithResultError.