Асинхронные возможности Python
Асинхронность:
Прямая (параллелизм) — в языке нет
Событийно-ориентированное программирование, в котором программа — это обработчик независимо порождаемых событий
Как правило, понятие «события» привязано к внешним источникам, а программа представляет собой более или менее обычный цикл обработки, то есть цикл и вызов методов-обработчиков
См., например, tkinter
- Сопрограммная: внутри нескольких сопрограмм есть синхронные участки; асинхронность — это алгоритм, сообразно которому происходит переключение между выполнением этих участков
- Да это ж генераторы!
Модель
Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее, if even possible ☺
Как работает yield from (повторение)
На время yield from код генератора task() логически не исполняется, можно считать, что на это время его замещает subr()
Ловля return из генератора с помощью yield from (два способа)
Оператор return в генераторе откладывает свой параметр в поле .value исключения StopIteration
А в конструкции yield from … это значение приезжает прямо так!
Передача параметра в генератор с помощью .send() (повторение):
Особенность: самый первый .send() должен быть генератор.send(None) (или, что то же самое, next(генератор), потому что в синтаксисе нет способа передать какое-то значение в начало генератора, а не в yield.
initial — это параметр генератор-функции, он передаётся в момент создания генератора, а не при его проходе
Мы договорились считать этот первый next() запуском генератора.
Куда происходит .send() в случае yield from?
Ничего неожиданного: .send() попадает в тот итератор, который сейчас yield-ит
Внимательно посмотрим, куда что send-илось…
Асинхронность как произвольное исполнение частей кода между yield-ами
Понятие синхронного фрагмента — непрерывно выполняемого кода между yield-ами (а также стартом и return-ом)
Понятие образующего цикла (main loop)
Тот же пример, но с двумя асинхронно выполняющимися задачами:
1 def subr(n): 2 x = yield f"({n}) Wait for x" 3 y = yield f"({n}) Wait for y ({x=})" 4 return x, y 5 6 def task(n): 7 while True: 8 value = yield from subr(n) 9 _ = yield f"[{n}]: {value}" 10 11 cores = task(0), task(1) 12 print(next(cores[0]), next(cores[1]), sep="\n") 13 for i in range(20): 14 print(cores[not i % 3].send(i))
Здесь из образующего цикла поступает поток целых чисел, subr() их попарно умножает, а две задачи складывают эти произведения
Очередное число попадает в subr() выбранной задачи, а выбор задач делает образующий цикл
Синхронные фрагменты из task[0] выполняются в два раза чаще синхронных фрагментов из task[1]
Можно попробовать разобраться, какой .send() докуда доходил
- Более сложный пример: три конечных задачи с разным количеством синхронных фрагментов
1 def subr(): 2 x = yield 3 y = yield 4 return [x, y] 5 6 def task(num): 7 res = [] 8 for i in range(num): 9 res += yield from subr() 10 return res 11 12 def loop(*tasks): 13 queue, result = list(tasks), [] 14 print("Start:", *queue, sep="\n\t") 15 for task in tasks: 16 next(task) 17 step = 0 18 while queue: 19 task = queue.pop(0) 20 try: 21 task.send(step) 22 except StopIteration as ret: 23 result.append((hex(id(task)), ret.value)) 24 else: 25 queue.append(task) 26 step += 1 27 return result 28 29 print("Done:", *loop(task(7), task(2), task(5)), sep="\n\t")
- Образующий цикл вынесен в отдельную функцию и стал сложнее. В нём генерируется последовательность целых чисел и отдаётся поштучно на обработку очередному заданию. Если задание закончилось, запоминается его результат, а если нет — ставится в конец очереди.
Для реализации этой логики пришлось снова «вытащить» явную обработку StopIteration
Значения, возвращаемые yield, при этом не используются вообще: yield служит только для разметки синхронных фрагментов
Если ещё усложнить логику образующего цикла, мы сможем управлять его поведением с помощью возвращаемых yield значений:
1 from random import randint 2 from string import ascii_uppercase 3 from collections import deque 4 5 def subr(): 6 return (yield int) * (yield str) 7 8 def task(num): 9 res = "" 10 for i in range(num): 11 res += yield from subr() 12 return res 13 14 def loop(*tasks): 15 queue, result = deque((task, None) for task in tasks), [] 16 print("Start:", *queue, sep="\n\t") 17 idx = -1 18 while queue: 19 task, request = queue.popleft() 20 if request is int: 21 data = randint(1, 4) 22 elif request is str: 23 data = ascii_uppercase[idx := idx + 1] 24 else: 25 data = request 26 try: 27 request = task.send(data) 28 except StopIteration as ret: 29 result.append((task, ret.value)) 30 task.close() 31 else: 32 queue.append((task, request)) 33 return result 34 35 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
subr() возвращает тип параметра, который она хотела бы получить в следующем yield-е
Этот тип хранится в очереди вместе с заданием, чей subr() запросил данный параметр
- Образующий цикл генерирует параметр сообразно типу
А ещё мы храним очередь в очереди, а не в списке, не надо привыкать к плохому!
Можно и дальше усложнять, но и так уже непросто!
Ещё модели
Цикл событий: образующий цикл получает откуда-то «события», определяет, кто их должен обрабатывать и вызывает функции-обработчики с параметром обработчик(событие) (возможно, не функции, а генераторы обработчик.send(событие), не слишком важно).
- Цикл обратных вызовов (callback-ов): частный случай того же самого: каждый обработчик «регистрируется» — по заранее определённому протоколу указывает, в каких случаях его надо вызывать (это и есть событие), а образующий цикл при наступлении события вызывает все обработчики, которые на нём зарегистрировались (опять-таки, можно организовать в виде функций, а можно в виде генераторов)
Цикл с фьючами (future, promise): унификация управления образующим циклом
future — это генератор, в котором есть поле «готовность / результат»; изначально фьюча не готова. Фьюча состоит из двух синхронных сегментов:
Настройка и yield себя в образующий цикл
return готового результата пользователю
- Алгоритм работы:
- Неготовая фьюча заводится в данном образующем цикле
Образующий цикл вызывает next(сопрограмма)
Сопрограмма делает yield from фьюча
- Фьюча выпадает в образующий цикл, потому что она ещё не готова (первый сегмент)
- Образующий цикл продолжает работу, проверяя, что фьюча не готова
- В какой-то момент некто выставляет фьюче готовность / результат
На этом основании образующий цикл возвращает управление фьюче next(фьюча) (во второй сегмент)
- А та возвращает значение пользователю
- Если фьюча всё ещё не готова — это ошибка алгоритма в образующем цикле, так делать нельзя
Если фьюча уже готова, все вызовы yield from фьюча сразу возвращают результат
Полученный StopIteration образующий цикл обрабатывает (обычно игнорирует)
- … более сложная логика (например, приоритизация событий) …
Ещё раз: асинхронность — это не параллелизм! Все фрагменты выполняются последовательно в один поток.
Синтаксис Async
async def + return — задание сопрограммы
Генератор на один шаг:
Может включать в себя await ≈ yield from — вызов других сопрограмм
Кстати, async def + yield — это именно то, чем кажется: генераторы, про которые сразу известно, что они асинхронные:
Их можно проходить циклом async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)
Если до этого момента не стало понятно:
A tale of event loops — ещё одно объяснение async, немножко с обратного конца
Перепишем предыдущий пример на async
Примечание: @types.coroutine — низкоуровневая сопрограмма, которая может делать и return значение, и yield, то есть напрямую обращаться к образующему циклу. Встречается редко.
1 from random import randint
2 from string import ascii_uppercase
3 from types import coroutine
4 from collections import deque
5
6 @coroutine
7 def subr():
8 return (yield int) * (yield str)
9
10 async def task(num):
11 res = ""
12 for i in range(num):
13 res += await subr()
14 return res
15
16 def loop(*tasks):
17 queue, result = deque((task, None) for task in tasks), []
18 print("Start:", *queue, sep="\n\t")
19 idx = -1
20 while queue:
21 task, request = queue.popleft()
22 if request is int:
23 data = randint(1, 4)
24 elif request is str:
25 data = ascii_uppercase[idx := idx + 1]
26 else:
27 data = request
28 try:
29 request = task.send(data)
30 except StopIteration as ret:
31 result.append((task, ret.value))
32 task.close()
33 else:
34 queue.append((task, request))
35 return result
36
37 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
Наш subr() использует прямое управление образующим циклом с помощью yield
В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий
Формально говоря, awaitable object — это просто объект с методом .__await__(), возвращающим итератор. Однако логика работы этого метода диктуется управляющим циклом, это вам не .__call__ ☺. Вот пример реализации логики Future для управляющего цикла asyncio.
Asyncio
Немного истории:
- … пошло-поехало!
- …
Python Asyncio: The Complete Guide (NB: название сайта!)
- …
- Самое сложное — это логика образующего цикла
Самое ненужное — это логика образующего цикла (достаточно знать, как он работает, а не что делает)
⇒
- Запрограммируем образующий цикл заранее, насуём туда инструментов
Упростим протокол управления до одного понятия — Future
- Обмажем протокол верхним уровнем (задания, события, очереди и т. п.)
До такой степени, что ни одна из наших сопрограмм не делает yield (если это не асинхронный генератор)
(asyncio specific) обмажем огромным количеством применений IRL
Основные понятия:
Mainloop — образующий цикл. Полностью под капотом, мы его не видим.
Task — сопрограмма, поставленная в управление образующим циклом
1 import asyncio 2 from time import strftime 3 4 async def late(delay, msg): 5 await asyncio.sleep(delay) 6 print(msg) 7 8 async def main(): 9 print(f"> {strftime('%X')}") 10 await late(1, "One") 11 print(f"> {strftime('%X')} + 1") 12 await late(2, "Two") 13 print(f"> {strftime('%X')} + 2") 14 15 task3 = asyncio.create_task(late(3, "Three")) 16 task4 = asyncio.create_task(late(4, "Four")) 17 await task3 18 print(f"> {strftime('%X')} + 3") 19 await task4 20 print(f"> {strftime('%X')} + <<1>>") 21 22 asyncio.run(main())
asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()
- «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходила в mainloop)
В примере первая корутина спит секунду, а вторая — после этого ещё две
Если написать create_task(корутина), корутина регистрируется в mainloop-е, а возвращется нечто вроде фьючи — задание
await(здадание) запускает его
В примере ещё две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой
asyncio.sleep(тайм-аут) — это команда mainloop-у «верни мне управление после тайм-аута»
- Чуть ли не единственная команда asyncio-шному mainloop-у на поверхности
Gather — атомарная операция create_task() / await над несколькими корутинами
- Тут всё понятно, запустились, повисели сколько сказано, завершились
(Python 3.11) группы заданий — запуск в виде контекстного менеджера
1 import asyncio 2 3 async def late(delay, msg): 4 await asyncio.sleep(delay) 5 print(msg) 6 return delay 7 8 async def main(): 9 async with asyncio.TaskGroup() as tg: 10 tg.create_task(late(3, "A")) 11 tg.create_task(late(1, "B")) 12 tg.create_task(late(2, "C")) 13 print("Done") 14 15 asyncio.run(main())
Asyncio — это:
Асинхронное выполнение фрагментов кода между yield-ами; порядок определяется образующим циклом и намеренно недетерминирован
⇒ возможны ситуации гонок, взаимоблокировки и прочие прелести; необходима синхронизация
- Один поток вычислений
⇒ нет ситуаций одновременного атомарного доступа к ресурсу (неатомарный одновременный доступ, то есть длящийся более одного фрагмента, разумеется, есть; требуются семафоры и mutex-ы)
⇒ нельзя надолго (особенно — неопределённо надолго) оставаться в одном фрагменте (висеть в горячем цикле, синхронном вводе и т. п.): пока сопрограмма не передала управление образующему циклу, никакие другие задания не выполняются
В таких случаях может помочь asyncio.sleep(0), но если он помогает — что-то с вашим алгоритмом не то…
- Простая модель асинхронности с полностью скрытым образующим циклом
Высокоуровневое API (введение)
Собственно фьюча,
она считается низкоуровневым примитивом (The rule of thumb is to never expose Future objects in user-facing APIs)
(если успеем) Пример:
1 >>> F = asyncio.Future() 2 >>> F.done() 3 False 4 >>> F 5 <Future pending> 6 >>> f = F.__await__() # Итератор из двух сегментов 7 >>> f.send(None) # Это клиент сделал await (бывш. yield from f) 8 <Future pending> # вернулась неготовая фьюча 9 >>> F.set_result(42) # кто-то сделал её готовой 10 >>> F 11 <Future finished result=42> 12 >>> F.done() 13 True 14 15 >>> f.send(None) # mainloop велел продолжать 16 Traceback (most recent call last): 17 File "<stdin>", line 1, in <module> 18 StopIteration: 42
Высокоуровневый — например, события
1 async def waiter(name, event): 2 print(f'{name} waits for {event}…') 3 await event.wait() 4 print(f'…{name} got it!') 5 6 async def eventer(wait, event): 7 print(f"Emitting {event} in {wait} seconds") 8 await asyncio.sleep(wait) 9 print(f"Emitting {event}…") 10 event.set() 11 12 async def main(): 13 event = asyncio.Event() 14 await asyncio.gather( 15 waiter("One", event), 16 waiter("Two", event), 17 eventer(1, event)) 18 19 asyncio.run(main())
…или барьеры
- …
1 async def ham(queue, size): 2 for i in range(size): 3 await asyncio.sleep(1) 4 res = await queue.get() 5 print(f"\tGot {res}") 6 7 async def spam(wait, queue): 8 for i in range(6): 9 await asyncio.sleep(wait) 10 val = f"{wait}:{i}" 11 await queue.put(val) 12 print(f"Put {val}") 13 14 async def main(): 15 queue = asyncio.Queue() 16 await asyncio.gather( 17 ham(queue, 12), 18 spam(0.4, queue), 19 spam(1.6, queue)) 20 21 asyncio.run(main())
- Есть и приоритетные очереди
И толстый-толстый слой шоколада!
Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)
Изменение логики работы mainloop (aka Policies)
- Сеть (I/O, IPC и всё остальное), сигналы
Потоки (над этим всем)
В частности, TCP сервер (аналог netcat -l)
1 import asyncio 2 3 async def echo(reader, writer): 4 while data := await reader.readline(): 5 writer.write(data.swapcase()) 6 writer.close() 7 await writer.wait_closed() 8 9 async def main(): 10 server = await asyncio.start_server(echo, '0.0.0.0', 1337) 11 async with server: 12 await server.serve_forever() 13 14 asyncio.run(main())
- Может принимать произвольное количество соединений
Ещё раз: это не параллелизм!
- ⇒ (да, да, помним: не висеть внутри сопрограммы в горячих циклах / синхронном вводе / whatever alike)
datamodel.html#coroutine.throw (по аналогии с generator.throw()
- …
Дикая туча модулей на основе asyncio
Д/З
Попробовать прочитать всю документацию и прощёлкать всё, до чего дотянетесь.
EJudge: SyncSeq 'Синхронизация последовательности'
Напишите класс Seq(name), экземпляры которого обладают свойством синхронизации в порядке их создания. Единственный параметр — name — это уникальный идентификатор экземпляра. Класс должен предоставлять корутину .run(), которая в нужный момент выводит name на стандартный вывод; возвращает она тоже name.
1 import asyncio 2 import random 3 4 # … 5 6 async def main(*names): 7 random.seed(1337) 8 random.shuffle(seq := [Seq(name) for name in names]) 9 shnames = [s.name for s in seq] 10 print(*shnames) 11 result = await asyncio.gather(*(s.run() for s in seq)) 12 print(*result) 13 14 asyncio.run(main(1, 2, 3, 4, 5, 6, 7))
1 6 4 2 3 7 5 1 2 3 4 5 6 7 1 6 4 2 3 7 5
EJudge: FilterQueue 'Вас здесь не стояло!'
Напишите класс FilterQueue со следующими свойствами:
Это потомок asyncio.Queue
В экземпляре класса атрибут очередь.window содержит первый элемент очереди, или None, если очередь пуста (просмотр очередь.window не влияет на состояние очереди)
С помощью операции фильтр in очередь можно определить, присутствуют ли в очереди такие элементы, что выражение фильтр(элемент) истинно
Метод .later() синхронно переставляет первый элемент очереди в её конец, или вызывает исключение asyncio.QueueEmpty, если очередь пуста
Метод .get() содержит необязательный параметр фильтр. Вызов очередь.get(фильтр) работает так:
Если в очереди нет элементов, на которых фильтр(элемент) истинно, работает как обычный .get().
Если в очереди есть элементы, на которых фильтр(элемент) истинно, переставляет первый элемент очереди в её конец до тех пор, пока фильтр(элемент) не истинно, а затем выполняет обычный .get().
Разрешается воспользоваться внутренним представлением Queue; код Queue можно посмотреть тут
1 async def putter(n, queue): 2 for i in range(n): 3 await queue.put(i) 4 5 async def getter(n, queue, filter): 6 for i in range(n): 7 await asyncio.sleep(0.03) 8 yield await queue.get(filter) 9 10 async def main(): 11 queue = FilterQueue(10) 12 asyncio.create_task(putter(20, queue)) 13 async for res in getter(20, queue, lambda n: n % 2): 14 print(res) 15 16 asyncio.run(main())
1 3 5 7 9 11 13 15 17 4 19 12 6 16 8 14 0 10 2 18
EJudge: OneDArcade 'Одноменрая аркада'
Написать класс Monster(Имя, Позиция, Задержка, Сила), определяющий поведение монстра в одномерной аркаде.
Имя — строка, остальные параметры — натуральные числа.
Класс имеет также метод-корутину .run(начало_эпизода, конец_эпизода).
начало_эпизода и конец_эпизода — это asyncio барьеры, которые предполагается использовать в программе
При задержке == N корутина прибавляет 1 к Позиции монстра каждое N-е событие начало_эпизода, если этот монстр жив
Написать также корутину game(монстры, начало_эпизода, конец_эпизода, эпоха), которая после каждого события конец_эпизода:
Просматривает массив монстров монстры
Находит в нём самую левую пару живых монстров, чья Позиция совпадает
- Если таких больше двух, берутся первые два в порядке перечисления в массиве
Заставляет эту пару сражаться: если их Сила равна, умирают оба, если нет — сила выжившего уменьшается на значение силы умершего
Работа game() заканчивается:
Если все монстры умерли, тогда она возвращает "All dead"
Если все монстры выжили после эпоха эпизодов, возвращает "All flee"
- В противном случае возвращается список выживших монстров (через пробел с запятой)
Корутина main() в тестах не меняется
1 async def main(*specs): 2 monsters = [Monster(*spec) for spec in specs] 3 animate, freeze = asyncio.Barrier(len(monsters) + 1), asyncio.Barrier(len(monsters) + 1) 4 squad = [asyncio.create_task(m.run(animate, freeze)) for m in monsters] 5 result = await game(monsters, animate, freeze, 10000) 6 _ = [m.cancel() for m in squad] 7 return result 8 9 print(asyncio.run(main(("Kano", 1, 1, 20), ("Sonya", 2, 2, 15), ("Liu Kang", 4, 3, 10)))) 10 print(asyncio.run(main(("Sonya", 2, 2, 15), ("Johnny Cage", 1, 3, 1)))) 11 print(asyncio.run(main(("Kano", 1, 1, 20), ("Sonya", 2, 2, 15), ("Liu Kang", 4, 3, 10), ("Kabal", 5, 1, 5)))) 12 print(asyncio.run(main(("Milena", 1, 1, 15), ("Kitana", 2, 2, 15))))
Liu Kang All flee Liu Kang, Kabal All dead