hat.aio - Python asyncio utility library¶
Additional functions/coroutines that enhance Python asyncio library. For more information on asyncio see Python standard library documentation.
hat.aio.first¶
Same as hat.util.first where Iterable is replaced with AsyncIterable:
async def first(xs: typing.AsyncIterable[T],
fn: typing.Callable[[T], typing.Any] = lambda _: True,
default: typing.Optional[T] = None
) -> typing.Optional[T]: ...
This coroutine can be used on any kind of async iterable including hat.aio.Queue:
queue = Queue()
queue.put_nowait(1)
queue.put_nowait(2)
queue.put_nowait(3)
queue.close()
assert 1 == await first(queue)
assert 3 == await first(queue, lambda x: x > 2)
assert 123 == await first(queue, default=123)
hat.aio.uncancellable¶
This coroutine provides enhancement of asyncio.shield mechanism.
asyncio.shield provides canceling protection for shielded future but
interrupts execution of task which awaited for shielded future. By using
uncancellable, future can be shielded from cancellation with addition
of suspending cancellation of task awaiting shielded future. This cancellation
is suspended until shielded future is done (if raise_cancel is True
) or
is completely ignored (if raise_cancel is False
). Calling
uncancellable with raise_cancel set to False
should be used with extra
caution because it disrupts usual asyncio.CancelledError propagation.
async def uncancellable(f: asyncio.Future,
raise_cancel: bool = True
) -> typing.Any: ...
Most significant usage of this coroutine is in scenarios where cancellation of task should be temporary suspended until internally acquired resources are released:
async def create_resource(): ...
async def consume_resource(resource): ...
async def free_resource(resource): ...
async def run():
resource = await create_resource()
try:
await consume_resource(resource)
finally:
await uncancellable(free_resource(resource))
hat.aio.call, hat.aio.call_on_cancel and hat.aio.call_on_done¶
call, call_on_cancel and call_on_done provide the same mechanism for function and coroutine calling:
async def call(fn: AsyncCallable, *args, **kwargs) -> typing.Any: ...
async def call_on_cancel(fn: AsyncCallable, *args, **kwargs) -> typing.Any: ...
async def call_on_done(f: typing.Awaitable,
fn: AsyncCallable,
*args, **kwargs
) -> typing.Any: ...
When regular functions are called by call coroutine, result of function call is immediately available as result of call coroutine. If function call results in awaitable object (e.g. when coroutine is called in place of regular function), resulting awaitable is awaited and its result is returned as result of call coroutine:
def f1(x):
return x
async def f2(x):
await asyncio.sleep(0)
return x
result = await call(f1, 123)
assert result == 123
result = await call(f2, 123)
assert result == 123
call_on_cancel coroutine waits until its execution is cancelled (until asyncio.CancelledError is raised) and then executes provided callable with call coroutine. This behavior is most useful in combination with hat.aio.Goup’s spawn method:
f = asyncio.Future()
group = Group()
group.spawn(call_on_cancel, f.set_result, 123)
await group.async_close()
assert f.result() == 123
call_on_done coroutine accepts additional future which is awaited prior to application of call coroutine. Same as call_on_cancel, it is usually used with hat.aio.Goup’s spawn method:
f = asyncio.Future()
group = Group()
group.spawn(call_on_done, f, group.close)
f.set_result(None)
await group.wait_closed()
hat.aio.wait_for¶
Drop-in replacement for asyncio.wait_for that ensure propagation of CancelledError. If task is cancelled with objects’s result available, instead of returning result, this implementation raises CancelledWithResultError:
class CancelledWithResultError(asyncio.CancelledError):
@property
def result(self) -> typing.Optional[typing.Any]: ...
@property
def exception(self) -> typing.Optional[BaseException]: ...
async def wait_for(obj: typing.Awaitable,
timeout: float
) -> typing.Any: ...
hat.aio.init_asyncio and hat.aio.run_asyncio¶
Utility coroutines for initialization of asyncio and task execution:
def init_asyncio(policy: typing.Optional[asyncio.AbstractEventLoopPolicy] = None): ...
def run_asyncio(future: typing.Awaitable, *,
handle_signals=True,
create_loop=False
) -> typing.Any: ...
Example usage:
def main():
init_asyncio()
result = run_asyncio(async_main())
assert result == 123
async def async_main():
await asyncio.sleep(0)
return 123
if __name__ == '__main__':
main()
hat.aio.Queue¶
hat.aio.Queue provides drop-in replacement for asyncio.Queue with addition of close method. Once queue is closed, all future calls to put methods will result in raising of QueueClosedError. Once queue is closed and empty, all future calls to get methods will also result in raising of QueueClosedError.
class QueueClosedError(Exception): ...
class QueueEmptyError(Exception): ...
class QueueFullError(Exception): ...
class Queue:
def __init__(self, maxsize: int = 0): ...
def __aiter__(self): ...
async def __anext__(self): ...
def __str__(self): ...
def __len__(self): ...
@property
def maxsize(self) -> int: ...
@property
def is_closed(self) -> bool: ...
def empty(self) -> bool: ...
def full(self) -> bool: ...
def qsize(self) -> int: ...
def close(self): ...
def get_nowait(self) -> typing.Any: ...
def put_nowait(self, item: typing.Any): ...
async def get(self) -> typing.Any: ...
async def put(self, item: typing.Any): ...
async def get_until_empty(self) -> typing.Any: ...
Example usage:
queue = Queue(maxsize=1)
async def producer():
for i in range(4):
await queue.put(i)
queue.close()
async def consumer():
result = 0
async for i in queue:
result += i
return result
asyncio.ensure_future(producer())
result = await consumer()
assert result == 6
hat.aio.Group¶
Group provides mechanics for safe task execution and life-time control:
class Group(Resource):
def __init__(self,
log_exceptions: bool = True,
*,
loop: typing.Optional[asyncio.AbstractEventLoop] = None): ...
@property
def is_open(self) -> bool: ...
@property
def is_closing(self) -> bool: ...
@property
def is_closed(self) -> bool: ...
async def wait_closing(self): ...
async def wait_closed(self): ...
def create_subgroup(self,
log_exceptions: typing.Optional[bool] = None
) -> 'Group': ...
def wrap(self,
future: asyncio.Future
) -> asyncio.Task: ...
def spawn(self,
fn: typing.Callable[..., typing.Awaitable],
*args, **kwargs
) -> asyncio.Task: ...
def close(self): ...
async def async_close(self): ...
async def __aenter__(self): ...
async def __aexit__(self, *args): ...
In most basic use-case, Group’s spawn method can be used as safer wrapper for asyncio.ensure_future:
async def f1(x):
try:
await asyncio.Future()
except asyncio.CancelledError:
return x
async def f2(x):
await asyncio.sleep(0)
return x
async with Group() as group:
f = group.spawn(f1, 'f1')
assert 'f2' == await group.spawn(f2, 'f2')
assert 'f1' == await f
Group’s create_subgroup method provides possibility of group hierarchies for easier control of complex task execution:
group = aio.Group()
subgroup1 = group.create_subgroup()
subgroup2 = group.create_subgroup()
f1 = subgroup1.spawn(asyncio.Future)
f2 = subgroup2.spawn(asyncio.Future)
assert not f1.done()
assert not f2.done()
await group.async_close()
assert f1.done()
assert f2.done()
hat.aio.Resource¶
Simple abstract base class providing abstraction of lifetime control based on hat.aio.Group. Lifetime states of resource (is_open, is_closing and is_closed) are matching to associated group states:
class Resource(abc.ABC):
@property
@abc.abstractmethod
def async_group(self) -> Group: ...
@property
def is_open(self) -> bool: ...
@property
def is_closing(self) -> bool: ...
@property
def is_closed(self) -> bool: ...
async def wait_closing(self): ...
async def wait_closed(self): ...
def close(self): ...
async def async_close(self): ...
hat.aio.Executor¶
This class provides simple wrapper for creation of executor instances and invocation of asyncio.loop.run_in_executor coroutine:
class Executor(Resource):
def __init__(self,
*args: typing.Any,
executor_cls: typing.Type = concurrent.futures.ThreadPoolExecutor, # NOQA
log_exceptions: bool = True): ...
def spawn(self,
fn: typing.Callable,
*args, **kwargs
) -> asyncio.Task: ...
Example usage:
executor1 = Executor()
executor2 = Executor()
tid1 = await executor1.spawn(threading.get_ident)
tid2 = await executor2.spawn(threading.get_ident)
assert tid1 != tid2
API¶
API reference is available as part of generated documentation: