Neste artigo, convido o leitor a fazer uma jornada moderadamente emocionante para as entranhas do assíncio comigo para entender como ele implementa a execução assíncrona de código. Reunimos retornos de chamada e percorremos uma série de eventos por meio de algumas abstrações importantes até a corutin. Se o seu mapa python ainda não possui essas atrações, bem-vindo ao gato.
Para iniciantes - uma breve referência sobre a área espalhada diante de nós
asyncio é uma biblioteca de E / S assíncrona que, de acordo com o pep3153 , foi criada para fornecer uma estrutura padronizada para a criação de estruturas assíncronas. O pep3156 também atribui a ela a necessidade de fornecer uma integração extremamente simples em estruturas assíncronas já existentes (Twisted, Tornado, Gevent). Como podemos observar agora, esses objetivos foram alcançados com sucesso - uma nova estrutura assíncrona apareceu: aiohttp , no Tornado O AsyncioMainLoop é o loop de eventos padrão da versão 5.0, no Twisted asyncioreactor está disponível na versão 16.5.0 e existe uma biblioteca de aiogevent de terceiros para Gevent .
o asyncio é uma biblioteca híbrida que usa simultaneamente duas abordagens para implementar a execução assíncrona de código: clássico em retornos de chamada e, relativamente novo (pelo menos para python) em corotinas. Ele é baseado em três abstrações principais, que são análogas às abstrações existentes em estruturas de terceiros:
- Loop de evento conectável
Loop de evento conectável. Pluggable significa que ele pode ser substituído em duas linhas de código por outra que implemente a mesma interface. Agora existem implementações de cython em cima do libuv ( uvloop ) e em Rust ( asyncio-tokio ) . - Futuro
O resultado da operação, que estará disponível no futuro. É necessário obter o resultado da execução de retornos de chamada em corotinas. - Tarefa
Uma subclasse especial de Future para executar a corotina em um loop de eventos.
Vamos lá!
O ciclo de eventos é o principal componente da biblioteca. Ao longo das estradas que o atravessam, os dados são entregues a qualquer um de seus componentes. É grande e complexo, então considere primeiro sua versão simplificada.
Montando nosso pequeno retorno de chamada, pegamos a estrada via call_soon , entramos na fila e, após uma breve espera, seremos exibidos.
Bad Callback Episode
Vale ressaltar que os retornos de chamada são cavalos perigosos - se eles o jogam no meio da estrada, o intérprete python não poderá ajudar a entender onde isso aconteceu. Se você não acredita em mim , siga da mesma maneira o retorno de chamada de talvez_print, que chega ao fim cerca da metade do tempo.
Abaixo está o rastreio completo do exemplo anterior. Devido ao fato de que a função maybe_print foi iniciada por um loop de eventos, e não diretamente do ponto de partida , o traceback termina nele, no método run_until_complete . É impossível determinar onde o ponto de partida está no código usando esse rastreamento, o que complicará bastante a depuração se o ponto de partida estiver localizado em vários locais na base de código.
$: python3 base_loop.py >>
Uma pilha de chamadas contínua é necessária não apenas para exibir o rastreamento completo, mas também para implementar outros recursos de idioma. Por exemplo, o tratamento de exceções é baseado nele. O exemplo abaixo não funcionará porque, quando o ponto de partida começar, a função principal já estará executada:
O exemplo a seguir também não funcionará. O gerenciador de contexto na função principal abrirá e fechará o arquivo antes de iniciar o processamento.
A falta de uma pilha contínua de chamadas limita o uso de recursos de idiomas familiares. Para contornar parcialmente essa desvantagem no assíncio, tive que adicionar muito código adicional que não estava diretamente relacionado à tarefa que estava resolvendo. Esse código, na maioria das vezes, está ausente nos exemplos - eles são bastante complicados sem ele.
Do ciclo de eventos ao mundo exterior e vice-versa
O loop de eventos se comunica com o mundo externo através do sistema operacional através de eventos. O código que sabe trabalhar com ele é fornecido por um módulo de biblioteca padrão chamado seletores . Ele permite que você diga ao sistema operacional que estamos aguardando algum tipo de evento e pergunte se aconteceu. No exemplo abaixo, o evento esperado será a disponibilidade do soquete de leitura.
Um mensageiro do mundo exterior deixa sua mensagem ou parcela no seletor e o transmite ao destinatário. Agora é possível ler do soquete usando um loop de eventos. Se você executar esse código e se conectar usando o netcat, ele exibirá fielmente tudo o que será enviado a ele.
$: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n'
No início do artigo, dizia-se que o assíncio é uma biblioteca híbrida na qual as corotinas trabalham em cima de retornos de chamada. Para implementar essa funcionalidade, duas abstrações principais restantes são usadas: Tarefa e Futuro . A seguir, o código dessas abstrações será mostrado e, usando seu ciclo de eventos, as corotinas são executadas.
Futuro
Abaixo está o código para a classe Future. É necessário para que, em corutin, você possa esperar até que o retorno de chamada seja concluído e obtenha o resultado.
Tarefa
Esta é uma subclasse especial de Futuro . É necessário executar corotinas no loop de eventos de retorno de chamada.
Um ciclo de eventos que podem funcionar com o Future
Vamos seguir em frente
Agora vamos ver como a coroutine main será executada:
Cumprimento __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): # # sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): # Future fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): # # # Future self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) # Future return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __await__ Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): # Future if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) # result = fut -------------------------------- else: # Future # wakeup if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) # - Task Future # # ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): # callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: # Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): # self._result = result # self._state = 'FINISHED' # self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: # task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # # task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: # Future task._step self._step() def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __awai__ conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self # Future return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # Future conn addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
Dessa maneira simples, o assíncio executa corotinas.
Sumário
O objetivo de criar asyncio foi alcançado com sucesso. Ele não apenas resolveu o problema de compatibilidade, mas também causou um grande aumento no interesse em programação competitiva na comunidade. Novos artigos e bibliotecas começaram a aparecer, como cogumelos depois da chuva. Além disso, o asyncio influenciou o idioma em si: foram adicionadas a ele corotinas nativas e novas palavras-chave async / wait. A última vez que uma nova palavra-chave foi adicionada em 2003, era a palavra-chave yield .
Um dos objetivos da criação de assíncrono era fornecer integração extremamente simples em estruturas assíncronas já existentes (Twisted, Tornado, Gevent). A escolha das ferramentas segue logicamente a partir deste objetivo: se não houvesse requisitos de compatibilidade, as corotinas provavelmente receberiam o papel principal. Devido ao fato de que, ao programar em retornos de chamada, é impossível manter uma pilha contínua de chamadas, um sistema adicional teve que ser criado na fronteira entre elas e as corotinas para suportar os recursos de linguagem baseados nela.
Agora a questão principal. Por que um usuário simples da biblioteca deve saber tudo isso, que segue as recomendações da documentação e usa apenas corotinas e uma API de alto nível?
Aqui está uma parte da documentação da classe StreamWriter

