¿Qué hay dentro de asyncio?

En este artículo, invito al lector a realizar un viaje moderadamente emocionante en las entrañas de asyncio para comprender cómo implementa la ejecución de código asincrónico. Ensillamos las devoluciones de llamadas y corremos a través de una serie de eventos a través de un par de abstracciones clave directamente en la corutina. Si su mapa de Python aún no tiene estas atracciones, bienvenido a cat.


Para empezar, una breve referencia sobre el área se extendió ante nosotros.


asyncio es una biblioteca de E / S asíncrona que, según pep3153 , se creó para proporcionar un marco estandarizado para crear marcos asincrónicos. pep3156 también le atribuye la necesidad de proporcionar una integración extremadamente simple en marcos asincrónicos ya existentes (Twisted, Tornado, Gevent). Como podemos observar ahora, estos objetivos se lograron con éxito: apareció un nuevo marco basado en asincio : aiohttp , en Tornado AsyncioMainLoop es el bucle de eventos predeterminado de la versión 5.0, en Twisted asyncioreactor está disponible desde la versión 16.5.0, y hay una biblioteca de aiogevent de terceros para Gevent .


asyncio es una biblioteca híbrida que utiliza simultáneamente dos enfoques para implementar la ejecución de código asíncrono: clásico en devoluciones de llamada y, relativamente nuevo, (al menos para python) en las rutinas. Se basa en tres abstracciones principales, que son análogas a las abstracciones que existen en los marcos de terceros:


  • Bucle de eventos enchufable
    Bucle de eventos enchufable. Conectable significa que se puede reemplazar en dos líneas de código con otra que implemente la misma interfaz. Ahora hay implementaciones de cython encima de libuv ( uvloop ) y en Rust ( asyncio-tokio ) .
  • Futuro
    El resultado de la operación, que estará disponible en el futuro. Es necesario obtener el resultado de la ejecución de devoluciones de llamada en las rutinas.
  • Tarea
    Una subclase especial de Future para ejecutar la rutina en un bucle de eventos.

Vamos!


El ciclo de eventos es el componente principal de la biblioteca, a lo largo de las carreteras que lo atraviesan, los datos se entregan a cualquiera de sus componentes. Es grande y complejo, así que primero considere su versión simplificada.


# ~/inside_asyncio/base_loop.py import collections import random class Loop: def __init__(self): #     self.ready = collections.deque() def call_soon(self, callback, *args): #          self.ready.append((callback, args)) def run_until_complete(self, callback, *args): #         self.call_soon(callback, *args) #    -   #       while self.ready: ntodo = len(self.ready) #      #          for _ in range(ntodo): #       #        callback, args = self.ready.popleft() callback(*args) def callback(loop): print('') loop.call_soon(print, '') loop = Loop() loop.run_until_complete(callback, loop) 

Montando nuestra pequeña devolución de llamada, salimos a la carretera a través de call_soon , entramos en la cola y después de una breve espera se nos mostrará.


Episodio Bad Callback


Vale la pena mencionar que las devoluciones de llamada son caballos peligrosos: si te arrojan a la mitad del camino, el intérprete de Python no podrá ayudarte a entender dónde sucedió esto. Si no me cree, viaje de la misma manera en la devolución de llamada maybe_print, que llega a la meta aproximadamente la mitad del tiempo.


 # ~/inside_asyncio/base_loop.py def maybe_print(msg): if random.randint(0, 1): raise Exception(msg) else: print(msg) def starting_point(loop): #   print('') loop.call_soon(maybe_print, '') def main(loop): loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) loop = Loop() loop.run_until_complete(main, loop) 

A continuación se muestra el rastreo completo del ejemplo anterior. Debido al hecho de que la función maybe_print fue iniciada por un bucle de eventos, y no directamente desde el punto de inicio , el rastreo finaliza en el método run_until_complete . Es imposible determinar dónde está el punto de inicio en el código utilizando dicho rastreo, lo que complicará en gran medida la depuración si el punto de inicio se encuentra en varios lugares de la base de código.


 $: python3 base_loop.py >>  #    >>  #    >>  #    >> Traceback (most recent call last): >> File "base_loop.py", line 42, in <module> >> loop.run_until_complete(main, loop) >> File "base_loop.py", line 17, in run_until_complete >> callback(*args) >> File "base_loop.py", line 29, in maybe_print >> raise Exception(msg) >> Exception:  #     

Se necesita una pila continua de llamadas no solo para mostrar el rastreo completo, sino también para implementar otras características del lenguaje. Por ejemplo, el manejo de excepciones se basa en ello. El siguiente ejemplo no funcionará, porque para el momento en que comienza el punto_inicio, la función principal ya se habrá ejecutado:


 # ~/inside_asyncio/base_loop.py def main(loop): try: loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) except: pass Loop().run_until_complete(main, loop) 

