Асинхронные возможности Python
Асинхронность:
- Прямая (параллелизм) — в языке нет
- С последовательной активацией (обратные вызовы функций, callbacks) — полностью моделируется имеющимся синтаксисом
- Сопрограммная (внутри сопрограммы есть синхронные участки, выполнение их произвольно) — вот!
Модель
Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее, if even possible ☺
Как работает yield from (повторение)
На время yield from код генератора task() логически не исполняется, можно считать, что на это время его замещает subr()
Ловля return из генератора с помощью yield from (два способа)
Оператор return в генераторе откладывает свой параметр в поле .value исключения StopIteration
А в конструкции yield from … это значение приезжает прямо так!
Передача параметра в генератор с помощью .send() (повторение):
Особенность: самый первый .send() должен быть генератор.send(None) (или, что то же самое, next(генератор), потому что в синтаксисе нет способа передать какое-то значение в начало генератора, а не в yield.
initial — это параметр генератор-функции, он передаётся в момент создания генератора, а не при его проходе
Мы договорились считать этот первый next() запуском генератора.
Куда происходит .send() в случае yield from?
Ничего неожиданного: .send() попадает в тот итератор, который сейчас yield-ит
Внимательно посмотрим, куда что send-илось…
Асинхронность как произвольное исполнение частей кода между yield-ами
Понятие синхронного фрагмента — непрерывно выполняемого кода между yield-ами (а также стартом и return-ом)
Понятие образующего цикла (main loop)
Тот же пример, но с двумя асинхронно выполняющимися задачами:
1 def subr(n): 2 x = yield f"({n}) Wait for x" 3 y = yield f"({n}) Wait for y ({x=})" 4 return x * y 5 6 def task(n): 7 while True: 8 value = yield from subr(n) 9 _ = yield f"[{n}]: {value}" 10 11 cores = task(0), task(1) 12 print(next(cores[0]), next(cores[1]), sep="\n") 13 for i in range(20): 14 print(cores[not i % 3].send(i))
Здесь из образующего цикла поступает поток целых чисел, subr() их попарно умножает, а две задачи складывают эти произведения
Очередной число попадает в subr() выбранной задачи, а выбор задач делает образующий цикл
Синхронные фрагменты из task[0] выполняются в два раза чаще синхронных фрагментов из task[1]
- Можно попробовать разобраться, что с чем складывалось…
- Более сложный пример: три конечных задачи с разным количеством синхронных фрагментов
1 from random import randint 2 3 def subr(): 4 x = yield 5 y = yield 6 return x * y 7 8 def task(num): 9 res = 0 10 for i in range(num): 11 res += yield from subr() 12 return res 13 14 def loop(*tasks): 15 queue, result = list(tasks), [] 16 print("Start:", *queue, sep="\n\t") 17 for task in tasks: 18 next(task) 19 while queue: 20 task = queue.pop(0) 21 try: 22 task.send(randint(1, 9)) 23 except StopIteration as ret: 24 result.append((task, ret.value)) 25 else: 26 queue.append(task) 27 return result 28 29 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
- Образующий цикл вынесен в отдельную функцию и стал сложнее. В нём генерируется непрерывный поток случайных целых и отдаётся поштучно на обработку очередному заданию. Если задание закончилось, запоминается его результат, а если нет — ставится в конец очереди.
Для реализации этой логики пришлось снова «вытащить» явную обработку StopIteration
Значения, возвращаемые yield, при этом не используются вообще: yield служит только для разметки синхронных фрагментов
Если ещё усложнить логику образующего цикла, мы сможем управлять его поведением с помощью возвращаемых yield значений:
1 from random import randint, choice 2 from string import ascii_uppercase 3 from collections import deque 4 5 def subr(): 6 return (yield int) * (yield str) 7 8 def task(num): 9 res = "" 10 for i in range(num): 11 res += yield from subr() 12 return res 13 14 def loop(*tasks): 15 queue, result = deque((task, None) for task in tasks), [] 16 print("Start:", *queue, sep="\n\t") 17 while queue: 18 task, request = queue.popleft() 19 if request is int: 20 data = randint(1, 4) 21 elif request is str: 22 data = choice(ascii_uppercase) 23 else: 24 data = request 25 try: 26 request = task.send(data) 27 except StopIteration as ret: 28 result.append((task, ret.value)) 29 task.close() 30 else: 31 queue.append((task, request)) 32 return result 33 34 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
subr() возвращает тип параметра, который она хотела бы получить в следующем yield-е
Этот тип хранится в очереди вместо с заданием, чей subr() запросил данный параметр
- Образующий цикл генерирует параметр сообразно типу
А ещё мы храним очередь в очереди, а не в списке, не надо привыкать к плохому!
Можно и дальше усложнять, но и так уже непросто!
Ещё модели
Цикл событий: образующий цикл получает откуда-то «события», определяет, кто их должен обрабатывать и вызывает функции-обработчики с параметром обработчик(событие) (возможно, не функции, а генераторы облаботчик.send(событие), не слишком важно).
- Цикл обратных вызовов (callback-ов): частный случай того же самого: каждый обработчик «регистрируется» — по заранее определённому протоколу указывает, в каких случаях его надо вызывать (это и есть событие), а образующий цикл при наступлении события вызывает все обработчики, которые на нём зарегистрировались (опять-таки, можно организовать в виде функций, а можно в виде генераторов)
Цикл с future: унификация управления образующим циклом
future — это генератор из двух синхронных сегментов
Настройка и yield в образующий цикл
return возвращаемого значения
Кроме того, в future есть поле готовности / результата
- Алгоритм работы:
- Генератор-сервис заводит неготовую фьючу в данном образующем цикле
Генератор-пользователь делает yield from фьюча
- Фьюча выпадает в образующий цикл
- В какой-то момент генератор-сервис выставляет в фьюче готовность / результат
- На этом основании образующий цикл возвращает управление фьюче (во второй сегмент)
- А та возвращает значение генератору-пользователю
- Фактически это частный случай обратных вызовов
- … более сложная логика (например, приоритизация событий) …
Синтаксис Async
async def + return ≈ генератор
await ≈ yield from
@types.coroutine: низкоуровневая сопрограмма, которая может делать и return значение, и yield
async def + yield — это именно то, чем кажется: генераторы, про которые сразу известно, что они асинхронные:
Их можно проходить async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)
Перепишем предыдущий пример на async
1 from random import randint, choice 2 from string import ascii_uppercase 3 from types import coroutine 4 from collections import deque 5 6 @coroutine 7 def subr(): 8 return (yield int) * (yield str) 9 10 async def task(num): 11 res = "" 12 for i in range(num): 13 res += await subr() 14 return res 15 16 def loop(*tasks): 17 queue, result = deque((task, None) for task in tasks), [] 18 print("Start:", *queue, sep="\n\t") 19 while queue: 20 task, request = queue.popleft() 21 if request is int: 22 data = randint(1, 4) 23 elif request is str: 24 data = choice(ascii_uppercase) 25 else: 26 data = request 27 try: 28 request = task.send(data) 29 except StopIteration as ret: 30 result.append((task, ret.value)) 31 task.close() 32 else: 33 queue.append((task, request)) 34 return result 35 36 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
Наш subr() использует прямое управление образующим циклом с помощью yield
В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий
Asyncio
- Самое сложное — это логика образующего цикла
Самое ненужное — это логика образующего цикла (достаточно знать, как он работает, а не что делает)
⇒
- Запрограммируем образующий цикл заранее, насуём туда инструментов
Упростим протокол управления до одного понятия — Future
- Обмажем протокол верхним уровнем (задания, события, очереди и т. п.)
До такой степени, что ни одна из наших сопрограмм не делает yield (если это не асинхронный генератор)
(asyncio specific) обмажем огромным количеством применений IRL
Основные понятия:
Mainloop — образующий цикл. Полностью под капотом, мы его не видим.
Task — асинхронное задание
1 import asyncio 2 from time import strftime 3 4 async def late(delay, msg): 5 await asyncio.sleep(delay) 6 print(msg) 7 8 async def main(): 9 print(f"> {strftime('%X')}") 10 await late(1, "One") 11 print(f"> {strftime('%X')}") 12 await late(2, "Two") 13 print(f"> {strftime('%X')}") 14 15 task3 = asyncio.create_task(late(3, "Three")) 16 task4 = asyncio.create_task(late(4, "Four")) 17 await(task3) 18 print(f"> {strftime('%X')}") 19 await(task4) 20 print(f"> {strftime('%X')}") 21 22 asyncio.run(main())
asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()
- «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходила в mainloop)
В примере первая корутина спит секунду, а вторая — после этого ещё две
Если написать create_task(корутина), корутина регистрируется в mainloop-е, а возвращется нечто вроде фьючи — задание
await(здадание) запускает его
В примере ещё две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой
asyncio.sleep(тайм-аут) — это команда mainlop-у «верни мне управление после тайм-аута»
- Чуть ли не единственная команда mainlop-у на поверхности
Gather — атомарная операция create_task() / await над несколькими корутинами
- Тут всё понятно, запустились, повисели сколько сказано, завершились
(Python 3.11) группы заданий —запуск в виде контекстного менеджера
1 import asyncio 2 3 async def late(delay, msg): 4 await asyncio.sleep(delay) 5 print(msg) 6 return delay 7 8 async def main(): 9 async with asyncio.TaskGroup() as tg: 10 tg.create_task(late(3, "A")) 11 tg.create_task(late(1, "B")) 12 tg.create_task(late(2, "C")) 13 print("Done") 14 15 asyncio.run(main())
Например события
1 async def waiter(name, event): 2 print(f'{name} waits for {event}…') 3 await event.wait() 4 print(f'…{name} got it!') 5 6 async def eventer(wait, event): 7 print(f"Emitting {event} in {wait} seconds") 8 await asyncio.sleep(wait) 9 print(f"Emitting {event}…") 10 event.set() 11 12 async def main(): 13 event = asyncio.Event() 14 await asyncio.gather( 15 waiter("One", event), 16 waiter("Two", event), 17 eventer(1, event)) 18 19 asyncio.run(main())
1 async def ham(queue, size): 2 for i in range(size): 3 await asyncio.sleep(1) 4 res = await queue.get() 5 print(f"\tGot {res}") 6 7 async def spam(wait, queue): 8 for i in range(6): 9 await asyncio.sleep(wait) 10 val = f"{wait}:{i}" 11 await queue.put(val) 12 print(f"Put {val}") 13 14 async def main(): 15 queue = asyncio.Queue() 16 await asyncio.gather( 17 ham(queue, 12), 18 spam(0.4, queue), 19 spam(1.6, queue)) 20 21 asyncio.run(main())
- Есть и приоритетные очереди
И толстый-толстый слой шоколада!
Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)
Изменение логики работы mainloop (aka Policies)
- Сеть (I/O, IPC и всё остальное), сигналы
Потоки (над этим всем)
- Вброс/перехват исключений
- …
Дикая туча модулей на основе asyncio
Д/З
Попробовать прочитать всю документацию и прощёлкать всё, до чего дотянетесь.
EJudge: FilterQueue 'Очередь с фильтром'
Напишите класс FilterQueue со следующими свойствами:
Это потомок asyncio.Queue
В экземпляре класса атрибут очередь.window содержит первый элемент очереди, или None, если очередь пуста
С помощью операции фильтр in очередь можно определить, присутствуют ли в очереди такие элементы, что выражение фильтр(элемент) истинно
Метод .later() синхронно переставляет первый элемент очереди в её конец, или вызывает исключение asyncio.QueueEmpty, если очередь пуста
Метод .get() содержит необязательный параметр фильтр. Вызов очередь.get(фильтр) работает так:
Если в очереди нет элементов, на которых фильтр(элемент) истинно, работает как обычный .get().
Если в очереди есть элементы, на которых фильтр(элемент) истинно, переставляет первый элемент очереди в её конец до тех пор, пока фильтр(элемент) не истинно, а затем выполняет обычный .get().
Разрешается воспользоваться внутренним представлением Queue
1 async def putter(n, queue): 2 for i in range(n): 3 await queue.put(i) 4 5 async def getter(n, queue, filter): 6 for i in range(n): 7 await asyncio.sleep(0.1) 8 yield await queue.get(filter) 9 10 async def main(): 11 queue = FilterQueue(10) 12 asyncio.create_task(putter(20, queue)) 13 async for res in getter(20, queue, lambda n: n % 2): 14 print(res) 15 16 asyncio.run(main())
1 3 5 7 9 11 13 15 17 4 19 12 6 16 8 14 0 10 2 18
EJudge: NotifyEvent 'Оповещения'
Написать класс NotifyEvent (унаследованный от asyncio.Event) со следующими свойствами
В методе оповещение.set(имя) присутствует строка-имя адресата уведомления, но по умолчанию это None
Перед каждым оповещение.set() (кроме самого первого) требуется вызов оповещение.clear()
Метод await оповещение.wait() возвращает имя адресата уведомления (но в остальном работает как event.wait())
Написать также сопрограмму task(имя, оповещение) со следующими свойствами:
Если уведомление «своё» — адресат уведомления совпадает с именем, — выводится имя, количество принятых «своих» уведомлений и количество принятых «чужих» уведомлений
Если вместо имени await оповещение.wait() вернул None, работа завершается
Использовать внутреннюю реализацию asyncio.Event в этой задаче нельзя
1 async def sender(names, notify): 2 for name in names: 3 notify.set(name) 4 await asyncio.sleep(0.1) 5 notify.clear() 6 notify.set() 7 8 async def main(): 9 notify = NotifyEvent() 10 tasks = {n: task(n, notify) for n in "12"} 11 targets = "1", "2", "2", "2", "2", "1", "2", "1", "1" 12 await asyncio.gather(*(list(tasks.values()) + [sender(targets, notify)])) 13 14 asyncio.run(main())
1: 1 / 0 2: 1 / 1 2: 2 / 1 2: 3 / 1 2: 4 / 1 1: 2 / 4 2: 5 / 2 1: 3 / 5 1: 4 / 5
TODO Тесты