Sua instância é retornada pela função asyncio.open_connection e é a API async / waitit na parte superior da API de retorno de chamada. E esses retornos de chamada se destacam. As funções de gravação e writelines são síncronas, tentam gravar no soquete e, se isso falhar, despejam os dados no buffer subjacente e adicionam retornos de chamada ao registro. O dreno de Corutin é necessário para fornecer a oportunidade de aguardar até que a quantidade de dados no buffer caia para o valor especificado.
Se você esquecer a drenagem de chamadas entre as chamadas de gravação , o buffer interno poderá aumentar para tamanhos indecentes. No entanto, se você se lembrar disso, restam alguns momentos desagradáveis. Primeiro: se o retorno de chamada no registro "quebrar", a corotina que usa essa API não saberá sobre ele de forma alguma e, portanto, não poderá processá-lo. Segundo: se a rotina "quebrar", o retorno de chamada no registro não saberá sobre isso e continuará gravando dados do buffer.
Assim, mesmo usando apenas corotinas, esteja preparado para o fato de que os retornos de chamada lembrarão de si mesmos.
Você pode ler sobre como trabalhar com bancos de dados a partir de código assíncrono neste artigo do software Antida do blog corporativo .
PS Obrigado pelas informações sobre erros de digitação e imprecisões para usuários de eirnym , kurb , rasswet