Различия между версиями 8 и 9
Версия 8 от 2022-07-17 12:04:28
Размер: 17164
Редактор: FrBrGeorge
Комментарий:
Версия 9 от 2022-11-29 12:58:04
Размер: 17068
Редактор: FrBrGeorge
Комментарий:
Удаления помечены так. Добавления помечены так.
Строка 60: Строка 60:
def sec(Name):
    yield f"SEC-{Name} >"
    yield f"SEC-{Name} <"
Строка 66: Строка 63:
        yield from sec(Name)
Строка 76: Строка 72:
   * Здесь части кода одного `sec()` выполняются в три раза реже частей другого `sec()` (потому что в образующем цикле вероятнось вызова одного 0.75)    * Здесь части кода одного `prim()` выполняются в три раза реже частей другого `prim()` (потому что в образующем цикле вероятность вызова одного 0.75)

Асинхронные возможности Python

TODO: фьючи, похоже, не нужно рассказывать, зато стоит упомянуть примитивы синхронизации

Асинхронность:

  • Явная (параллелизм) — в языке нет
  • Со скрытой активацией (обратные вызовы, callbacks) — всё зависит от mainloop
  • Сопрограммная — вот!

Модель

<!> Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее ☺

  • Как работает yield from

       1 from time import sleep
       2 def sec():
       3     yield f"SEC 1"
       4     yield f"SEC 2"
       5 
       6 def prim(Name="Prim"):
       7     while True:
       8         yield from sec()
       9         yield Name
      10 
      11 core = prim()
      12 for i in range(10):
      13     sleep(1)
      14     print(f"({i:3}) {next(core)}")
    
    • На время yield from код генератора prim() логически не исполняется, можно считать, что на это время его замещает sec()

  • Ловля return из генератора с помощью yield from

       1 from time import sleep
       2 
       3 def adder(x, y):
       4     yield f"add: {x=}, {y=} - 1"
       5     yield f"add: {x=}, {y=} - 2"
       6     yield f"add: {x=}, {y=} - 3"
       7     return x*2+y
       8 
       9 def infadd(n):
      10     for i in range(100000): # вечный цикл
      11         res = yield from adder(i, n)
      12         print(f"add-{i}: {res}")
      13 
      14 core = infadd(3)
      15 for i in range(100):
      16     sleep(1)
      17     res = next(core)
      18     print(f"({i:3}) {res}")
    
    • Здесь x*2+y приезжает в infadd()

  • Асинхронность как произвольное исполнение сегментов кода между yield-ами

    • Понятие образующего цикла (main loop)
    • Передача управления в main loop с помощью явного yield

         1 from random import random
         2 from time import sleep
         3 
         4 def prim(Name="Prim"):
         5     for i in range(100000): # вечный цикл
         6         yield f"{Name}-{i}"
         7 
         8 cores = prim("One"), prim("Two")
         9 for i in range(100):
        10     sleep(1)
        11     res = next(cores[random()>0.75])
        12     print(f"({i:3}) {res}")
      
      • Здесь части кода одного prim() выполняются в три раза реже частей другого prim() (потому что в образующем цикле вероятность вызова одного 0.75)

    • Обмен данными с сегментами сопрограмм с помощью .send()

         1 def adder(n):
         2     x = yield n + 1
         3     y = yield x + 1
         4     return x * 2 + y
         5 
         6 def add4():
         7     res1 = yield from adder(1)
         8     res2 = yield from adder(2)
         9     return res1 + res2
        10 
        11 cmd, data = add4(), None
        12 for i in range(4):
        13     res = cmd.send(data)
        14     data = eval(input(f"{i+1}: "))
        15 
        16 cmd.send(10000)
      
      • Здесь adder() запрашивает в образующем цикле значения, а add4() комбинирует результаты работы двух adder()-ов

      • Обратите внимание на то, каким образом в действительности передаётся возвращаемое значение генератора: как поле value исключения StopIteration (если вы не увидели этого, значит, вы не запускали код… ну, как хотите ☹)

  • Мы можем усложнить логику управления образующим циклом. В примере ниже видя int mainloop засылает в генератор небольшое случайное целое, а видя str — случайную букву

       1 from time import sleep
       2 from string import ascii_uppercase as alpha
       3 import random
       4 
       5 def gen():
       6     x = yield int
       7     y = yield str
       8     return x*y
       9 
      10 def rep():
      11     res1 = yield from gen()
      12     res2 = yield from gen()
      13     return res1 + res2
      14 
      15 def runner(cmd):
      16     req = next(cmd)
      17     try:
      18         while True:
      19             if req is str:
      20                 req = cmd.send(random.choice(alpha))
      21             elif req is int:
      22                 req = cmd.send(random.randint(1,9))
      23             else:
      24                 raise ValueError(req)
      25     except StopIteration as E:
      26         cmd.close()
      27         return E.value
      28 
      29 print(runner(rep()))
    
    • Возвращаемое значение перехватывается в обработчике исключения StopIteration

