O que há dentro de assíncio

Neste artigo, convido o leitor a fazer uma jornada moderadamente emocionante para as entranhas do assíncio comigo para entender como ele implementa a execução assíncrona de código. Reunimos retornos de chamada e percorremos uma série de eventos por meio de algumas abstrações importantes até a corutin. Se o seu mapa python ainda não possui essas atrações, bem-vindo ao gato.


Para iniciantes - uma breve referência sobre a área espalhada diante de nós


asyncio é uma biblioteca de E / S assíncrona que, de acordo com o pep3153 , foi criada para fornecer uma estrutura padronizada para a criação de estruturas assíncronas. O pep3156 também atribui a ela a necessidade de fornecer uma integração extremamente simples em estruturas assíncronas já existentes (Twisted, Tornado, Gevent). Como podemos observar agora, esses objetivos foram alcançados com sucesso - uma nova estrutura assíncrona apareceu: aiohttp , no Tornado O AsyncioMainLoop é o loop de eventos padrão da versão 5.0, no Twisted asyncioreactor está disponível na versão 16.5.0 e existe uma biblioteca de aiogevent de terceiros para Gevent .


o asyncio é uma biblioteca híbrida que usa simultaneamente duas abordagens para implementar a execução assíncrona de código: clássico em retornos de chamada e, relativamente novo (pelo menos para python) em corotinas. Ele é baseado em três abstrações principais, que são análogas às abstrações existentes em estruturas de terceiros:


  • Loop de evento conectável
    Loop de evento conectável. Pluggable significa que ele pode ser substituído em duas linhas de código por outra que implemente a mesma interface. Agora existem implementações de cython em cima do libuv ( uvloop ) e em Rust ( asyncio-tokio ) .
  • Futuro
    O resultado da operação, que estará disponível no futuro. É necessário obter o resultado da execução de retornos de chamada em corotinas.
  • Tarefa
    Uma subclasse especial de Future para executar a corotina em um loop de eventos.

Vamos lá!


O ciclo de eventos é o principal componente da biblioteca. Ao longo das estradas que o atravessam, os dados são entregues a qualquer um de seus componentes. É grande e complexo, então considere primeiro sua versão simplificada.


# ~/inside_asyncio/base_loop.py import collections import random class Loop: def __init__(self): #     self.ready = collections.deque() def call_soon(self, callback, *args): #          self.ready.append((callback, args)) def run_until_complete(self, callback, *args): #         self.call_soon(callback, *args) #    -   #       while self.ready: ntodo = len(self.ready) #      #          for _ in range(ntodo): #       #        callback, args = self.ready.popleft() callback(*args) def callback(loop): print('') loop.call_soon(print, '') loop = Loop() loop.run_until_complete(callback, loop) 

Montando nosso pequeno retorno de chamada, pegamos a estrada via call_soon , entramos na fila e, após uma breve espera, seremos exibidos.


Bad Callback Episode


Vale ressaltar que os retornos de chamada são cavalos perigosos - se eles o jogam no meio da estrada, o intérprete python não poderá ajudar a entender onde isso aconteceu. Se você não acredita em mim , siga da mesma maneira o retorno de chamada de talvez_print, que chega ao fim cerca da metade do tempo.


 # ~/inside_asyncio/base_loop.py def maybe_print(msg): if random.randint(0, 1): raise Exception(msg) else: print(msg) def starting_point(loop): #   print('') loop.call_soon(maybe_print, '') def main(loop): loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) loop = Loop() loop.run_until_complete(main, loop) 

Abaixo está o rastreio completo do exemplo anterior. Devido ao fato de que a função maybe_print foi iniciada por um loop de eventos, e não diretamente do ponto de partida , o traceback termina nele, no método run_until_complete . É impossível determinar onde o ponto de partida está no código usando esse rastreamento, o que complicará bastante a depuração se o ponto de partida estiver localizado em vários locais na base de código.


 $: python3 base_loop.py >>  #    >>  #    >>  #    >> Traceback (most recent call last): >> File "base_loop.py", line 42, in <module> >> loop.run_until_complete(main, loop) >> File "base_loop.py", line 17, in run_until_complete >> callback(*args) >> File "base_loop.py", line 29, in maybe_print >> raise Exception(msg) >> Exception:  #     

Uma pilha de chamadas contínua é necessária não apenas para exibir o rastreamento completo, mas também para implementar outros recursos de idioma. Por exemplo, o tratamento de exceções é baseado nele. O exemplo abaixo não funcionará porque, quando o ponto de partida começar, a função principal já estará executada:


 # ~/inside_asyncio/base_loop.py def main(loop): try: loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) except: pass Loop().run_until_complete(main, loop) 

