Python 3.6: асинхронное всё

Отступление 1: Сопрограммы (coroutines)

(PEP 342)

классические генераторы это не совсем оно:

generators cannot yield control while other functions are executing, unless those functions are themselves expressed as generators

необходимо мочь засовывать значения внутрь генератора, тогда все будет хорошо

   1 >>> def flipflop(initial=0):
   2 ...     x = initial
   3 ...     while True:
   4 ...             x = yield x
   5 ...
   6 >>> f = flipflop()
   7 >>> f.send(2)
   8 Traceback (most recent call last):
   9 File "<stdin>", line 1, in <module>
  10 TypeError: can't send non-None value to a just-started generator
  11 >>> f.send(None)
  12 0
  13 >>> f.send(1)
  14 1
  15 >>> f.send(4)
  16 4
  17 >>> f.send(None)
  18 >>> f.send(123)
  19 123
  20 >>> f.close()

еще можно закидывать исключения в итератор: it.exception()

в результате получаем полноценную сопрограмму: можем передавать и управление, и значения в обе стороны.

отступление 2: `yield from`

(PEP 380)

Если разбивать генератор на мелкие кусочки, нельзя просто так взять и засунуть yield на более глубокий уровень

Последовательность значений из итератора (в одну сторону) можно вернуть как-то так:

   1 for v in g:
   2     yield v

но (см. выше) yield же может принимать значения снаружи. всё становится сложно.

поэтому придумали yield from <expr>

как это работает:

когда выполнение натыкается на yield from subg(), все приходящие через send()/next() запросы проксируются в subg, а yield'имые значения соответственно передаются сразу наружу. Так происходит до тех пор, пока subg не кончится (т.е. не выбросит StopIteration), после этого выполнение внешнего генератора продолжается.

пример (вкуривать пока не станет понятно):

   1 def g1(a):
   2     print('  g1 enter')
   3     for n in range(a):
   4         print('  g1 yielding(n=%d)'%(n))
   5         batman = yield "batman",n
   6         print('  g1 recv batman=%d' % (batman))
   7         print('  g1 yielding from g2(%d)' % (n))
   8         yield from g2(n)
   9         print('  g1: yield from g2 returns control')
  10 
  11 def g2(n):
  12     print('    g2 enter')
  13     q=0
  14     for i in range(n):
  15         print('    g2 yielding(i=%d)' % (i))
  16         q=yield(i+q*10)
  17         print('    g2 recv',q)
  18 
  19 
  20 
  21 val=0
  22 print('top creates g')
  23 g = g1(4)
  24 print('top sending None')
  25 print('top got',g.send(None))
  26 while True:
  27     try:
  28         print('top sending ',val)
  29         result = g.send(val)
  30         print('top got', result)
  31         val += 10
  32     except StopIteration:
  33         print('StopIteration')
  34         break

top creates g
top sending None
  g1 enter
  g1 yielding(n=0)
top got ('batman', 0)
top sending  0
  g1 recv batman=0
  g1 yielding from g2(0)
    g2 enter
  g1: yield from g2 returns control
  g1 yielding(n=1)
top got ('batman', 1)
top sending  10
  g1 recv batman=10
  g1 yielding from g2(1)
    g2 enter
    g2 yielding(i=0)
top got 0
top sending  20
    g2 recv 20
  g1: yield from g2 returns control
  g1 yielding(n=2)
top got ('batman', 2)
top sending  30
  g1 recv batman=30
  g1 yielding from g2(2)
    g2 enter
    g2 yielding(i=0)
top got 0
top sending  40
    g2 recv 40
    g2 yielding(i=1)
top got 401
top sending  50
    g2 recv 50
  g1: yield from g2 returns control
  g1 yielding(n=3)
top got ('batman', 3)
top sending  60
  g1 recv batman=60
  g1 yielding from g2(3)
    g2 enter
    g2 yielding(i=0)
top got 0
top sending  70
    g2 recv 70
    g2 yielding(i=1)
top got 701
top sending  80
    g2 recv 80
    g2 yielding(i=2)
top got 802
top sending  90
    g2 recv 90
  g1: yield from g2 returns control
StopIteration

asyncio

(PEP 3156)

с чего всё началось: транспорт и протокол

протокол осуществляет доставку байтов, транчпорт предоставляет/получает эти байты; они дергают nonblocking методы друг друга

event loop

Такой объект со специфицированным интерфейсом, который выполняет функции планировщика. Собственно, в него передаётся управление, когда асинхронная сопрограмма залипает на блокирующей операции.

основные методы:

запуск и остановка:

Сохраняет порядок выполнения.

<еще куча всякого тредсейфового и специфичного(сокеты всякие там), см. доки>

event loop либо глобален (и добывается через event loop policy, такой специальный объект), либо явным образом передается во все функции, которые с ним работают (грепать в пепе: event loop policy; Passing an Event Loop Around Explicitly)

в некоторых случаях эвентлуп предоставляется системой и уже запущен, и его может быть нельзя остановить или закрыть внутри программы.

asyncio предоставляет две реализации: SelectorEventLoop и ProactorEventLoop, последний работает только на винде и около.

Хэндлы нужны, чтоб отменить задание через cancel(). Других применений и публичных методов у них нет.

отступление 3: Futures

(PEP 3148)

изначально идея зародилась в concurrent (многотредовость), а потом идею зохавал asyncio. Собственно, в concurrent это выглядит следующим образом:

Futures в asyncio работают примерно так же.

Основное:

есть волшебные методы:

asyncio.async(arg): делает Future из аргумента (в качестве аргумента допустимо всё что можно писать справа от yield from, т.е. сопрограмма либо future)

