hat.aio
Async utility functions
1"""Async utility functions""" 2 3from hat.aio.executor import (Executor, 4 create_executor) 5from hat.aio.group import (Resource, 6 Group) 7from hat.aio.misc import (first, 8 uncancellable, 9 AsyncCallable, 10 call, 11 call_on_cancel, 12 call_on_done, 13 init_asyncio, 14 run_asyncio) 15from hat.aio.queue import (QueueClosedError, 16 QueueEmptyError, 17 QueueFullError, 18 Queue) 19from hat.aio.wait import (CancelledWithResultError, 20 wait_for) 21 22 23__all__ = ['Executor', 24 'create_executor', 25 'Resource', 26 'Group', 27 'first', 28 'uncancellable', 29 'AsyncCallable', 30 'call', 31 'call_on_cancel', 32 'call_on_done', 33 'init_asyncio', 34 'run_asyncio', 35 'QueueClosedError', 36 'QueueEmptyError', 37 'QueueFullError', 38 'Queue', 39 'CancelledWithResultError', 40 'wait_for']
11class Executor(Resource): 12 """Executor wrapping `asyncio.loop.run_in_executor`. 13 14 Wrapped executor is created from `executor_cls` with provided `args`. 15 16 If `wait_futures` is ``True``, executor will be closed once all running 17 tasks finishes. 18 19 `log_exceptions` is delegated to `async_group`. 20 21 Args: 22 args: executor args 23 executor_cls: executor class 24 log_exceptions: log exceptions 25 26 Example:: 27 28 executor1 = Executor() 29 executor2 = Executor() 30 tid1 = await executor1.spawn(threading.get_ident) 31 tid2 = await executor2.spawn(threading.get_ident) 32 assert tid1 != tid2 33 34 """ 35 36 def __init__(self, 37 *args, 38 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor, # NOQA 39 log_exceptions: bool = True, 40 wait_futures: bool = True): 41 self._wait_futures = wait_futures 42 self._executor = executor_cls(*args) 43 self._loop = asyncio.get_running_loop() 44 self._async_group = Group(log_exceptions) 45 46 self.async_group.spawn(call_on_cancel, self._executor.shutdown, False) 47 48 @property 49 def async_group(self): 50 """Async group""" 51 return self._async_group 52 53 def spawn(self, 54 fn: Callable, 55 *args, **kwargs 56 ) -> asyncio.Task: 57 """Spawn new task""" 58 return self.async_group.spawn(self._spawn, fn, args, kwargs) 59 60 async def _spawn(self, fn, args, kwargs): 61 func = functools.partial(fn, *args, **kwargs) 62 coro = self._loop.run_in_executor(self._executor, func) 63 64 if self._wait_futures: 65 coro = uncancellable(coro) 66 67 return await coro
Executor wrapping asyncio.loop.run_in_executor
.
Wrapped executor is created from executor_cls
with provided args
.
If wait_futures
is True
, executor will be closed once all running
tasks finishes.
log_exceptions
is delegated to async_group
.
Arguments:
- args: executor args
- executor_cls: executor class
- log_exceptions: log exceptions
Example::
executor1 = Executor()
executor2 = Executor()
tid1 = await executor1.spawn(threading.get_ident)
tid2 = await executor2.spawn(threading.get_ident)
assert tid1 != tid2
36 def __init__(self, 37 *args, 38 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor, # NOQA 39 log_exceptions: bool = True, 40 wait_futures: bool = True): 41 self._wait_futures = wait_futures 42 self._executor = executor_cls(*args) 43 self._loop = asyncio.get_running_loop() 44 self._async_group = Group(log_exceptions) 45 46 self.async_group.spawn(call_on_cancel, self._executor.shutdown, False)
53 def spawn(self, 54 fn: Callable, 55 *args, **kwargs 56 ) -> asyncio.Task: 57 """Spawn new task""" 58 return self.async_group.spawn(self._spawn, fn, args, kwargs)
Spawn new task
Inherited Members
70def create_executor(*args, 71 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor, # NOQA 72 loop: asyncio.AbstractEventLoop | None = None 73 ) -> Callable[..., Awaitable]: 74 """Create `asyncio.loop.run_in_executor` wrapper. 75 76 Wrapped executor is created from `executor_cls` with provided `args`. 77 78 This function returns coroutine that takes a function and its arguments, 79 executes the function in executor and returns the result. 80 81 Args: 82 args: executor args 83 executor_cls: executor class 84 loop: asyncio loop 85 86 Returns: 87 executor coroutine 88 89 Example:: 90 91 executor1 = create_executor() 92 executor2 = create_executor() 93 tid1 = await executor1(threading.get_ident) 94 tid2 = await executor2(threading.get_ident) 95 assert tid1 != tid2 96 97 """ 98 executor = executor_cls(*args) 99 100 async def executor_wrapper(func, /, *args, **kwargs): 101 _loop = loop or asyncio.get_running_loop() 102 fn = functools.partial(func, *args, **kwargs) 103 return await _loop.run_in_executor(executor, fn) 104 105 return executor_wrapper
Create asyncio.loop.run_in_executor
wrapper.
Wrapped executor is created from executor_cls
with provided args
.
This function returns coroutine that takes a function and its arguments, executes the function in executor and returns the result.
Arguments:
- args: executor args
- executor_cls: executor class
- loop: asyncio loop
Returns:
executor coroutine
Example::
executor1 = create_executor()
executor2 = create_executor()
tid1 = await executor1(threading.get_ident)
tid2 = await executor2(threading.get_ident)
assert tid1 != tid2
13class Resource(abc.ABC): 14 """Resource with lifetime control based on `Group`.""" 15 16 async def __aenter__(self): 17 return self 18 19 async def __aexit__(self, *args): 20 await self.async_close() 21 22 @property 23 @abc.abstractmethod 24 def async_group(self) -> 'Group': 25 """Group controlling resource's lifetime.""" 26 27 @property 28 def is_open(self) -> bool: 29 """``True`` if not closing or closed, ``False`` otherwise.""" 30 return self.async_group.is_open 31 32 @property 33 def is_closing(self) -> bool: 34 """Is resource closing or closed.""" 35 return self.async_group.is_closing 36 37 @property 38 def is_closed(self) -> bool: 39 """Is resource closed.""" 40 return self.async_group.is_closed 41 42 async def wait_closing(self): 43 """Wait until closing is ``True``.""" 44 await self.async_group.wait_closing() 45 46 async def wait_closed(self): 47 """Wait until closed is ``True``.""" 48 await self.async_group.wait_closed() 49 50 def close(self): 51 """Close resource.""" 52 self.async_group.close() 53 54 async def async_close(self): 55 """Close resource and wait until closed is ``True``.""" 56 await self.async_group.async_close()
Resource with lifetime control based on Group
.
22 @property 23 @abc.abstractmethod 24 def async_group(self) -> 'Group': 25 """Group controlling resource's lifetime."""
Group controlling resource's lifetime.
27 @property 28 def is_open(self) -> bool: 29 """``True`` if not closing or closed, ``False`` otherwise.""" 30 return self.async_group.is_open
True
if not closing or closed, False
otherwise.
32 @property 33 def is_closing(self) -> bool: 34 """Is resource closing or closed.""" 35 return self.async_group.is_closing
Is resource closing or closed.
37 @property 38 def is_closed(self) -> bool: 39 """Is resource closed.""" 40 return self.async_group.is_closed
Is resource closed.
42 async def wait_closing(self): 43 """Wait until closing is ``True``.""" 44 await self.async_group.wait_closing()
Wait until closing is True
.
46 async def wait_closed(self): 47 """Wait until closed is ``True``.""" 48 await self.async_group.wait_closed()
Wait until closed is True
.
63class Group(Resource): 64 """Group of asyncio Tasks. 65 66 Group enables creation and management of related asyncio Tasks. The 67 Group ensures uninterrupted execution of Tasks and Task completion upon 68 Group closing. 69 70 Group can contain subgroups, which are independent Groups managed by the 71 parent Group. 72 73 If a Task raises exception, other Tasks continue to execute. 74 75 If `log_exceptions` is ``True``, exceptions raised by spawned tasks are 76 logged with level ERROR. 77 78 """ 79 80 def __init__(self, 81 log_exceptions: bool = True, 82 *, 83 loop: asyncio.AbstractEventLoop | None = None): 84 self._log_exceptions = log_exceptions 85 self._loop = loop or asyncio.get_running_loop() 86 self._closing = self._loop.create_future() 87 self._closed = self._loop.create_future() 88 self._tasks = set() 89 self._parent = None 90 self._children = set() 91 92 @property 93 def async_group(self): 94 """Async group""" 95 return self 96 97 @property 98 def is_open(self) -> bool: 99 """``True`` if group is not closing or closed, ``False`` otherwise.""" 100 return not self._closing.done() 101 102 @property 103 def is_closing(self) -> bool: 104 """Is group closing or closed.""" 105 return self._closing.done() 106 107 @property 108 def is_closed(self) -> bool: 109 """Is group closed.""" 110 return self._closed.done() 111 112 async def wait_closing(self): 113 """Wait until closing is ``True``.""" 114 await asyncio.shield(self._closing) 115 116 async def wait_closed(self): 117 """Wait until closed is ``True``.""" 118 await asyncio.shield(self._closed) 119 120 def create_subgroup(self, 121 log_exceptions: bool | None = None 122 ) -> 'Group': 123 """Create new Group as a child of this Group. Return the new Group. 124 125 When a parent Group gets closed, all of its children are closed. 126 Closing of a subgroup has no effect on the parent Group. 127 128 If `log_exceptions` is ``None``, subgroup inherits `log_exceptions` 129 from its parent. 130 131 """ 132 if self._closing.done(): 133 raise GroupClosedError("can't create subgroup of closed group") 134 135 child = Group( 136 log_exceptions=(self._log_exceptions if log_exceptions is None 137 else log_exceptions), 138 loop=self._loop) 139 child._parent = self 140 self._children.add(child) 141 return child 142 143 def wrap(self, 144 obj: Awaitable 145 ) -> asyncio.Task: 146 """Wrap the awaitable object into a Task and schedule its execution. 147 Return the Task object. 148 149 Resulting task is shielded and can be canceled only with 150 `Group.close`. 151 152 """ 153 if self._closing.done(): 154 raise GroupClosedError("can't wrap object in closed group") 155 156 if asyncio.iscoroutine(obj): 157 task = self._loop.create_task(obj) 158 159 else: 160 task = asyncio.ensure_future(obj, loop=self._loop) 161 162 self._tasks.add(task) 163 task.add_done_callback(self._on_task_done) 164 165 return asyncio.shield(task) 166 167 def spawn(self, 168 fn: Callable[..., Awaitable], 169 *args, **kwargs 170 ) -> asyncio.Task: 171 """Wrap the result of a `fn` into a Task and schedule its execution. 172 Return the Task object. 173 174 Function `fn` is called with provided `args` and `kwargs`. 175 Resulting Task is shielded and can be canceled only with 176 `Group.close`. 177 178 """ 179 if self._closing.done(): 180 raise GroupClosedError("can't spawn task in closed group") 181 182 future = fn(*args, **kwargs) 183 return self.wrap(future) 184 185 def close(self): 186 """Schedule Group closing. 187 188 Closing Future is set immediately. All subgroups are closed, and all 189 running tasks are canceled. Once closing of all subgroups 190 and execution of all tasks is completed, closed Future is set. 191 192 """ 193 if self._closing.done(): 194 return 195 196 self._closing.set_result(True) 197 198 for child in list(self._children): 199 child.close() 200 201 for task in self._tasks: 202 self._loop.call_soon(task.cancel) 203 204 futures = [*self._tasks, 205 *(child._closed for child in self._children)] 206 if futures: 207 waiting_task = self._loop.create_task(asyncio.wait(futures)) 208 waiting_task.add_done_callback(lambda _: self._on_closed()) 209 210 else: 211 self._on_closed() 212 213 async def async_close(self): 214 """Close Group and wait until closed is ``True``.""" 215 self.close() 216 await self.wait_closed() 217 218 async def __aenter__(self): 219 return self 220 221 async def __aexit__(self, *args): 222 await self.async_close() 223 224 def _on_closed(self): 225 if self._parent is not None: 226 self._parent._children.remove(self) 227 self._parent = None 228 229 self._closed.set_result(True) 230 231 def _on_task_done(self, task): 232 self._tasks.remove(task) 233 234 if task.cancelled(): 235 return 236 237 e = task.exception() 238 if e and self._log_exceptions: 239 mlog.error('unhandled exception in async group: %s', e, exc_info=e) 240 warnings.warn('unhandled exception in async group')
Group of asyncio Tasks.
Group enables creation and management of related asyncio Tasks. The Group ensures uninterrupted execution of Tasks and Task completion upon Group closing.
Group can contain subgroups, which are independent Groups managed by the parent Group.
If a Task raises exception, other Tasks continue to execute.
If log_exceptions
is True
, exceptions raised by spawned tasks are
logged with level ERROR.
80 def __init__(self, 81 log_exceptions: bool = True, 82 *, 83 loop: asyncio.AbstractEventLoop | None = None): 84 self._log_exceptions = log_exceptions 85 self._loop = loop or asyncio.get_running_loop() 86 self._closing = self._loop.create_future() 87 self._closed = self._loop.create_future() 88 self._tasks = set() 89 self._parent = None 90 self._children = set()
97 @property 98 def is_open(self) -> bool: 99 """``True`` if group is not closing or closed, ``False`` otherwise.""" 100 return not self._closing.done()
True
if group is not closing or closed, False
otherwise.
102 @property 103 def is_closing(self) -> bool: 104 """Is group closing or closed.""" 105 return self._closing.done()
Is group closing or closed.
107 @property 108 def is_closed(self) -> bool: 109 """Is group closed.""" 110 return self._closed.done()
Is group closed.
112 async def wait_closing(self): 113 """Wait until closing is ``True``.""" 114 await asyncio.shield(self._closing)
Wait until closing is True
.
116 async def wait_closed(self): 117 """Wait until closed is ``True``.""" 118 await asyncio.shield(self._closed)
Wait until closed is True
.
120 def create_subgroup(self, 121 log_exceptions: bool | None = None 122 ) -> 'Group': 123 """Create new Group as a child of this Group. Return the new Group. 124 125 When a parent Group gets closed, all of its children are closed. 126 Closing of a subgroup has no effect on the parent Group. 127 128 If `log_exceptions` is ``None``, subgroup inherits `log_exceptions` 129 from its parent. 130 131 """ 132 if self._closing.done(): 133 raise GroupClosedError("can't create subgroup of closed group") 134 135 child = Group( 136 log_exceptions=(self._log_exceptions if log_exceptions is None 137 else log_exceptions), 138 loop=self._loop) 139 child._parent = self 140 self._children.add(child) 141 return child
Create new Group as a child of this Group. Return the new Group.
When a parent Group gets closed, all of its children are closed. Closing of a subgroup has no effect on the parent Group.
If log_exceptions
is None
, subgroup inherits log_exceptions
from its parent.
143 def wrap(self, 144 obj: Awaitable 145 ) -> asyncio.Task: 146 """Wrap the awaitable object into a Task and schedule its execution. 147 Return the Task object. 148 149 Resulting task is shielded and can be canceled only with 150 `Group.close`. 151 152 """ 153 if self._closing.done(): 154 raise GroupClosedError("can't wrap object in closed group") 155 156 if asyncio.iscoroutine(obj): 157 task = self._loop.create_task(obj) 158 159 else: 160 task = asyncio.ensure_future(obj, loop=self._loop) 161 162 self._tasks.add(task) 163 task.add_done_callback(self._on_task_done) 164 165 return asyncio.shield(task)
Wrap the awaitable object into a Task and schedule its execution. Return the Task object.
Resulting task is shielded and can be canceled only with
Group.close
.
167 def spawn(self, 168 fn: Callable[..., Awaitable], 169 *args, **kwargs 170 ) -> asyncio.Task: 171 """Wrap the result of a `fn` into a Task and schedule its execution. 172 Return the Task object. 173 174 Function `fn` is called with provided `args` and `kwargs`. 175 Resulting Task is shielded and can be canceled only with 176 `Group.close`. 177 178 """ 179 if self._closing.done(): 180 raise GroupClosedError("can't spawn task in closed group") 181 182 future = fn(*args, **kwargs) 183 return self.wrap(future)
Wrap the result of a fn
into a Task and schedule its execution.
Return the Task object.
Function fn
is called with provided args
and kwargs
.
Resulting Task is shielded and can be canceled only with
Group.close
.
185 def close(self): 186 """Schedule Group closing. 187 188 Closing Future is set immediately. All subgroups are closed, and all 189 running tasks are canceled. Once closing of all subgroups 190 and execution of all tasks is completed, closed Future is set. 191 192 """ 193 if self._closing.done(): 194 return 195 196 self._closing.set_result(True) 197 198 for child in list(self._children): 199 child.close() 200 201 for task in self._tasks: 202 self._loop.call_soon(task.cancel) 203 204 futures = [*self._tasks, 205 *(child._closed for child in self._children)] 206 if futures: 207 waiting_task = self._loop.create_task(asyncio.wait(futures)) 208 waiting_task.add_done_callback(lambda _: self._on_closed()) 209 210 else: 211 self._on_closed()
Schedule Group closing.
Closing Future is set immediately. All subgroups are closed, and all running tasks are canceled. Once closing of all subgroups and execution of all tasks is completed, closed Future is set.
14async def first(xs: AsyncIterable[T], 15 fn: Callable[[T], typing.Any] = lambda _: True, 16 default: T | None = None 17 ) -> T | None: 18 """Return the first element from async iterable that satisfies 19 predicate `fn`, or `default` if no such element exists. 20 21 Result of predicate `fn` can be of any type. Predicate is satisfied if it's 22 return value is truthy. 23 24 Args: 25 xs: async collection 26 fn: predicate 27 default: default value 28 29 Example:: 30 31 async def async_range(x): 32 for i in range(x): 33 await asyncio.sleep(0) 34 yield i 35 36 assert await first(async_range(3)) == 0 37 assert await first(async_range(3), lambda x: x > 1) == 2 38 assert await first(async_range(3), lambda x: x > 2) is None 39 assert await first(async_range(3), lambda x: x > 2, 123) == 123 40 41 """ 42 async for i in xs: 43 if fn(i): 44 return i 45 46 return default
Return the first element from async iterable that satisfies
predicate fn
, or default
if no such element exists.
Result of predicate fn
can be of any type. Predicate is satisfied if it's
return value is truthy.
Arguments:
- xs: async collection
- fn: predicate
- default: default value
Example::
async def async_range(x):
for i in range(x):
await asyncio.sleep(0)
yield i
assert await first(async_range(3)) == 0
assert await first(async_range(3), lambda x: x > 1) == 2
assert await first(async_range(3), lambda x: x > 2) is None
assert await first(async_range(3), lambda x: x > 2, 123) == 123
49async def uncancellable(obj: Awaitable[T], 50 raise_cancel: bool = True 51 ) -> T: 52 """Uncancellable execution of a awaitable object. 53 54 Object is scheduled as task, shielded and its execution cannot be 55 interrupted. 56 57 If `raise_cancel` is `True` and the object gets canceled, 58 `asyncio.CancelledError` is reraised after the Future finishes. 59 60 Warning: 61 If `raise_cancel` is `False`, this method suppresses 62 `asyncio.CancelledError` and stops its propagation. Use with 63 caution. 64 65 Args: 66 obj: awaitable object 67 raise_cancel: raise CancelledError flag 68 69 Returns: 70 object execution result 71 72 """ 73 exception = None 74 loop = asyncio.get_running_loop() 75 76 if asyncio.iscoroutine(obj): 77 task = loop.create_task(obj) 78 79 else: 80 task = asyncio.ensure_future(obj, loop=loop) 81 82 while not task.done(): 83 try: 84 await asyncio.shield(task) 85 86 except asyncio.CancelledError as e: 87 if raise_cancel: 88 exception = e 89 90 except Exception: 91 pass 92 93 if exception: 94 raise exception 95 96 return task.result()
Uncancellable execution of a awaitable object.
Object is scheduled as task, shielded and its execution cannot be interrupted.
If raise_cancel
is True
and the object gets canceled,
asyncio.CancelledError
is reraised after the Future finishes.
Warning:
If
raise_cancel
isFalse
, this method suppressesasyncio.CancelledError
and stops its propagation. Use with caution.
Arguments:
- obj: awaitable object
- raise_cancel: raise CancelledError flag
Returns:
object execution result
117async def call(fn: AsyncCallable[..., T], *args, **kwargs) -> T: 118 """Call a function or a coroutine (or other callable object). 119 120 Call the `fn` with `args` and `kwargs`. If result of this call is 121 awaitable, it is awaited and returned. Otherwise, result is immediately 122 returned. 123 124 Args: 125 fn: callable object 126 args: additional positional arguments 127 kwargs: additional keyword arguments 128 129 Returns: 130 awaited result or result 131 132 Example: 133 134 def f1(x): 135 return x 136 137 def f2(x): 138 f = asyncio.Future() 139 f.set_result(x) 140 return f 141 142 async def f3(x): 143 return x 144 145 assert 'f1' == await hat.aio.call(f1, 'f1') 146 assert 'f2' == await hat.aio.call(f2, 'f2') 147 assert 'f3' == await hat.aio.call(f3, 'f3') 148 149 """ 150 result = fn(*args, **kwargs) 151 152 if inspect.isawaitable(result): 153 result = await result 154 155 return result
Call a function or a coroutine (or other callable object).
Call the fn
with args
and kwargs
. If result of this call is
awaitable, it is awaited and returned. Otherwise, result is immediately
returned.
Arguments:
- fn: callable object
- args: additional positional arguments
- kwargs: additional keyword arguments
Returns:
awaited result or result
Example:
def f1(x): return x
def f2(x): f = asyncio.Future() f.set_result(x) return f
async def f3(x): return x
assert 'f1' == await hat.aio.call(f1, 'f1') assert 'f2' == await hat.aio.call(f2, 'f2') assert 'f3' == await hat.aio.call(f3, 'f3')
158async def call_on_cancel(fn: AsyncCallable[..., T], *args, **kwargs) -> T: 159 """Call a function or a coroutine when canceled. 160 161 When canceled, `fn` is called with `args` and `kwargs` by using 162 `call` coroutine. 163 164 Args: 165 fn: function or coroutine 166 args: additional function arguments 167 kwargs: additional function keyword arguments 168 169 Returns: 170 function result 171 172 Example:: 173 174 f = asyncio.Future() 175 group = Group() 176 group.spawn(call_on_cancel, f.set_result, 123) 177 assert not f.done() 178 await group.async_close() 179 assert f.result() == 123 180 181 """ 182 with contextlib.suppress(asyncio.CancelledError): 183 await asyncio.get_running_loop().create_future() 184 185 return await call(fn, *args, *kwargs)
Call a function or a coroutine when canceled.
When canceled, fn
is called with args
and kwargs
by using
call
coroutine.
Arguments:
- fn: function or coroutine
- args: additional function arguments
- kwargs: additional function keyword arguments
Returns:
function result
Example::
f = asyncio.Future()
group = Group()
group.spawn(call_on_cancel, f.set_result, 123)
assert not f.done()
await group.async_close()
assert f.result() == 123
188async def call_on_done(f: Awaitable, 189 fn: AsyncCallable[..., T], 190 *args, **kwargs 191 ) -> T: 192 """Call a function or a coroutine when awaitable is done. 193 194 When `f` is done, `fn` is called with `args` and `kwargs` by using 195 `call` coroutine. 196 197 If this coroutine is canceled before `f` is done, `f` is canceled and `fn` 198 is not called. 199 200 If this coroutine is canceled after `f` is done, `fn` call is canceled. 201 202 Args: 203 f: awaitable future 204 fn: function or coroutine 205 args: additional function arguments 206 kwargs: additional function keyword arguments 207 208 Returns: 209 function result 210 211 Example:: 212 213 f = asyncio.Future() 214 group = Group() 215 group.spawn(call_on_done, f, group.close) 216 assert group.is_open 217 f.set_result(None) 218 await group.wait_closed() 219 assert group.is_closed 220 221 """ 222 with contextlib.suppress(Exception): 223 await f 224 225 return await call(fn, *args, *kwargs)
Call a function or a coroutine when awaitable is done.
When f
is done, fn
is called with args
and kwargs
by using
call
coroutine.
If this coroutine is canceled before f
is done, f
is canceled and fn
is not called.
If this coroutine is canceled after f
is done, fn
call is canceled.
Arguments:
- f: awaitable future
- fn: function or coroutine
- args: additional function arguments
- kwargs: additional function keyword arguments
Returns:
function result
Example::
f = asyncio.Future()
group = Group()
group.spawn(call_on_done, f, group.close)
assert group.is_open
f.set_result(None)
await group.wait_closed()
assert group.is_closed
228def init_asyncio(policy: asyncio.AbstractEventLoopPolicy | None = None): 229 """Initialize asyncio. 230 231 Sets event loop policy (if ``None``, instance of 232 `asyncio.DefaultEventLoopPolicy` is used). 233 234 On Windows, `asyncio.WindowsProactorEventLoopPolicy` is used as default 235 policy. 236 237 """ 238 239 def get_default_policy(): 240 if sys.platform == 'win32': 241 return asyncio.WindowsProactorEventLoopPolicy() 242 243 # TODO: evaluate usage of uvloop 244 # with contextlib.suppress(ModuleNotFoundError): 245 # import uvloop 246 # return uvloop.EventLoopPolicy() 247 248 return asyncio.DefaultEventLoopPolicy() 249 250 asyncio.set_event_loop_policy(policy or get_default_policy())
Initialize asyncio.
Sets event loop policy (if None
, instance of
asyncio.DefaultEventLoopPolicy
is used).
On Windows, asyncio.WindowsProactorEventLoopPolicy
is used as default
policy.
253def run_asyncio(future: Awaitable[T], *, 254 handle_signals: bool = True, 255 loop: asyncio.AbstractEventLoop | None = None 256 ) -> T: 257 """Run asyncio loop until the `future` is completed and return the result. 258 259 If `handle_signals` is ``True``, SIGINT and SIGTERM handlers are 260 temporarily overridden. Instead of raising ``KeyboardInterrupt`` on every 261 signal reception, Future is canceled only once. Additional signals are 262 ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also 263 overridden. 264 265 If `loop` is set to ``None``, new event loop is created and set 266 as thread's default event loop. Newly created loop is closed when this 267 coroutine returns. Running tasks or async generators, other than provided 268 `future`, are not canceled prior to loop closing. 269 270 On Windows, asyncio loop gets periodically woken up (every 0.5 seconds). 271 272 Args: 273 future: future or coroutine 274 handle_signals: handle signals flag 275 loop: event loop 276 277 Returns: 278 future's result 279 280 Example:: 281 282 async def run(): 283 await asyncio.sleep(0) 284 return 123 285 286 result = run_asyncio(run()) 287 assert result == 123 288 289 """ 290 close_loop = loop is None 291 if loop is None: 292 loop = asyncio.new_event_loop() 293 asyncio.set_event_loop(loop) 294 295 task = asyncio.ensure_future(future, loop=loop) 296 297 if sys.platform == 'win32': 298 299 async def task_wrapper(task): 300 try: 301 while not task.done(): 302 await asyncio.wait([task], timeout=0.5) 303 except asyncio.CancelledError: 304 task.cancel() 305 return await task 306 307 task = asyncio.ensure_future(task_wrapper(task), loop=loop) 308 309 if not handle_signals: 310 return loop.run_until_complete(task) 311 312 canceled = False 313 signalnums = [signal.SIGINT, signal.SIGTERM] 314 if sys.platform == 'win32': 315 signalnums += [signal.SIGBREAK] 316 317 def signal_handler(*args): 318 nonlocal canceled 319 if canceled: 320 return 321 loop.call_soon_threadsafe(task.cancel) 322 canceled = True 323 324 handlers = {signalnum: signal.getsignal(signalnum) or signal.SIG_DFL 325 for signalnum in signalnums} 326 for signalnum in signalnums: 327 signal.signal(signalnum, signal_handler) 328 329 try: 330 return loop.run_until_complete(task) 331 332 finally: 333 for signalnum, handler in handlers.items(): 334 signal.signal(signalnum, handler) 335 if close_loop: 336 loop.close()
Run asyncio loop until the future
is completed and return the result.
If handle_signals
is True
, SIGINT and SIGTERM handlers are
temporarily overridden. Instead of raising KeyboardInterrupt
on every
signal reception, Future is canceled only once. Additional signals are
ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also
overridden.
If loop
is set to None
, new event loop is created and set
as thread's default event loop. Newly created loop is closed when this
coroutine returns. Running tasks or async generators, other than provided
future
, are not canceled prior to loop closing.
On Windows, asyncio loop gets periodically woken up (every 0.5 seconds).
Arguments:
- future: future or coroutine
- handle_signals: handle signals flag
- loop: event loop
Returns:
future's result
Example::
async def run():
await asyncio.sleep(0)
return 123
result = run_asyncio(run())
assert result == 123
Raised when trying to use a closed queue.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Raised if queue is empty.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Raised if queue is full.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
23class Queue(typing.Generic[T]): 24 """Asyncio queue which implements AsyncIterable and can be closed. 25 26 Interface and implementation are based on `asyncio.Queue`. 27 28 If `maxsize` is less than or equal to zero, the queue size is infinite. 29 30 Args: 31 maxsize: maximum number of items in the queue 32 33 Example:: 34 35 queue = Queue(maxsize=1) 36 37 async def producer(): 38 for i in range(4): 39 await queue.put(i) 40 queue.close() 41 42 async def consumer(): 43 result = 0 44 async for i in queue: 45 result += i 46 return result 47 48 asyncio.ensure_future(producer()) 49 result = await consumer() 50 assert result == 6 51 52 """ 53 54 def __init__(self, 55 maxsize: int = 0): 56 self._maxsize = maxsize 57 self._queue = collections.deque() 58 self._getters = collections.deque() 59 self._putters = collections.deque() 60 self._closed = False 61 62 def __aiter__(self): 63 return self 64 65 async def __anext__(self): 66 try: 67 return await self.get() 68 69 except QueueClosedError: 70 raise StopAsyncIteration 71 72 def __str__(self): 73 return (f'<{type(self).__name__}' 74 f' _closed={self._closed} ' 75 f' _queue={list(self._queue)}>') 76 77 def __len__(self): 78 return len(self._queue) 79 80 @property 81 def maxsize(self) -> int: 82 """Maximum number of items in the queue.""" 83 return self._maxsize 84 85 @property 86 def is_closed(self) -> bool: 87 """Is queue closed.""" 88 return self._closed 89 90 def empty(self) -> bool: 91 """``True`` if queue is empty, ``False`` otherwise.""" 92 return not self._queue 93 94 def full(self) -> bool: 95 """``True`` if queue is full, ``False`` otherwise.""" 96 if self._maxsize > 0: 97 return len(self._queue) >= self._maxsize 98 99 return False 100 101 def qsize(self) -> int: 102 """Number of items currently in the queue.""" 103 return len(self._queue) 104 105 def close(self): 106 """Close the queue.""" 107 if self._closed: 108 return 109 110 self._closed = True 111 self._wakeup_all(self._putters) 112 self._wakeup_next(self._getters) 113 114 def get_nowait(self) -> T: 115 """Return an item if one is immediately available, else raise 116 `QueueEmptyError`. 117 118 Raises: 119 QueueEmptyError 120 121 """ 122 if self.empty(): 123 raise QueueEmptyError() 124 125 item = self._queue.popleft() 126 self._wakeup_next(self._putters) 127 return item 128 129 def put_nowait(self, item: T): 130 """Put an item into the queue without blocking. 131 132 If no free slot is immediately available, raise `QueueFullError`. 133 134 Raises: 135 QueueFullError 136 137 """ 138 if self._closed: 139 raise QueueClosedError() 140 141 if self.full(): 142 raise QueueFullError() 143 144 self._queue.append(item) 145 self._wakeup_next(self._getters) 146 147 async def get(self) -> T: 148 """Remove and return an item from the queue. 149 150 If queue is empty, wait until an item is available. 151 152 Raises: 153 QueueClosedError 154 155 """ 156 loop = asyncio.get_running_loop() 157 158 while self.empty(): 159 if self._closed: 160 self._wakeup_all(self._getters) 161 raise QueueClosedError() 162 163 getter = loop.create_future() 164 self._getters.append(getter) 165 166 try: 167 await getter 168 169 except BaseException: 170 getter.cancel() 171 172 with contextlib.suppress(ValueError): 173 self._getters.remove(getter) 174 175 if not getter.cancelled(): 176 if not self.empty() or self._closed: 177 self._wakeup_next(self._getters) 178 179 raise 180 181 return self.get_nowait() 182 183 async def put(self, item: T): 184 """Put an item into the queue. 185 186 If the queue is full, wait until a free slot is available before adding 187 the item. 188 189 Raises: 190 QueueClosedError 191 192 """ 193 loop = asyncio.get_running_loop() 194 195 while not self._closed and self.full(): 196 putter = loop.create_future() 197 self._putters.append(putter) 198 199 try: 200 await putter 201 202 except BaseException: 203 putter.cancel() 204 205 with contextlib.suppress(ValueError): 206 self._putters.remove(putter) 207 208 if not self.full() and not putter.cancelled(): 209 self._wakeup_next(self._putters) 210 211 raise 212 213 return self.put_nowait(item) 214 215 async def get_until_empty(self) -> T: 216 """Empty the queue and return the last item. 217 218 If queue is empty, wait until at least one item is available. 219 220 Raises: 221 QueueClosedError 222 223 """ 224 item = await self.get() 225 226 while not self.empty(): 227 item = self.get_nowait() 228 229 return item 230 231 def get_nowait_until_empty(self) -> T: 232 """Empty the queue and return the last item if at least one 233 item is immediately available, else raise `QueueEmptyError`. 234 235 Raises: 236 QueueEmptyError 237 238 """ 239 item = self.get_nowait() 240 241 while not self.empty(): 242 item = self.get_nowait() 243 244 return item 245 246 def _wakeup_next(self, waiters): 247 while waiters: 248 waiter = waiters.popleft() 249 250 if not waiter.done(): 251 waiter.set_result(None) 252 break 253 254 def _wakeup_all(self, waiters): 255 while waiters: 256 waiter = waiters.popleft() 257 258 if not waiter.done(): 259 waiter.set_result(None)
Asyncio queue which implements AsyncIterable and can be closed.
Interface and implementation are based on asyncio.Queue
.
If maxsize
is less than or equal to zero, the queue size is infinite.
Arguments:
- maxsize: maximum number of items in the queue
Example::
queue = Queue(maxsize=1)
async def producer():
for i in range(4):
await queue.put(i)
queue.close()
async def consumer():
result = 0
async for i in queue:
result += i
return result
asyncio.ensure_future(producer())
result = await consumer()
assert result == 6
80 @property 81 def maxsize(self) -> int: 82 """Maximum number of items in the queue.""" 83 return self._maxsize
Maximum number of items in the queue.
90 def empty(self) -> bool: 91 """``True`` if queue is empty, ``False`` otherwise.""" 92 return not self._queue
True
if queue is empty, False
otherwise.
94 def full(self) -> bool: 95 """``True`` if queue is full, ``False`` otherwise.""" 96 if self._maxsize > 0: 97 return len(self._queue) >= self._maxsize 98 99 return False
True
if queue is full, False
otherwise.
101 def qsize(self) -> int: 102 """Number of items currently in the queue.""" 103 return len(self._queue)
Number of items currently in the queue.
105 def close(self): 106 """Close the queue.""" 107 if self._closed: 108 return 109 110 self._closed = True 111 self._wakeup_all(self._putters) 112 self._wakeup_next(self._getters)
Close the queue.
114 def get_nowait(self) -> T: 115 """Return an item if one is immediately available, else raise 116 `QueueEmptyError`. 117 118 Raises: 119 QueueEmptyError 120 121 """ 122 if self.empty(): 123 raise QueueEmptyError() 124 125 item = self._queue.popleft() 126 self._wakeup_next(self._putters) 127 return item
129 def put_nowait(self, item: T): 130 """Put an item into the queue without blocking. 131 132 If no free slot is immediately available, raise `QueueFullError`. 133 134 Raises: 135 QueueFullError 136 137 """ 138 if self._closed: 139 raise QueueClosedError() 140 141 if self.full(): 142 raise QueueFullError() 143 144 self._queue.append(item) 145 self._wakeup_next(self._getters)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFullError
.
Raises:
- QueueFullError
147 async def get(self) -> T: 148 """Remove and return an item from the queue. 149 150 If queue is empty, wait until an item is available. 151 152 Raises: 153 QueueClosedError 154 155 """ 156 loop = asyncio.get_running_loop() 157 158 while self.empty(): 159 if self._closed: 160 self._wakeup_all(self._getters) 161 raise QueueClosedError() 162 163 getter = loop.create_future() 164 self._getters.append(getter) 165 166 try: 167 await getter 168 169 except BaseException: 170 getter.cancel() 171 172 with contextlib.suppress(ValueError): 173 self._getters.remove(getter) 174 175 if not getter.cancelled(): 176 if not self.empty() or self._closed: 177 self._wakeup_next(self._getters) 178 179 raise 180 181 return self.get_nowait()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
Raises:
- QueueClosedError
183 async def put(self, item: T): 184 """Put an item into the queue. 185 186 If the queue is full, wait until a free slot is available before adding 187 the item. 188 189 Raises: 190 QueueClosedError 191 192 """ 193 loop = asyncio.get_running_loop() 194 195 while not self._closed and self.full(): 196 putter = loop.create_future() 197 self._putters.append(putter) 198 199 try: 200 await putter 201 202 except BaseException: 203 putter.cancel() 204 205 with contextlib.suppress(ValueError): 206 self._putters.remove(putter) 207 208 if not self.full() and not putter.cancelled(): 209 self._wakeup_next(self._putters) 210 211 raise 212 213 return self.put_nowait(item)
Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
Raises:
- QueueClosedError
215 async def get_until_empty(self) -> T: 216 """Empty the queue and return the last item. 217 218 If queue is empty, wait until at least one item is available. 219 220 Raises: 221 QueueClosedError 222 223 """ 224 item = await self.get() 225 226 while not self.empty(): 227 item = self.get_nowait() 228 229 return item
Empty the queue and return the last item.
If queue is empty, wait until at least one item is available.
Raises:
- QueueClosedError
231 def get_nowait_until_empty(self) -> T: 232 """Empty the queue and return the last item if at least one 233 item is immediately available, else raise `QueueEmptyError`. 234 235 Raises: 236 QueueEmptyError 237 238 """ 239 item = self.get_nowait() 240 241 while not self.empty(): 242 item = self.get_nowait() 243 244 return item
Empty the queue and return the last item if at least one
item is immediately available, else raise QueueEmptyError
.
Raises:
- QueueEmptyError
13class CancelledWithResultError(asyncio.CancelledError): 14 """CancelledError with associated result or exception""" 15 16 def __init__(self, 17 result: typing.Any | None, 18 exception: BaseException | None): 19 super().__init__() 20 self.__result = result 21 self.__exception = exception 22 23 @property 24 def result(self) -> typing.Any | None: 25 """Result""" 26 return self.__result 27 28 @property 29 def exception(self) -> BaseException | None: 30 return self.__exception
CancelledError with associated result or exception
Inherited Members
- builtins.BaseException
- with_traceback
- args
33async def wait_for(obj: Awaitable[T], 34 timeout: float 35 ) -> T: 36 """"Wait for the awaitable object to complete, with timeout. 37 38 Implementation `asyncio.wait_for` that ensure propagation of 39 CancelledError. 40 41 If task is cancelled with objects's result available, instead of 42 returning result, this implementation raises `CancelledWithResultError`. 43 44 """ 45 group = Group(log_exceptions=False) 46 group.spawn(call_on_done, asyncio.sleep(timeout), group.close) 47 48 task = group.wrap(obj) 49 group.spawn(call_on_done, asyncio.shield(task), group.close) 50 51 exc = None 52 53 try: 54 await group.wait_closing() 55 56 except asyncio.CancelledError as e: 57 exc = e 58 59 try: 60 await uncancellable(group.async_close()) 61 62 except asyncio.CancelledError as e: 63 exc = e 64 65 if exc: 66 if task.cancelled(): 67 raise exc 68 69 result = None if task.exception() else task.result() 70 raise CancelledWithResultError(result, task.exception()) from exc 71 72 if task.cancelled(): 73 raise asyncio.TimeoutError() 74 75 return task.result()
"Wait for the awaitable object to complete, with timeout.
Implementation asyncio.wait_for
that ensure propagation of
CancelledError.
If task is cancelled with objects's result available, instead of
returning result, this implementation raises CancelledWithResultError
.