El siguiente ejemplo tampoco funcionará. El administrador de contexto en la función principal abrirá y cerrará el archivo antes de que se inicie su procesamiento.


 # ~/inside_asyncio/base_loop.py def main(loop): with open('file.txt', 'rb') as f: loop.call_soon(process_file, f) Loop().run_until_complete(main, loop) #       ,    =( 

La falta de una pila continua de llamadas limita el uso de funciones de lenguaje familiares. Para evitar parcialmente este inconveniente en asyncio, tuve que agregar una gran cantidad de código adicional que no estaba directamente relacionado con la tarea que estaba resolviendo. Este código, en su mayor parte, falta en los ejemplos; son bastante complicados sin él.


Del ciclo de eventos al mundo exterior y viceversa.


El bucle de eventos se comunica con el mundo exterior a través del sistema operativo a través de eventos. El código que sabe cómo trabajar con él lo proporciona un módulo de biblioteca estándar llamado selectores . Le permite decirle al sistema operativo que estamos esperando algún tipo de evento y luego preguntar si sucedió. En el siguiente ejemplo, el evento esperado será la disponibilidad del socket de lectura.


Bucle de eventos
 # ~/inside_asyncio/event_loop.py import selectors import socket import collections from future import Future from handle import Handle from task import Task class EventLoop: def __init__(self): self.ready = collections.deque() #   self.selector = selectors.DefaultSelector() def add_reader(self, sock, callback): #       # : # , #         #           self.selector.register( sock, socket.EVENT_READ, (self._accept_conn, sock, callback) ) def _accept_conn(self, sock, callback): #    conn, addr = sock.accept() conn.setblocking(False) #      self.selector.register( conn, socket.EVENT_READ, (callback, conn) ) def run_until_complete(self, callback, *args): self.call_soon(callback, *args) #           -  while self.ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self.ready.popleft() callback(*args) #       for key, events in self.selector.select(timeout=0): #          callback, *args = key.data #       self.call_soon(callback, *args) def call_soon(self, callback, *args): self.ready.append((callback, args)) def print_data(conn): print(conn.recv(1000)) def main(loop): #   sock = socket.socket() #     8086  sock.bind(('localhost', 8086)) sock.listen(100) sock.setblocking(False) #      loop.add_reader(sock, print_data) loop = EventLoop() #    loop.run_until_complete(main, loop) 

Un mensajero del mundo exterior deja su mensaje o paquete en el selector, y el selector se lo pasa al destinatario. Ahora es posible leer desde el socket utilizando un bucle de eventos. Si ejecuta este código y se conecta usando netcat, mostrará fielmente todo lo que se le enviará.


 $: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n' 

Al comienzo del artículo, se decía que asyncio es una biblioteca híbrida en la que las corutinas funcionan además de las devoluciones de llamada. Para implementar esta funcionalidad, se utilizan dos abstracciones principales restantes: Tarea y Futuro . A continuación, se mostrará el código de estas abstracciones, y luego, utilizando su ciclo de eventos, se ejecutarán las rutinas.


Futuro


A continuación se muestra el código para la clase Future. Es necesario para que en corutina pueda esperar hasta que se complete la devolución de llamada y obtener su resultado.


Futuro
 # ~/inside_asyncio/future.py import sys from asyncio import events, CancelledError class Future: #        _state = 'PENDING' # FINISHED, CANCELLED #          Future #          _source_traceback = None #            _callbacks = [] #           _exception = None #           _loop = None #     _result = None def __init__(self, loop): self._loop = loop self._source_traceback = events.extract_stack(sys._getframe(1)) def add_done_callback(self, callback): #        self._callbacks.append(callback) def _schedule_callbacks(self): #         for callback in self._callbacks: self._loop.call_soon(callback, self) self._callbacks[:] = [] #            Future #    -     def set_exception(self, exception): #       self._exception = exception #   self._state = 'FINISHED' #      self._schedule_callbacks() def set_result(self, result): #         self._result = result self._state = 'FINISHED' self._schedule_callbacks() def cancel(self): #       self._state = 'CANCELLED' self._schedule_callbacks() def result(self): #     #         if self._state == 'CANCELLED': raise CancelledError #         if self._exception is not None: raise self._exception #    return self._result def __await__(self): #  ,    await #        if self._state == 'PENDING': yield self #     return self.result() 

Tarea


Esta es una subclase especial de Future . Es necesario ejecutar corutinas en el bucle de eventos de devolución de llamada.


Tarea
 # ~/inside_asyncio/task.py from asyncio import futures from future import Future class Task(Future): def __init__(self, coro, *, loop=None): super().__init__(loop=loop) #    self._coro = coro def _step(self, exc=None): #    ,     try: if exc is None: #        None #        result = self._coro.send(None) else: #        self._coro.throw(exc) except StopIteration: result = None except Exception as exc: self.set_exception(exc) else: #   Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) #     step     elif result is None: self._loop.call_soon(self._step) def _wakeup(self, future): #     Future       Task #   try: future.result() except Exception as exc: self._step(exc) #        Future else: self._step() 