Пример, иллюстрирующий понятие асинхронности

Модифицируем предыдущий пример:

  • rep(n) будет комбинировать результат n вызовов gen()

  • runner() будет

    • получать на вход несколько разных rep()

    • формировать из них кольцевую очередь, в которой рядом со ссылкой на генератор будет храниться «запрос» — что передавать rep() в следующий раз, int или str

    • обрабатывать эту очередь по кругу
    • перехватывать StopIteration, забирать возвращённое значение, и исключать соответствующий rep() из очереди

    • По исчерпании очереди — возвращать список результатов
       1 from time import sleep
       2 from string import ascii_uppercase as alpha
       3 import random
       4 
       5 def gen():
       6     return (yield int) * (yield str)
       7 
       8 def rep(n):
       9     res = ""
      10     for i in range(n):
      11         res += yield from gen()
      12     return res
      13 
      14 
      15 def runner(*cmds):
      16     queue = [(cmd, None) for cmd in reversed(cmds)]
      17     res = []
      18     while queue:
      19         cmd, req = queue.pop()
      20         try:
      21             if req is str:
      22                 req = cmd.send(random.choice(alpha))
      23             elif req is int:
      24                 req = cmd.send(random.randint(1,6))
      25             elif req is None:
      26                 req = next(cmd)
      27             else:
      28                 raise ValueError(req)
      29         except StopIteration as E:
      30             res.append(E.value)
      31             cmd.close()
      32         else:
      33             queue.insert(0, (cmd, req))
      34     return res
      35 
      36 print(runner(rep(10), rep(3), rep(5)))
    
    • Первым закончился rep(3), потому что его выполнение включало 6 yield-ов (6 фрагментов до yield + один концевой до return), затем — rep(5), и затем — rep(10) (хотя его первый фрагмент был выполнен первым).

Ещё модели

  • Цикл событий: получает откуда-то «события», после чего некоторые сегменты кода (которые ждут этого события) можно «разбудить»

    • Цикл обратных вызовов: примерно то же самое
  • Цикл с future:

    • future — это асинхронный код из двух сегментов: результат + поле готовности

    • Один код спит на ней (выходит в mainloop)
    • Другой код заполняет результат и выставляет готовность
    • mainloop проверяет готовность, и если она есть — передаёт управление
  • … более сложная логика

Синтаксис Async

  • async def ~== генератор

  • await ~== yield from

  • @types.coroutine — чтобы и yield и return (не доделали ещё?) TODO проверить, нужно ли

    • async def + yield — совсем другое, это именно то, чем кажется, специальные асинхронные генераторы, которые можно проходить async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)

Перепишем предыдущий пример на async (дополнительно будем использовать deque вместо списка, потому что это правильно для реализации очереди):

  •    1 from time import sleep
       2 from string import ascii_uppercase as alpha
       3 import random
       4 import types
       5 from collections import deque
       6 
       7 @types.coroutine
       8 def gen():
       9     x = yield int
      10     y = yield str
      11     return x*y
      12 
      13 async def rep(n):
      14     res = ""
      15     for i in range(n):
      16         res += await gen()
      17     return res
      18 
      19 def runner(*cmds):
      20     queue = deque((cmd, None) for cmd in reversed(cmds))
      21     res = []
      22     while queue:
      23         cmd, req = queue.pop()
      24         try:
      25             if req is str:
      26                 req = cmd.send(random.choice(alpha))
      27             elif req is int:
      28                 req = cmd.send(random.randint(1,6))
      29             elif req is None:
      30                 req = cmd.send(None)
      31             else:
      32                 raise ValueError(req)
      33         except StopIteration as E:
      34             res.append(E.value)
      35             cmd.close()
      36         else:
      37             queue.appendleft((cmd, req))
      38     return res
      39 
      40 print(runner(rep(10), rep(3), rep(5)))
    
    • наш gen() — не совсем стандартная корутина, потому что использует прямое управление образующим циклом с помощью yield

    • В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий

Asyncio

Базоавя документация

  • Самое сложное — это логика образующего цикла
  • Самое ненужное — это логика образующего цикла (достаточно знать, как он работает, а не что делает)

  • Запрограммируем образующий цикл заранее, насуём туда инструментов
  • Упростим протокол управления до одного понятия — Future

  • (asyncio specific) обмажем огромным количеством применений IRL