O exemplo a seguir também não funcionará. O gerenciador de contexto na função principal abrirá e fechará o arquivo antes de iniciar o processamento.


 # ~/inside_asyncio/base_loop.py def main(loop): with open('file.txt', 'rb') as f: loop.call_soon(process_file, f) Loop().run_until_complete(main, loop) #       ,    =( 

A falta de uma pilha contínua de chamadas limita o uso de recursos de idiomas familiares. Para contornar parcialmente essa desvantagem no assíncio, tive que adicionar muito código adicional que não estava diretamente relacionado à tarefa que estava resolvendo. Esse código, na maioria das vezes, está ausente nos exemplos - eles são bastante complicados sem ele.


Do ciclo de eventos ao mundo exterior e vice-versa


O loop de eventos se comunica com o mundo externo através do sistema operacional através de eventos. O código que sabe trabalhar com ele é fornecido por um módulo de biblioteca padrão chamado seletores . Ele permite que você diga ao sistema operacional que estamos aguardando algum tipo de evento e pergunte se aconteceu. No exemplo abaixo, o evento esperado será a disponibilidade do soquete de leitura.


Loop de eventos
 # ~/inside_asyncio/event_loop.py import selectors import socket import collections from future import Future from handle import Handle from task import Task class EventLoop: def __init__(self): self.ready = collections.deque() #   self.selector = selectors.DefaultSelector() def add_reader(self, sock, callback): #       # : # , #         #           self.selector.register( sock, socket.EVENT_READ, (self._accept_conn, sock, callback) ) def _accept_conn(self, sock, callback): #    conn, addr = sock.accept() conn.setblocking(False) #      self.selector.register( conn, socket.EVENT_READ, (callback, conn) ) def run_until_complete(self, callback, *args): self.call_soon(callback, *args) #           -  while self.ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self.ready.popleft() callback(*args) #       for key, events in self.selector.select(timeout=0): #          callback, *args = key.data #       self.call_soon(callback, *args) def call_soon(self, callback, *args): self.ready.append((callback, args)) def print_data(conn): print(conn.recv(1000)) def main(loop): #   sock = socket.socket() #     8086  sock.bind(('localhost', 8086)) sock.listen(100) sock.setblocking(False) #      loop.add_reader(sock, print_data) loop = EventLoop() #    loop.run_until_complete(main, loop) 

Um mensageiro do mundo exterior deixa sua mensagem ou parcela no seletor e o transmite ao destinatário. Agora é possível ler do soquete usando um loop de eventos. Se você executar esse código e se conectar usando o netcat, ele exibirá fielmente tudo o que será enviado a ele.


 $: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n' 

No início do artigo, dizia-se que o assíncio é uma biblioteca híbrida na qual as corotinas trabalham em cima de retornos de chamada. Para implementar essa funcionalidade, duas abstrações principais restantes são usadas: Tarefa e Futuro . A seguir, o código dessas abstrações será mostrado e, usando seu ciclo de eventos, as corotinas são executadas.


Futuro


Abaixo está o código para a classe Future. É necessário para que, em corutin, você possa esperar até que o retorno de chamada seja concluído e obtenha o resultado.


Futuro
 # ~/inside_asyncio/future.py import sys from asyncio import events, CancelledError class Future: #        _state = 'PENDING' # FINISHED, CANCELLED #          Future #          _source_traceback = None #            _callbacks = [] #           _exception = None #           _loop = None #     _result = None def __init__(self, loop): self._loop = loop self._source_traceback = events.extract_stack(sys._getframe(1)) def add_done_callback(self, callback): #        self._callbacks.append(callback) def _schedule_callbacks(self): #         for callback in self._callbacks: self._loop.call_soon(callback, self) self._callbacks[:] = [] #            Future #    -     def set_exception(self, exception): #       self._exception = exception #   self._state = 'FINISHED' #      self._schedule_callbacks() def set_result(self, result): #         self._result = result self._state = 'FINISHED' self._schedule_callbacks() def cancel(self): #       self._state = 'CANCELLED' self._schedule_callbacks() def result(self): #     #         if self._state == 'CANCELLED': raise CancelledError #         if self._exception is not None: raise self._exception #    return self._result def __await__(self): #  ,    await #        if self._state == 'PENDING': yield self #     return self.result() 

Tarefa


Esta é uma subclasse especial de Futuro . É necessário executar corotinas no loop de eventos de retorno de chamada.


Tarefa
 # ~/inside_asyncio/task.py from asyncio import futures from future import Future class Task(Future): def __init__(self, coro, *, loop=None): super().__init__(loop=loop) #    self._coro = coro def _step(self, exc=None): #    ,     try: if exc is None: #        None #        result = self._coro.send(None) else: #        self._coro.throw(exc) except StopIteration: result = None except Exception as exc: self.set_exception(exc) else: #   Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) #     step     elif result is None: self._loop.call_soon(self._step) def _wakeup(self, future): #     Future       Task #   try: future.result() except Exception as exc: self._step(exc) #        Future else: self._step() 

