hat.aio
Async utility functions
1"""Async utility functions""" 2 3from hat.aio.executor import (Executor, 4 create_executor) 5from hat.aio.group import (Resource, 6 Group) 7from hat.aio.misc import (first, 8 uncancellable, 9 AsyncCallable, 10 call, 11 call_on_cancel, 12 call_on_done, 13 init_asyncio, 14 run_asyncio) 15from hat.aio.queue import (QueueClosedError, 16 QueueEmptyError, 17 QueueFullError, 18 Queue) 19from hat.aio.wait import (CancelledWithResultError, 20 wait_for) 21 22 23__all__ = ['Executor', 24 'create_executor', 25 'Resource', 26 'Group', 27 'first', 28 'uncancellable', 29 'AsyncCallable', 30 'call', 31 'call_on_cancel', 32 'call_on_done', 33 'init_asyncio', 34 'run_asyncio', 35 'QueueClosedError', 36 'QueueEmptyError', 37 'QueueFullError', 38 'Queue', 39 'CancelledWithResultError', 40 'wait_for']
11class Executor(Resource): 12 """Executor wrapping `asyncio.loop.run_in_executor`. 13 14 Wrapped executor is created from `executor_cls` with provided `args`. 15 16 If `wait_futures` is ``True``, executor will be closed once all running 17 tasks finishes. 18 19 `log_exceptions` is delegated to `async_group`. 20 21 Args: 22 args: executor args 23 executor_cls: executor class 24 log_exceptions: log exceptions 25 26 Example:: 27 28 executor1 = Executor() 29 executor2 = Executor() 30 tid1 = await executor1.spawn(threading.get_ident) 31 tid2 = await executor2.spawn(threading.get_ident) 32 assert tid1 != tid2 33 34 """ 35 36 def __init__(self, 37 *args, 38 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor, # NOQA 39 log_exceptions: bool = True, 40 wait_futures: bool = True): 41 self._wait_futures = wait_futures 42 self._executor = executor_cls(*args) 43 self._loop = asyncio.get_running_loop() 44 self._async_group = Group(log_exceptions) 45 46 self.async_group.spawn(call_on_cancel, self._executor.shutdown, False) 47 48 @property 49 def async_group(self): 50 """Async group""" 51 return self._async_group 52 53 def spawn(self, 54 fn: Callable, 55 *args, **kwargs 56 ) -> asyncio.Task: 57 """Spawn new task""" 58 return self.async_group.spawn(self._spawn, fn, args, kwargs) 59 60 async def _spawn(self, fn, args, kwargs): 61 func = functools.partial(fn, *args, **kwargs) 62 coro = self._loop.run_in_executor(self._executor, func) 63 64 if self._wait_futures: 65 coro = uncancellable(coro) 66 67 return await coro
Executor wrapping asyncio.loop.run_in_executor
.
Wrapped executor is created from executor_cls
with provided args
.
If wait_futures
is True
, executor will be closed once all running
tasks finishes.
log_exceptions
is delegated to async_group
.
Arguments:
- args: executor args
- executor_cls: executor class
- log_exceptions: log exceptions
Example::
executor1 = Executor()
executor2 = Executor()
tid1 = await executor1.spawn(threading.get_ident)
tid2 = await executor2.spawn(threading.get_ident)
assert tid1 != tid2
36 def __init__(self, 37 *args, 38 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor, # NOQA 39 log_exceptions: bool = True, 40 wait_futures: bool = True): 41 self._wait_futures = wait_futures 42 self._executor = executor_cls(*args) 43 self._loop = asyncio.get_running_loop() 44 self._async_group = Group(log_exceptions) 45 46 self.async_group.spawn(call_on_cancel, self._executor.shutdown, False)
70def create_executor(*args, 71 executor_cls: type[concurrent.futures.Executor] = concurrent.futures.ThreadPoolExecutor, # NOQA 72 loop: asyncio.AbstractEventLoop | None = None 73 ) -> Callable[..., Awaitable]: 74 """Create `asyncio.loop.run_in_executor` wrapper. 75 76 Wrapped executor is created from `executor_cls` with provided `args`. 77 78 This function returns coroutine that takes a function and its arguments, 79 executes the function in executor and returns the result. 80 81 Args: 82 args: executor args 83 executor_cls: executor class 84 loop: asyncio loop 85 86 Returns: 87 executor coroutine 88 89 Example:: 90 91 executor1 = create_executor() 92 executor2 = create_executor() 93 tid1 = await executor1(threading.get_ident) 94 tid2 = await executor2(threading.get_ident) 95 assert tid1 != tid2 96 97 """ 98 executor = executor_cls(*args) 99 100 async def executor_wrapper(func, /, *args, **kwargs): 101 _loop = loop or asyncio.get_running_loop() 102 fn = functools.partial(func, *args, **kwargs) 103 return await _loop.run_in_executor(executor, fn) 104 105 return executor_wrapper
Create asyncio.loop.run_in_executor
wrapper.
Wrapped executor is created from executor_cls
with provided args
.
This function returns coroutine that takes a function and its arguments, executes the function in executor and returns the result.
Arguments:
- args: executor args
- executor_cls: executor class
- loop: asyncio loop
Returns:
executor coroutine
Example::
executor1 = create_executor()
executor2 = create_executor()
tid1 = await executor1(threading.get_ident)
tid2 = await executor2(threading.get_ident)
assert tid1 != tid2
13class Resource(abc.ABC): 14 """Resource with lifetime control based on `Group`.""" 15 16 async def __aenter__(self): 17 return self 18 19 async def __aexit__(self, *args): 20 await self.async_close() 21 22 @property 23 @abc.abstractmethod 24 def async_group(self) -> 'Group': 25 """Group controlling resource's lifetime.""" 26 27 @property 28 def is_open(self) -> bool: 29 """``True`` if not closing or closed, ``False`` otherwise.""" 30 return self.async_group.is_open 31 32 @property 33 def is_closing(self) -> bool: 34 """Is resource closing or closed.""" 35 return self.async_group.is_closing 36 37 @property 38 def is_closed(self) -> bool: 39 """Is resource closed.""" 40 return self.async_group.is_closed 41 42 async def wait_closing(self): 43 """Wait until closing is ``True``.""" 44 await self.async_group.wait_closing() 45 46 async def wait_closed(self): 47 """Wait until closed is ``True``.""" 48 await self.async_group.wait_closed() 49 50 def close(self): 51 """Close resource.""" 52 self.async_group.close() 53 54 async def async_close(self): 55 """Close resource and wait until closed is ``True``.""" 56 await self.async_group.async_close()
Resource with lifetime control based on Group
.
22 @property 23 @abc.abstractmethod 24 def async_group(self) -> 'Group': 25 """Group controlling resource's lifetime."""
Group controlling resource's lifetime.
27 @property 28 def is_open(self) -> bool: 29 """``True`` if not closing or closed, ``False`` otherwise.""" 30 return self.async_group.is_open
True
if not closing or closed, False
otherwise.
32 @property 33 def is_closing(self) -> bool: 34 """Is resource closing or closed.""" 35 return self.async_group.is_closing
Is resource closing or closed.
37 @property 38 def is_closed(self) -> bool: 39 """Is resource closed.""" 40 return self.async_group.is_closed
Is resource closed.
42 async def wait_closing(self): 43 """Wait until closing is ``True``.""" 44 await self.async_group.wait_closing()
Wait until closing is True
.
46 async def wait_closed(self): 47 """Wait until closed is ``True``.""" 48 await self.async_group.wait_closed()
Wait until closed is True
.
63class Group(Resource): 64 """Group of asyncio Tasks. 65 66 Group enables creation and management of related asyncio Tasks. The 67 Group ensures uninterrupted execution of Tasks and Task completion upon 68 Group closing. 69 70 Group can contain subgroups, which are independent Groups managed by the 71 parent Group. 72 73 If a Task raises exception, other Tasks continue to execute. 74 75 If `log_exceptions` is ``True``, exceptions raised by spawned tasks are 76 logged with level ERROR. 77 78 """ 79 80 def __init__(self, 81 log_exceptions: bool = True, 82 *, 83 loop: asyncio.AbstractEventLoop | None = None): 84 self._log_exceptions = log_exceptions 85 self._loop = loop or asyncio.get_running_loop() 86 self._closing = self._loop.create_future() 87 self._closed = self._loop.create_future() 88 self._tasks = set() 89 self._parent = None 90 self._children = set() 91 92 @property 93 def async_group(self): 94 """Async group""" 95 return self 96 97 @property 98 def is_open(self) -> bool: 99 """``True`` if group is not closing or closed, ``False`` otherwise.""" 100 return not self._closing.done() 101 102 @property 103 def is_closing(self) -> bool: 104 """Is group closing or closed.""" 105 return self._closing.done() 106 107 @property 108 def is_closed(self) -> bool: 109 """Is group closed.""" 110 return self._closed.done() 111 112 async def wait_closing(self): 113 """Wait until closing is ``True``.""" 114 await asyncio.shield(self._closing) 115 116 async def wait_closed(self): 117 """Wait until closed is ``True``.""" 118 await asyncio.shield(self._closed) 119 120 def create_subgroup(self, 121 log_exceptions: bool | None = None 122 ) -> 'Group': 123 """Create new Group as a child of this Group. Return the new Group. 124 125 When a parent Group gets closed, all of its children are closed. 126 Closing of a subgroup has no effect on the parent Group. 127 128 If `log_exceptions` is ``None``, subgroup inherits `log_exceptions` 129 from its parent. 130 131 """ 132 if self._closing.done(): 133 raise GroupClosedError("can't create subgroup of closed group") 134 135 child = Group( 136 log_exceptions=(self._log_exceptions if log_exceptions is None 137 else log_exceptions), 138 loop=self._loop) 139 child._parent = self 140 self._children.add(child) 141 return child 142 143 def wrap(self, 144 obj: Awaitable 145 ) -> asyncio.Task: 146 """Wrap the awaitable object into a Task and schedule its execution. 147 Return the Task object. 148 149 Resulting task is shielded and can be canceled only with 150 `Group.close`. 151 152 """ 153 if self._closing.done(): 154 raise GroupClosedError("can't wrap object in closed group") 155 156 if asyncio.iscoroutine(obj): 157 task = self._loop.create_task(obj) 158 159 else: 160 task = asyncio.ensure_future(obj, loop=self._loop) 161 162 self._tasks.add(task) 163 task.add_done_callback(self._on_task_done) 164 165 return asyncio.shield(task) 166 167 def spawn(self, 168 fn: Callable[..., Awaitable], 169 *args, **kwargs 170 ) -> asyncio.Task: 171 """Wrap the result of a `fn` into a Task and schedule its execution. 172 Return the Task object. 173 174 Function `fn` is called with provided `args` and `kwargs`. 175 Resulting Task is shielded and can be canceled only with 176 `Group.close`. 177 178 """ 179 if self._closing.done(): 180 raise GroupClosedError("can't spawn task in closed group") 181 182 future = fn(*args, **kwargs) 183 return self.wrap(future) 184 185 def close(self): 186 """Schedule Group closing. 187 188 Closing Future is set immediately. All subgroups are closed, and all 189 running tasks are canceled. Once closing of all subgroups 190 and execution of all tasks is completed, closed Future is set. 191 192 """ 193 if self._closing.done(): 194 return 195 196 self._closing.set_result(True) 197 198 for child in list(self._children): 199 child.close() 200 201 for task in self._tasks: 202 self._loop.call_soon(task.cancel) 203 204 futures = [*self._tasks, 205 *(child._closed for child in self._children)] 206 if futures: 207 waiting_task = self._loop.create_task(asyncio.wait(futures)) 208 waiting_task.add_done_callback(lambda _: self._on_closed()) 209 210 else: 211 self._on_closed() 212 213 async def async_close(self): 214 """Close Group and wait until closed is ``True``.""" 215 self.close() 216 await self.wait_closed() 217 218 async def __aenter__(self): 219 return self 220 221 async def __aexit__(self, *args): 222 await self.async_close() 223 224 def _on_closed(self): 225 if self._parent is not None: 226 self._parent._children.remove(self) 227 self._parent = None 228 229 self._closed.set_result(True) 230 231 def _on_task_done(self, task): 232 self._tasks.remove(task) 233 234 if task.cancelled(): 235 return 236 237 e = task.exception() 238 if e and self._log_exceptions: 239 mlog.error('unhandled exception in async group: %s', e, exc_info=e) 240 warnings.warn('unhandled exception in async group')
Group of asyncio Tasks.
Group enables creation and management of related asyncio Tasks. The Group ensures uninterrupted execution of Tasks and Task completion upon Group closing.
Group can contain subgroups, which are independent Groups managed by the parent Group.
If a Task raises exception, other Tasks continue to execute.
If log_exceptions
is True
, exceptions raised by spawned tasks are
logged with level ERROR.
80 def __init__(self, 81 log_exceptions: bool = True, 82 *, 83 loop: asyncio.AbstractEventLoop | None = None): 84 self._log_exceptions = log_exceptions 85 self._loop = loop or asyncio.get_running_loop() 86 self._closing = self._loop.create_future() 87 self._closed = self._loop.create_future() 88 self._tasks = set() 89 self._parent = None 90 self._children = set()
97 @property 98 def is_open(self) -> bool: 99 """``True`` if group is not closing or closed, ``False`` otherwise.""" 100 return not self._closing.done()
True
if group is not closing or closed, False
otherwise.
102 @property 103 def is_closing(self) -> bool: 104 """Is group closing or closed.""" 105 return self._closing.done()
Is group closing or closed.
107 @property 108 def is_closed(self) -> bool: 109 """Is group closed.""" 110 return self._closed.done()
Is group closed.
112 async def wait_closing(self): 113 """Wait until closing is ``True``.""" 114 await asyncio.shield(self._closing)
Wait until closing is True
.
116 async def wait_closed(self): 117 """Wait until closed is ``True``.""" 118 await asyncio.shield(self._closed)
Wait until closed is True
.
120 def create_subgroup(self, 121 log_exceptions: bool | None = None 122 ) -> 'Group': 123 """Create new Group as a child of this Group. Return the new Group. 124 125 When a parent Group gets closed, all of its children are closed. 126 Closing of a subgroup has no effect on the parent Group. 127 128 If `log_exceptions` is ``None``, subgroup inherits `log_exceptions` 129 from its parent. 130 131 """ 132 if self._closing.done(): 133 raise GroupClosedError("can't create subgroup of closed group") 134 135 child = Group( 136 log_exceptions=(self._log_exceptions if log_exceptions is None 137 else log_exceptions), 138 loop=self._loop) 139 child._parent = self 140 self._children.add(child) 141 return child
Create new Group as a child of this Group. Return the new Group.
When a parent Group gets closed, all of its children are closed. Closing of a subgroup has no effect on the parent Group.
If log_exceptions
is None
, subgroup inherits log_exceptions
from its parent.
143 def wrap(self, 144 obj: Awaitable 145 ) -> asyncio.Task: 146 """Wrap the awaitable object into a Task and schedule its execution. 147 Return the Task object. 148 149 Resulting task is shielded and can be canceled only with 150 `Group.close`. 151 152 """ 153 if self._closing.done(): 154 raise GroupClosedError("can't wrap object in closed group") 155 156 if asyncio.iscoroutine(obj): 157 task = self._loop.create_task(obj) 158 159 else: 160 task = asyncio.ensure_future(obj, loop=self._loop) 161 162 self._tasks.add(task) 163 task.add_done_callback(self._on_task_done) 164 165 return asyncio.shield(task)
Wrap the awaitable object into a Task and schedule its execution. Return the Task object.
Resulting task is shielded and can be canceled only with
Group.close
.
167 def spawn(self, 168 fn: Callable[..., Awaitable], 169 *args, **kwargs 170 ) -> asyncio.Task: 171 """Wrap the result of a `fn` into a Task and schedule its execution. 172 Return the Task object. 173 174 Function `fn` is called with provided `args` and `kwargs`. 175 Resulting Task is shielded and can be canceled only with 176 `Group.close`. 177 178 """ 179 if self._closing.done(): 180 raise GroupClosedError("can't spawn task in closed group") 181 182 future = fn(*args, **kwargs) 183 return self.wrap(future)
Wrap the result of a fn
into a Task and schedule its execution.
Return the Task object.
Function fn
is called with provided args
and kwargs
.
Resulting Task is shielded and can be canceled only with
Group.close
.
185 def close(self): 186 """Schedule Group closing. 187 188 Closing Future is set immediately. All subgroups are closed, and all 189 running tasks are canceled. Once closing of all subgroups 190 and execution of all tasks is completed, closed Future is set. 191 192 """ 193 if self._closing.done(): 194 return 195 196 self._closing.set_result(True) 197 198 for child in list(self._children): 199 child.close() 200 201 for task in self._tasks: 202 self._loop.call_soon(task.cancel) 203 204 futures = [*self._tasks, 205 *(child._closed for child in self._children)] 206 if futures: 207 waiting_task = self._loop.create_task(asyncio.wait(futures)) 208 waiting_task.add_done_callback(lambda _: self._on_closed()) 209 210 else: 211 self._on_closed()
Schedule Group closing.
Closing Future is set immediately. All subgroups are closed, and all running tasks are canceled. Once closing of all subgroups and execution of all tasks is completed, closed Future is set.
14async def first(xs: AsyncIterable[T], 15 fn: Callable[[T], typing.Any] = lambda _: True, 16 default: T | None = None 17 ) -> T | None: 18 """Return the first element from async iterable that satisfies 19 predicate `fn`, or `default` if no such element exists. 20 21 Result of predicate `fn` can be of any type. Predicate is satisfied if it's 22 return value is truthy. 23 24 Args: 25 xs: async collection 26 fn: predicate 27 default: default value 28 29 Example:: 30 31 async def async_range(x): 32 for i in range(x): 33 await asyncio.sleep(0) 34 yield i 35 36 assert await first(async_range(3)) == 0 37 assert await first(async_range(3), lambda x: x > 1) == 2 38 assert await first(async_range(3), lambda x: x > 2) is None 39 assert await first(async_range(3), lambda x: x > 2, 123) == 123 40 41 """ 42 async for i in xs: 43 if fn(i): 44 return i 45 46 return default
Return the first element from async iterable that satisfies
predicate fn
, or default
if no such element exists.
Result of predicate fn
can be of any type. Predicate is satisfied if it's
return value is truthy.
Arguments:
- xs: async collection
- fn: predicate
- default: default value
Example::
async def async_range(x):
for i in range(x):
await asyncio.sleep(0)
yield i
assert await first(async_range(3)) == 0
assert await first(async_range(3), lambda x: x > 1) == 2
assert await first(async_range(3), lambda x: x > 2) is None
assert await first(async_range(3), lambda x: x > 2, 123) == 123
49async def uncancellable(obj: Awaitable[T], 50 raise_cancel: bool = True 51 ) -> T: 52 """Uncancellable execution of a awaitable object. 53 54 Object is scheduled as task, shielded and its execution cannot be 55 interrupted. 56 57 If `raise_cancel` is `True` and the object gets canceled, 58 `asyncio.CancelledError` is reraised after the Future finishes. 59 60 Warning: 61 If `raise_cancel` is `False`, this method suppresses 62 `asyncio.CancelledError` and stops its propagation. Use with 63 caution. 64 65 Args: 66 obj: awaitable object 67 raise_cancel: raise CancelledError flag 68 69 Returns: 70 object execution result 71 72 """ 73 exception = None 74 loop = asyncio.get_running_loop() 75 76 if asyncio.iscoroutine(obj): 77 task = loop.create_task(obj) 78 79 else: 80 task = asyncio.ensure_future(obj, loop=loop) 81 82 while not task.done(): 83 try: 84 await asyncio.shield(task) 85 86 except asyncio.CancelledError as e: 87 if raise_cancel: 88 exception = e 89 90 except Exception: 91 pass 92 93 if exception: 94 raise exception 95 96 return task.result()
Uncancellable execution of a awaitable object.
Object is scheduled as task, shielded and its execution cannot be interrupted.
If raise_cancel
is True
and the object gets canceled,
asyncio.CancelledError
is reraised after the Future finishes.
Warning:
If
raise_cancel
isFalse
, this method suppressesasyncio.CancelledError
and stops its propagation. Use with caution.
Arguments:
- obj: awaitable object
- raise_cancel: raise CancelledError flag
Returns:
object execution result
113async def call(fn: AsyncCallable[..., T], *args, **kwargs) -> T: 114 """Call a function or a coroutine (or other callable object). 115 116 Call the `fn` with `args` and `kwargs`. If result of this call is 117 awaitable, it is awaited and returned. Otherwise, result is immediately 118 returned. 119 120 Args: 121 fn: callable object 122 args: additional positional arguments 123 kwargs: additional keyword arguments 124 125 Returns: 126 awaited result or result 127 128 Example: 129 130 def f1(x): 131 return x 132 133 def f2(x): 134 f = asyncio.Future() 135 f.set_result(x) 136 return f 137 138 async def f3(x): 139 return x 140 141 assert 'f1' == await hat.aio.call(f1, 'f1') 142 assert 'f2' == await hat.aio.call(f2, 'f2') 143 assert 'f3' == await hat.aio.call(f3, 'f3') 144 145 """ 146 result = fn(*args, **kwargs) 147 148 if inspect.isawaitable(result): 149 result = await result 150 151 return result
Call a function or a coroutine (or other callable object).
Call the fn
with args
and kwargs
. If result of this call is
awaitable, it is awaited and returned. Otherwise, result is immediately
returned.
Arguments:
- fn: callable object
- args: additional positional arguments
- kwargs: additional keyword arguments
Returns:
awaited result or result
Example:
def f1(x): return x
def f2(x): f = asyncio.Future() f.set_result(x) return f
async def f3(x): return x
assert 'f1' == await hat.aio.call(f1, 'f1') assert 'f2' == await hat.aio.call(f2, 'f2') assert 'f3' == await hat.aio.call(f3, 'f3')
154async def call_on_cancel(fn: AsyncCallable[..., T], *args, **kwargs) -> T: 155 """Call a function or a coroutine when canceled. 156 157 When canceled, `fn` is called with `args` and `kwargs` by using 158 `call` coroutine. 159 160 Args: 161 fn: function or coroutine 162 args: additional function arguments 163 kwargs: additional function keyword arguments 164 165 Returns: 166 function result 167 168 Example:: 169 170 f = asyncio.Future() 171 group = Group() 172 group.spawn(call_on_cancel, f.set_result, 123) 173 assert not f.done() 174 await group.async_close() 175 assert f.result() == 123 176 177 """ 178 with contextlib.suppress(asyncio.CancelledError): 179 await asyncio.get_running_loop().create_future() 180 181 return await call(fn, *args, *kwargs)
Call a function or a coroutine when canceled.
When canceled, fn
is called with args
and kwargs
by using
call
coroutine.
Arguments:
- fn: function or coroutine
- args: additional function arguments
- kwargs: additional function keyword arguments
Returns:
function result
Example::
f = asyncio.Future()
group = Group()
group.spawn(call_on_cancel, f.set_result, 123)
assert not f.done()
await group.async_close()
assert f.result() == 123
184async def call_on_done(f: Awaitable, 185 fn: AsyncCallable[..., T], 186 *args, **kwargs 187 ) -> T: 188 """Call a function or a coroutine when awaitable is done. 189 190 When `f` is done, `fn` is called with `args` and `kwargs` by using 191 `call` coroutine. 192 193 If this coroutine is canceled before `f` is done, `f` is canceled and `fn` 194 is not called. 195 196 If this coroutine is canceled after `f` is done, `fn` call is canceled. 197 198 Args: 199 f: awaitable future 200 fn: function or coroutine 201 args: additional function arguments 202 kwargs: additional function keyword arguments 203 204 Returns: 205 function result 206 207 Example:: 208 209 f = asyncio.Future() 210 group = Group() 211 group.spawn(call_on_done, f, group.close) 212 assert group.is_open 213 f.set_result(None) 214 await group.wait_closed() 215 assert group.is_closed 216 217 """ 218 with contextlib.suppress(Exception): 219 await f 220 221 return await call(fn, *args, *kwargs)
Call a function or a coroutine when awaitable is done.
When f
is done, fn
is called with args
and kwargs
by using
call
coroutine.
If this coroutine is canceled before f
is done, f
is canceled and fn
is not called.
If this coroutine is canceled after f
is done, fn
call is canceled.
Arguments:
- f: awaitable future
- fn: function or coroutine
- args: additional function arguments
- kwargs: additional function keyword arguments
Returns:
function result
Example::
f = asyncio.Future()
group = Group()
group.spawn(call_on_done, f, group.close)
assert group.is_open
f.set_result(None)
await group.wait_closed()
assert group.is_closed
224def init_asyncio(policy: asyncio.AbstractEventLoopPolicy | None = None): 225 """Initialize asyncio. 226 227 Sets event loop policy (if ``None``, instance of 228 `asyncio.DefaultEventLoopPolicy` is used). 229 230 On Windows, `asyncio.WindowsProactorEventLoopPolicy` is used as default 231 policy. 232 233 """ 234 235 def get_default_policy(): 236 if sys.platform == 'win32': 237 return asyncio.WindowsProactorEventLoopPolicy() 238 239 # TODO: evaluate usage of uvloop 240 # with contextlib.suppress(ModuleNotFoundError): 241 # import uvloop 242 # return uvloop.EventLoopPolicy() 243 244 return asyncio.DefaultEventLoopPolicy() 245 246 asyncio.set_event_loop_policy(policy or get_default_policy())
Initialize asyncio.
Sets event loop policy (if None
, instance of
asyncio.DefaultEventLoopPolicy
is used).
On Windows, asyncio.WindowsProactorEventLoopPolicy
is used as default
policy.
249def run_asyncio(future: Awaitable[T], *, 250 handle_signals: bool = True, 251 loop: asyncio.AbstractEventLoop | None = None 252 ) -> T: 253 """Run asyncio loop until the `future` is completed and return the result. 254 255 If `handle_signals` is ``True``, SIGINT and SIGTERM handlers are 256 temporarily overridden. Instead of raising ``KeyboardInterrupt`` on every 257 signal reception, Future is canceled only once. Additional signals are 258 ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also 259 overridden. 260 261 If `loop` is set to ``None``, new event loop is created and set 262 as thread's default event loop. Newly created loop is closed when this 263 coroutine returns. Running tasks or async generators, other than provided 264 `future`, are not canceled prior to loop closing. 265 266 On Windows, asyncio loop gets periodically woken up (every 0.5 seconds). 267 268 Args: 269 future: future or coroutine 270 handle_signals: handle signals flag 271 loop: event loop 272 273 Returns: 274 future's result 275 276 Example:: 277 278 async def run(): 279 await asyncio.sleep(0) 280 return 123 281 282 result = run_asyncio(run()) 283 assert result == 123 284 285 """ 286 close_loop = loop is None 287 if loop is None: 288 loop = asyncio.new_event_loop() 289 asyncio.set_event_loop(loop) 290 291 task = asyncio.ensure_future(future, loop=loop) 292 293 if sys.platform == 'win32': 294 295 async def task_wrapper(task): 296 try: 297 while not task.done(): 298 await asyncio.wait([task], timeout=0.5) 299 except asyncio.CancelledError: 300 task.cancel() 301 return await task 302 303 task = asyncio.ensure_future(task_wrapper(task), loop=loop) 304 305 if not handle_signals: 306 return loop.run_until_complete(task) 307 308 canceled = False 309 signalnums = [signal.SIGINT, signal.SIGTERM] 310 if sys.platform == 'win32': 311 signalnums += [signal.SIGBREAK] 312 313 def signal_handler(*args): 314 nonlocal canceled 315 if canceled: 316 return 317 loop.call_soon_threadsafe(task.cancel) 318 canceled = True 319 320 handlers = {signalnum: signal.getsignal(signalnum) or signal.SIG_DFL 321 for signalnum in signalnums} 322 for signalnum in signalnums: 323 signal.signal(signalnum, signal_handler) 324 325 try: 326 return loop.run_until_complete(task) 327 328 finally: 329 for signalnum, handler in handlers.items(): 330 signal.signal(signalnum, handler) 331 if close_loop: 332 loop.close()
Run asyncio loop until the future
is completed and return the result.
If handle_signals
is True
, SIGINT and SIGTERM handlers are
temporarily overridden. Instead of raising KeyboardInterrupt
on every
signal reception, Future is canceled only once. Additional signals are
ignored. On Windows, SIGBREAK (CTRL_BREAK_EVENT) handler is also
overridden.
If loop
is set to None
, new event loop is created and set
as thread's default event loop. Newly created loop is closed when this
coroutine returns. Running tasks or async generators, other than provided
future
, are not canceled prior to loop closing.
On Windows, asyncio loop gets periodically woken up (every 0.5 seconds).
Arguments:
- future: future or coroutine
- handle_signals: handle signals flag
- loop: event loop
Returns:
future's result
Example::
async def run():
await asyncio.sleep(0)
return 123
result = run_asyncio(run())
assert result == 123
Raised when trying to use a closed queue.
Raised if queue is empty.
Raised if queue is full.
23class Queue(typing.Generic[T]): 24 """Asyncio queue which implements AsyncIterable and can be closed. 25 26 Interface and implementation are based on `asyncio.Queue`. 27 28 If `maxsize` is less than or equal to zero, the queue size is infinite. 29 30 Args: 31 maxsize: maximum number of items in the queue 32 33 Example:: 34 35 queue = Queue(maxsize=1) 36 37 async def producer(): 38 for i in range(4): 39 await queue.put(i) 40 queue.close() 41 42 async def consumer(): 43 result = 0 44 async for i in queue: 45 result += i 46 return result 47 48 asyncio.ensure_future(producer()) 49 result = await consumer() 50 assert result == 6 51 52 """ 53 54 def __init__(self, 55 maxsize: int = 0): 56 self._maxsize = maxsize 57 self._queue = collections.deque() 58 self._getters = collections.deque() 59 self._putters = collections.deque() 60 self._closed = False 61 62 def __aiter__(self): 63 return self 64 65 async def __anext__(self): 66 try: 67 return await self.get() 68 69 except QueueClosedError: 70 raise StopAsyncIteration 71 72 def __str__(self): 73 return (f'<{type(self).__name__}' 74 f' _closed={self._closed} ' 75 f' _queue={list(self._queue)}>') 76 77 def __len__(self): 78 return len(self._queue) 79 80 @property 81 def maxsize(self) -> int: 82 """Maximum number of items in the queue.""" 83 return self._maxsize 84 85 @property 86 def is_closed(self) -> bool: 87 """Is queue closed.""" 88 return self._closed 89 90 def empty(self) -> bool: 91 """``True`` if queue is empty, ``False`` otherwise.""" 92 return not self._queue 93 94 def full(self) -> bool: 95 """``True`` if queue is full, ``False`` otherwise.""" 96 if self._maxsize > 0: 97 return len(self._queue) >= self._maxsize 98 99 return False 100 101 def qsize(self) -> int: 102 """Number of items currently in the queue.""" 103 return len(self._queue) 104 105 def close(self): 106 """Close the queue.""" 107 if self._closed: 108 return 109 110 self._closed = True 111 self._wakeup_all(self._putters) 112 self._wakeup_next(self._getters) 113 114 def get_nowait(self) -> T: 115 """Return an item if one is immediately available, else raise 116 `QueueEmptyError`. 117 118 Raises: 119 QueueEmptyError 120 121 """ 122 if self.empty(): 123 raise QueueEmptyError() 124 125 item = self._queue.popleft() 126 self._wakeup_next(self._putters) 127 return item 128 129 def put_nowait(self, item: T): 130 """Put an item into the queue without blocking. 131 132 If no free slot is immediately available, raise `QueueFullError`. 133 134 Raises: 135 QueueFullError 136 137 """ 138 if self._closed: 139 raise QueueClosedError() 140 141 if self.full(): 142 raise QueueFullError() 143 144 self._queue.append(item) 145 self._wakeup_next(self._getters) 146 147 async def get(self) -> T: 148 """Remove and return an item from the queue. 149 150 If queue is empty, wait until an item is available. 151 152 Raises: 153 QueueClosedError 154 155 """ 156 loop = asyncio.get_running_loop() 157 158 while self.empty(): 159 if self._closed: 160 self._wakeup_all(self._getters) 161 raise QueueClosedError() 162 163 getter = loop.create_future() 164 self._getters.append(getter) 165 166 try: 167 await getter 168 169 except BaseException: 170 getter.cancel() 171 172 with contextlib.suppress(ValueError): 173 self._getters.remove(getter) 174 175 if not getter.cancelled(): 176 if not self.empty() or self._closed: 177 self._wakeup_next(self._getters) 178 179 raise 180 181 return self.get_nowait() 182 183 async def put(self, item: T): 184 """Put an item into the queue. 185 186 If the queue is full, wait until a free slot is available before adding 187 the item. 188 189 Raises: 190 QueueClosedError 191 192 """ 193 loop = asyncio.get_running_loop() 194 195 while not self._closed and self.full(): 196 putter = loop.create_future() 197 self._putters.append(putter) 198 199 try: 200 await putter 201 202 except BaseException: 203 putter.cancel() 204 205 with contextlib.suppress(ValueError): 206 self._putters.remove(putter) 207 208 if not self.full() and not putter.cancelled(): 209 self._wakeup_next(self._putters) 210 211 raise 212 213 return self.put_nowait(item) 214 215 async def get_until_empty(self) -> T: 216 """Empty the queue and return the last item. 217 218 If queue is empty, wait until at least one item is available. 219 220 Raises: 221 QueueClosedError 222 223 """ 224 item = await self.get() 225 226 while not self.empty(): 227 item = self.get_nowait() 228 229 return item 230 231 def get_nowait_until_empty(self) -> T: 232 """Empty the queue and return the last item if at least one 233 item is immediately available, else raise `QueueEmptyError`. 234 235 Raises: 236 QueueEmptyError 237 238 """ 239 item = self.get_nowait() 240 241 while not self.empty(): 242 item = self.get_nowait() 243 244 return item 245 246 def _wakeup_next(self, waiters): 247 while waiters: 248 waiter = waiters.popleft() 249 250 if not waiter.done(): 251 waiter.set_result(None) 252 break 253 254 def _wakeup_all(self, waiters): 255 while waiters: 256 waiter = waiters.popleft() 257 258 if not waiter.done(): 259 waiter.set_result(None)
Asyncio queue which implements AsyncIterable and can be closed.
Interface and implementation are based on asyncio.Queue
.
If maxsize
is less than or equal to zero, the queue size is infinite.
Arguments:
- maxsize: maximum number of items in the queue
Example::
queue = Queue(maxsize=1)
async def producer():
for i in range(4):
await queue.put(i)
queue.close()
async def consumer():
result = 0
async for i in queue:
result += i
return result
asyncio.ensure_future(producer())
result = await consumer()
assert result == 6
80 @property 81 def maxsize(self) -> int: 82 """Maximum number of items in the queue.""" 83 return self._maxsize
Maximum number of items in the queue.
90 def empty(self) -> bool: 91 """``True`` if queue is empty, ``False`` otherwise.""" 92 return not self._queue
True
if queue is empty, False
otherwise.
94 def full(self) -> bool: 95 """``True`` if queue is full, ``False`` otherwise.""" 96 if self._maxsize > 0: 97 return len(self._queue) >= self._maxsize 98 99 return False
True
if queue is full, False
otherwise.
101 def qsize(self) -> int: 102 """Number of items currently in the queue.""" 103 return len(self._queue)
Number of items currently in the queue.
105 def close(self): 106 """Close the queue.""" 107 if self._closed: 108 return 109 110 self._closed = True 111 self._wakeup_all(self._putters) 112 self._wakeup_next(self._getters)
Close the queue.
114 def get_nowait(self) -> T: 115 """Return an item if one is immediately available, else raise 116 `QueueEmptyError`. 117 118 Raises: 119 QueueEmptyError 120 121 """ 122 if self.empty(): 123 raise QueueEmptyError() 124 125 item = self._queue.popleft() 126 self._wakeup_next(self._putters) 127 return item
129 def put_nowait(self, item: T): 130 """Put an item into the queue without blocking. 131 132 If no free slot is immediately available, raise `QueueFullError`. 133 134 Raises: 135 QueueFullError 136 137 """ 138 if self._closed: 139 raise QueueClosedError() 140 141 if self.full(): 142 raise QueueFullError() 143 144 self._queue.append(item) 145 self._wakeup_next(self._getters)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFullError
.
Raises:
- QueueFullError
147 async def get(self) -> T: 148 """Remove and return an item from the queue. 149 150 If queue is empty, wait until an item is available. 151 152 Raises: 153 QueueClosedError 154 155 """ 156 loop = asyncio.get_running_loop() 157 158 while self.empty(): 159 if self._closed: 160 self._wakeup_all(self._getters) 161 raise QueueClosedError() 162 163 getter = loop.create_future() 164 self._getters.append(getter) 165 166 try: 167 await getter 168 169 except BaseException: 170 getter.cancel() 171 172 with contextlib.suppress(ValueError): 173 self._getters.remove(getter) 174 175 if not getter.cancelled(): 176 if not self.empty() or self._closed: 177 self._wakeup_next(self._getters) 178 179 raise 180 181 return self.get_nowait()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
Raises:
- QueueClosedError
183 async def put(self, item: T): 184 """Put an item into the queue. 185 186 If the queue is full, wait until a free slot is available before adding 187 the item. 188 189 Raises: 190 QueueClosedError 191 192 """ 193 loop = asyncio.get_running_loop() 194 195 while not self._closed and self.full(): 196 putter = loop.create_future() 197 self._putters.append(putter) 198 199 try: 200 await putter 201 202 except BaseException: 203 putter.cancel() 204 205 with contextlib.suppress(ValueError): 206 self._putters.remove(putter) 207 208 if not self.full() and not putter.cancelled(): 209 self._wakeup_next(self._putters) 210 211 raise 212 213 return self.put_nowait(item)
Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
Raises:
- QueueClosedError
215 async def get_until_empty(self) -> T: 216 """Empty the queue and return the last item. 217 218 If queue is empty, wait until at least one item is available. 219 220 Raises: 221 QueueClosedError 222 223 """ 224 item = await self.get() 225 226 while not self.empty(): 227 item = self.get_nowait() 228 229 return item
Empty the queue and return the last item.
If queue is empty, wait until at least one item is available.
Raises:
- QueueClosedError
231 def get_nowait_until_empty(self) -> T: 232 """Empty the queue and return the last item if at least one 233 item is immediately available, else raise `QueueEmptyError`. 234 235 Raises: 236 QueueEmptyError 237 238 """ 239 item = self.get_nowait() 240 241 while not self.empty(): 242 item = self.get_nowait() 243 244 return item
Empty the queue and return the last item if at least one
item is immediately available, else raise QueueEmptyError
.
Raises:
- QueueEmptyError
13class CancelledWithResultError(asyncio.CancelledError): 14 """CancelledError with associated result or exception""" 15 16 def __init__(self, 17 result: typing.Any | None, 18 exception: BaseException | None): 19 super().__init__() 20 self.__result = result 21 self.__exception = exception 22 23 @property 24 def result(self) -> typing.Any | None: 25 """Result""" 26 return self.__result 27 28 @property 29 def exception(self) -> BaseException | None: 30 return self.__exception
CancelledError with associated result or exception
33async def wait_for(obj: Awaitable[T], 34 timeout: float 35 ) -> T: 36 """"Wait for the awaitable object to complete, with timeout. 37 38 Implementation `asyncio.wait_for` that ensure propagation of 39 CancelledError. 40 41 If task is cancelled with objects's result available, instead of 42 returning result, this implementation raises `CancelledWithResultError`. 43 44 """ 45 group = Group(log_exceptions=False) 46 group.spawn(call_on_done, asyncio.sleep(timeout), group.close) 47 48 task = group.wrap(obj) 49 group.spawn(call_on_done, asyncio.shield(task), group.close) 50 51 exc = None 52 53 try: 54 await group.wait_closing() 55 56 except asyncio.CancelledError as e: 57 exc = e 58 59 try: 60 await uncancellable(group.async_close()) 61 62 except asyncio.CancelledError as e: 63 exc = e 64 65 if exc: 66 if task.cancelled(): 67 raise exc 68 69 result = None if task.exception() else task.result() 70 raise CancelledWithResultError(result, task.exception()) from exc 71 72 if task.cancelled(): 73 raise asyncio.TimeoutError() 74 75 return task.result()
"Wait for the awaitable object to complete, with timeout.
Implementation asyncio.wait_for
that ensure propagation of
CancelledError.
If task is cancelled with objects's result available, instead of
returning result, this implementation raises CancelledWithResultError
.