asyncio.wrap_future(future): обертка-адаптер для concurrent.futures.Future, чтоб оно работало с asyncio

<протоколы протоколы транспорты транспорты. курите сами>

Это всё было про коллбэки.

Теперь интересное: можно коллбэками не пользоваться, а пользоваться сопрограммами.

Things a coroutine can do:

pep 492: async/await syntax

PEP 0492

понятие native coroutine (т.е. не из генератора)

определяется так:

   1 async def read_data(db):
   2     pass

await

ждем ответа от чего-либо, тем временем передаём управление в event loop

   1 async def read_data(db):
   2     ...
   3     data = await db.fetch('SELECT ...')
   4     ...

с точки зрения данной сопрограммы это просто блочит выполнение, пока awaitable не завершится

т.е. db.fetch() возвращает специальный awaitable объект, от которого далее можно ждать ответа

в случае asyncio awaitable это Future собственно, awaitable бывают трех видов:

когда мы говорим await foo, мы отдаем управление эвентлупу до тех пор пока foo не закончит (+эпсилон)

async with

в случае с обычным with контекст глобальный. Когда мы можем передавать управление в сильно другие места это приводит к волшебным багам, поэтому нужен специальный asynchronous context manager, который умеет обрабатывать такие переключения (Two new magic methods are added: __aenter__ and __aexit__).

async for

работает только внутри нативной сопрограммы (async def) нужен специальный asynchronous iterable __aiter__ -> __anext__ -> StopAsyncIteration

An example of asynchronous iterable:

   1 class AsyncIterable:
   2     def __aiter__(self):
   3         return self
   4 
   5     async def __anext__(self):
   6         data = await self.fetch_data()
   7         if data:
   8             return data
   9         else:
  10             raise StopAsyncIteration
  11 
  12     async def fetch_data(self):
  13         ...

собственно async for:

A new statement for iterating through asynchronous iterators is proposed:

   1 async for TARGET in ITER:
   2     BLOCK
   3 else:
   4     BLOCK2

which is semantically equivalent to:

   1 iter = (ITER)
   2 iter = type(iter).__aiter__(iter)
   3 running = True
   4 while running:
   5     try:
   6         TARGET = await type(iter).__anext__(iter)
   7     except StopAsyncIteration:
   8         running = False
   9     else:
  10         BLOCK
  11 else:
  12     BLOCK2

asynchronous comprehensions

(PEP 530)

еще теперь можно в них использовать await:

   1 result = [await fun() for fun in funcs]
   2 result = {await fun() for fun in funcs}
   3 result = {fun: await fun() for fun in funcs}
   4 
   5 result = [await fun() for fun in funcs if await smth]
   6 result = {await fun() for fun in funcs if await smth}
   7 result = {fun: await fun() for fun in funcs if await smth}
   8 
   9 result = [await fun() async for fun in funcs]
  10 result = {await fun() async for fun in funcs}
  11 result = {fun: await fun() async for fun in funcs}
  12 
  13 result = [await fun() async for fun in funcs if await smth]
  14 result = {await fun() async for fun in funcs if await smth}
  15 result = {fun: await fun() async for fun in funcs if await smth}

This is only valid in async def function body.

асинхронные генераторы

(PEP 525)

мы умеем легко писать генераторы:

   1 >>> def gen(n):
   2 ...     q=n
   3 ...     while q>0:
   4 ...             yield q
   5 ...             q = q//2
   6 ... 
   7 >>> for y in gen(100500):
   8 ...     print(y)
   9 ... 
  10 100500
  11 50250
  12 25125
  13 12562
  14 6281
  15 3140
  16 1570
  17 785
  18 392
  19 196
  20 98
  21 49
  22 24
  23 12
  24 6
  25 3
  26 1

несмотря на то, что на самом деле интерфейс генератора упорот и магическ: __iter__, __next__, вот это всё.

Ломающие новости! теперь асинхронные генераторы тоже можно так писать, не реализуя руками __aiter__() и остальные кишки.

(а еще они работают в 2 раза быстрее "ручной" реализации)

пример из пепа:

   1 async def ticker(delay, to):
   2     for i in range(to):
   3         yield i
   4         await asyncio.sleep(delay)
   5 
   6 async def run():
   7     async for i in ticker(1, 10):
   8         print(i)
   9 
  10 import asyncio
  11 loop = asyncio.get_event_loop()
  12 try:
  13     loop.run_until_complete(run())
  14 finally:
  15     loop.close()

доп.чтение: PEP 533

2 одновременно (а также вообще первый полностью дописанный пример асинхронного кода, содержащего большинство упомянутых понятий, который можно уже скопипастить в интерпретатор и посмотреть, как он работает. Настоятельно рекомендуется это сделать и втыкать, поковыривая, до тех пор, пока не станет понятно всё вышеописанное):

   1 async def ticker(delay, to):
   2     for i in range(to):
   3         yield (i,delay)
   4         await asyncio.sleep(delay)
   5 
   6 async def run(k):
   7     async for i in ticker(k, 10):
   8         print(i)
   9 
  10 import asyncio
  11 loop = asyncio.get_event_loop()
  12 try:
  13     loop.run_until_complete(asyncio.gather(run(1),run(2)))
  14 finally:
  15     loop.close()

результат:

(0, 2)
(0, 1)
(1, 1)
(1, 2)
(2, 1)
(3, 1)
(2, 2)
(4, 1)
(5, 1)
(3, 2)
(6, 1)
(7, 1)
(4, 2)
(8, 1)
(9, 1)
(5, 2)
(6, 2)
(7, 2)
(8, 2)
(9, 2)


Category762

Meetings/762/2017-02-21 (последним исправлял пользователь PrefixCactus 2017-03-08 22:59:58)