Um ciclo de eventos que podem funcionar com o Future


EventLoop com Futuros
 # ~/inside_asyncio/future_event_loop.py import selectors from selectors import EVENT_READ, EVENT_WRITE import socket import collections from future import Future from task import Task class EventLoop: def __init__(self): self._ready = collections.deque() self.selector = selectors.DefaultSelector() def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): callback, *args = key.data self.call_soon(callback, *args) def call_soon(self, callback, *args): self._ready.append((callback, args)) #      Future def sock_accept(self, sock, fut=None): #       #  Future     fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: #    #   Future    fut.set_result((conn, address)) self.selector.unregister(sock) return fut def sock_recv(self, sock, n, fut=None): #       #      , #     ,      fut = fut if fut else Future(loop=self) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.selector.register( sock, EVENT_READ, (self.sock_recv, sock, n, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: fut.set_result(data) self.selector.unregister(sock) return fut async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    conn, addr = await loop.sock_accept(sock) #     result = await loop.sock_recv(conn, 1000) print(result) loop = EventLoop() #    Task task = Task(coro=main(loop), loop=loop) #         loop.run_until_complete(task._step) 

Vamos seguir em frente


Agora vamos ver como a coroutine main será executada:


Cumprimento
 __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step    self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #     callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: #  None   result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): #      #   sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #         conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): #   Future fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) #  Future   return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __await__  Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): #   Future         if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: #           None result = self._coro.send(None) # result = fut -------------------------------- else: #  Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) #      -    Task  Future #    #               ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): #    callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: #    conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: #   Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): #   self._result = result #   self._state = 'FINISHED' #      self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: #         task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #      #    task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: #   Future      task._step self._step() def _step(self, exc=None): try: if exc is None: #       None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __awai__   conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self #   Future    return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    Future    conn  addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) 

Dessa maneira simples, o assíncio executa corotinas.


Sumário


O objetivo de criar asyncio foi alcançado com sucesso. Ele não apenas resolveu o problema de compatibilidade, mas também causou um grande aumento no interesse em programação competitiva na comunidade. Novos artigos e bibliotecas começaram a aparecer, como cogumelos depois da chuva. Além disso, o asyncio influenciou o idioma em si: foram adicionadas a ele corotinas nativas e novas palavras-chave async / wait. A última vez que uma nova palavra-chave foi adicionada em 2003, era a palavra-chave yield .


Um dos objetivos da criação de assíncrono era fornecer integração extremamente simples em estruturas assíncronas já existentes (Twisted, Tornado, Gevent). A escolha das ferramentas segue logicamente a partir deste objetivo: se não houvesse requisitos de compatibilidade, as corotinas provavelmente receberiam o papel principal. Devido ao fato de que, ao programar em retornos de chamada, é impossível manter uma pilha contínua de chamadas, um sistema adicional teve que ser criado na fronteira entre elas e as corotinas para suportar os recursos de linguagem baseados nela.


Agora a questão principal. Por que um usuário simples da biblioteca deve saber tudo isso, que segue as recomendações da documentação e usa apenas corotinas e uma API de alto nível?
Aqui está uma parte da documentação da classe StreamWriter



Sua instância é retornada pela função asyncio.open_connection e é a API async / waitit na parte superior da API de retorno de chamada. E esses retornos de chamada se destacam. As funções de gravação e writelines são síncronas, tentam gravar no soquete e, se isso falhar, despejam os dados no buffer subjacente e adicionam retornos de chamada ao registro. O dreno de Corutin é necessário para fornecer a oportunidade de aguardar até que a quantidade de dados no buffer caia para o valor especificado.


Se você esquecer a drenagem de chamadas entre as chamadas de gravação , o buffer interno poderá aumentar para tamanhos indecentes. No entanto, se você se lembrar disso, restam alguns momentos desagradáveis. Primeiro: se o retorno de chamada no registro "quebrar", a corotina que usa essa API não saberá sobre ele de forma alguma e, portanto, não poderá processá-lo. Segundo: se a rotina "quebrar", o retorno de chamada no registro não saberá sobre isso e continuará gravando dados do buffer.


Assim, mesmo usando apenas corotinas, esteja preparado para o fato de que os retornos de chamada lembrarão de si mesmos.


Você pode ler sobre como trabalhar com bancos de dados a partir de código assíncrono neste artigo do software Antida do blog corporativo .


PS Obrigado pelas informações sobre erros de digitação e imprecisões para usuários de eirnym , kurb , rasswet

Source: https://habr.com/ru/post/pt453348/


All Articles