Основные понятия:

  • Mainloop — полностью под капотом, мы его не видим

  • Task:

       1 import asyncio
       2 from time import strftime
       3 
       4 async def late(delay, msg):
       5     await asyncio.sleep(delay)
       6     print(msg)
       7 
       8 async def main():
       9     print(f"> {strftime('%X')}")
      10     await late(1, "One")
      11     print(f"> {strftime('%X')}")
      12     await late(2, "Two")
      13     print(f"> {strftime('%X')}")
      14     task3 = asyncio.create_task(late(3, "Three"))
      15     task4 = asyncio.create_task(late(4, "Four"))
      16     await(task3)
      17     print(f"> {strftime('%X')}")
      18     await(task4)
      19     print(f"> {strftime('%X')}")
      20 
      21 asyncio.run(main())
    
    • asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()

    • «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
    • Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходила в mainloop)

    • В примере первая корутина спит секунду, а вторая — две
    • Если написать create_task(корутина), то её выполнение тут же стартует в mainloop-е

    • В примере ещё две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой
    • asyncio.sleep(тайм-аут) — это команда mainlop-у «верни мне управление после тайм-аута»

  • Gather — атомарная операция create_task() / await над несколькими корутинами

       1 import asyncio
       2 
       3 async def late(delay, msg):
       4     await asyncio.sleep(delay)
       5     print(msg)
       6     return delay
       7 
       8 async def main():
       9     res = await asyncio.gather(
      10             late(3, "A"),
      11             late(1, "B"),
      12             late(2, "C"),
      13     )
      14     print(res)
      15 
      16 asyncio.run(main())
    
  • Протокол работы Future

    1. У Future есть творец и адресат, оба имеют к нему доступ.

    2. Адресат выполняет результат = await фьюча

    3. Неготовая фьюча уходит в mainloop
    4. Творец, когда ему заблагорассудится, с помощью фьюча.set_result(что-то) объявляет mainloop-у, что (1) будущее наступило (готовность становится истиной), и (2) теперь оно равно что-то (заполняется результат)

    5. Mainloop, видя, что фьюча уже готова, возвращает ей управление
    6. Фьюча делает return результат

    7. Адресату этот результат приезжает из await

       1 import asyncio
       2 
       3 async def delayed(fut, sec, ret):
       4     await asyncio.sleep(sec)
       5     fut.set_result(ret)
       6 
       7 async def now(fut, ret):
       8     fut.set_result(ret)
       9 
      10 async def main():
      11     fut1, fut2 = asyncio.Future(), asyncio.Future()
      12     asyncio.create_task(delayed(fut2, 1, 'Done'))
      13     asyncio.create_task(now(fut1, 'Start'))
      14     print(await fut1)
      15     print(await fut2)
      16 
      17 asyncio.run(main())
    
  • В действительности,
    • фьюча — это просто генератор на два шага:
      1. будущее не наступило / yield 

      2. будущее наступило / return

    • …в котором есть два дополнительных поля (объявление о том, что оно наступило и передаваемое значение)
    • В этом примере одна из корутин обрабатывает две фьючи в разное время

         1 import asyncio
         2 
         3 async def delayed(fut, sec, ret):
         4     await asyncio.sleep(sec)
         5     print(ret)
         6     fut.set_result(ret)
         7 
         8 async def now_then(fut, sec, futb, ret):
         9     print(ret)
        10     fut.set_result(ret)
        11     await asyncio.sleep(sec)
        12     print(ret)
        13     futb.set_result(ret)
        14 
        15 async def main():
        16     fut1, fut2, fut3 = (asyncio.Future() for i in range(3))
        17     asyncio.create_task(delayed(fut1, 1, 'Middle'))
        18     asyncio.create_task(now_then(fut2, 2, fut3, 'Border'))
        19     print(await asyncio.gather(fut1, fut2, fut3))
        20 
        21 asyncio.run(main())
      

И толстый-толстый слой шоколада!

  • Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)
  • Изменение логики работы mainloop (aka Policies)
  • Очереди (всякие)
  • Сеть (I/O, IPC и всё остальное), сигналы
  • Потоки (над этим всем)
  • Модификация образующего цикла
  • Вброс/перехват исключений
  • Дикая туча модулей на основе asyncio

Д/З

  1. Попробовать прочитать всю документацию и прощёлкать всё, до чего дотянетесь.

  2. Задач на EJudge нет, но прошлое задание есть и его надо доделать!

LecturesCMC/PythonIntro2021/13_Async (последним исправлял пользователь FrBrGeorge 2022-11-29 12:58:04)