Un ciclo de eventos que pueden funcionar con Future


EventLoop con futuros
 # ~/inside_asyncio/future_event_loop.py import selectors from selectors import EVENT_READ, EVENT_WRITE import socket import collections from future import Future from task import Task class EventLoop: def __init__(self): self._ready = collections.deque() self.selector = selectors.DefaultSelector() def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): callback, *args = key.data self.call_soon(callback, *args) def call_soon(self, callback, *args): self._ready.append((callback, args)) #      Future def sock_accept(self, sock, fut=None): #       #  Future     fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: #    #   Future    fut.set_result((conn, address)) self.selector.unregister(sock) return fut def sock_recv(self, sock, n, fut=None): #       #      , #     ,      fut = fut if fut else Future(loop=self) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.selector.register( sock, EVENT_READ, (self.sock_recv, sock, n, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: fut.set_result(data) self.selector.unregister(sock) return fut async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    conn, addr = await loop.sock_accept(sock) #     result = await loop.sock_recv(conn, 1000) print(result) loop = EventLoop() #    Task task = Task(coro=main(loop), loop=loop) #         loop.run_until_complete(task._step) 

Sigamos adelante


Ahora veamos cómo se ejecutará la corutina principal :


Cumplimiento
 __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step    self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #     callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: #  None   result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): #      #   sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #         conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): #   Future fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) #  Future   return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __await__  Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): #   Future         if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: #           None result = self._coro.send(None) # result = fut -------------------------------- else: #  Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) #      -    Task  Future #    #               ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): #    callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: #    conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: #   Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): #   self._result = result #   self._state = 'FINISHED' #      self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: #         task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #      #    task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: #   Future      task._step self._step() def _step(self, exc=None): try: if exc is None: #       None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __awai__   conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self #   Future    return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    Future    conn  addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) 

De esta manera simple, asyncio realiza corutinas.


Resumen


El objetivo de crear asyncio se ha logrado con éxito. No solo resolvió el problema de compatibilidad, sino que también causó un gran aumento en el interés en la programación competitiva en la comunidad. Nuevos artículos y bibliotecas comenzaron a aparecer, como hongos después de la lluvia. Además, el asincio influyó en el lenguaje en sí: se le agregaron corutinas nativas y nuevas palabras clave asíncronas / en espera . La última vez que se agregó una nueva palabra clave en 2003, fue la palabra clave de rendimiento .


Uno de los objetivos de crear asyncio era proporcionar una integración extremadamente simple en frameworks asincrónicos ya existentes (Twisted, Tornado, Gevent). La elección de las herramientas se deriva lógicamente de este objetivo: si no existiera un requisito de compatibilidad, probablemente se asignaría a las corutinas el papel principal. Debido al hecho de que al programar devoluciones de llamada es imposible mantener una pila continua de llamadas, se tuvo que crear un sistema adicional en el límite entre ellas y las rutinas para admitir las características del lenguaje basadas en él.


Ahora la pregunta principal. ¿Por qué un simple usuario de la biblioteca debería saber todo esto, que sigue las recomendaciones de la documentación y usa solo rutinas y una API de alto nivel?
Aquí hay una parte de la documentación de la clase StreamWriter



Su instancia es devuelta por la función asyncio.open_connection y es la API async / await en la parte superior de la API de devolución de llamada. Y estas devoluciones de llamada sobresalen. Las funciones de escritura y líneas de escritura son síncronas, intentan escribir en el socket y, si eso falla, vuelcan los datos en el búfer subyacente y agregan devoluciones de llamada al registro. El drenaje de corutina es necesario para proporcionar la oportunidad de esperar hasta que la cantidad de datos en el búfer caiga al valor especificado.


Si olvida llamar al drenaje entre llamadas de escritura , entonces el búfer interno puede crecer a tamaños indecentes. Sin embargo, si tiene esto en cuenta, quedan algunos momentos desagradables. Primero: si la devolución de llamada en el registro "se rompe", entonces la rutina que usa esta API no lo sabrá de ninguna manera y, en consecuencia, no podrá procesarla. Segundo: si la rutina se "rompe", la devolución de llamada al registro no lo sabrá de ninguna manera y continuará escribiendo datos desde el búfer.


Por lo tanto, incluso utilizando solo corutinas, prepárese para el hecho de que las devoluciones de llamada se recordarán a sí mismas.


Puede leer sobre cómo trabajar con bases de datos a partir de código asíncrono en este artículo de nuestro blog corporativo Antida software .


PD: Gracias por la información sobre errores tipográficos e inexactitudes a los usuarios de eirnym , kurb , rasswet

Source: https://habr.com/ru/post/453348/


All Articles