Apa yang ada di dalam asyncio

Pada artikel ini, saya mengundang pembaca untuk melakukan perjalanan yang cukup menarik ke dalam asyncio dengan saya untuk memahami bagaimana ia mengimplementasikan eksekusi kode asinkron. Kami membebani panggilan balik dan bergegas melalui serangkaian peristiwa melalui beberapa abstraksi kunci langsung ke coroutine. Jika peta python Anda belum memiliki objek wisata ini, selamat datang di kucing.


Sebagai permulaan - referensi singkat tentang area yang tersebar di hadapan kami


asyncio adalah pustaka I / O asinkron yang, menurut pep3153 , dibuat untuk menyediakan kerangka kerja standar untuk membuat kerangka kerja asinkron. pep3156 juga mengatribusikan kepadanya kebutuhan untuk menyediakan integrasi yang sangat sederhana ke dalam kerangka asinkron yang sudah ada (Twisted, Tornado, Gevent). Seperti yang dapat kita amati sekarang, tujuan-tujuan ini berhasil dicapai - kerangka kerja berbasis asyncio baru muncul: aiohttp , di Tornado AsyncioMainLoop adalah loop peristiwa default dari versi 5.0, di Twisted asyncioreactor tersedia dari versi 16.5.0, dan ada perpustakaan pihak ketiga untuk Gevent .


asyncio adalah pustaka hybrid yang secara bersamaan menggunakan dua pendekatan untuk mengimplementasikan eksekusi kode asinkron: klasik pada panggilan balik dan, relatif baru, (setidaknya untuk python) pada coroutine. Ini didasarkan pada tiga abstraksi utama, yang merupakan analog dari abstraksi yang ada dalam kerangka kerja pihak ketiga:


  • Loop Acara Pluggable
    Peristiwa acara pluggable. Pluggable berarti dapat diganti dalam dua baris kode dengan yang lain yang mengimplementasikan antarmuka yang sama. Sekarang ada implementasi cython di atas libuv ( uvloop ) dan di Rust ( asyncio-tokio ) .
  • Masa depan
    Hasil operasi, yang akan tersedia di masa depan. Hal ini diperlukan untuk mendapatkan hasil eksekusi callback di coroutine.
  • Tugas
    Subclass khusus Future untuk menjalankan coroutine pada suatu event loop.

Ayo pergi!


Siklus peristiwa adalah komponen utama perpustakaan, di sepanjang jalan yang melewatinya, data dikirimkan ke salah satu komponennya. Ini besar dan kompleks, jadi pertama-tama pertimbangkan versi stripped-down-nya.


# ~/inside_asyncio/base_loop.py import collections import random class Loop: def __init__(self): #     self.ready = collections.deque() def call_soon(self, callback, *args): #          self.ready.append((callback, args)) def run_until_complete(self, callback, *args): #         self.call_soon(callback, *args) #    -   #       while self.ready: ntodo = len(self.ready) #      #          for _ in range(ntodo): #       #        callback, args = self.ready.popleft() callback(*args) def callback(loop): print('') loop.call_soon(print, '') loop = Loop() loop.run_until_complete(callback, loop) 

Mengendarai callback kecil kami, kami menuju jalan melalui call_soon , masuk ke antrian dan setelah menunggu sebentar kami akan ditampilkan.


Episode Callback Buruk


Perlu disebutkan bahwa panggilan balik adalah kuda berbahaya - jika mereka melempar Anda di tengah jalan, juru bahasa python tidak akan bisa membantu memahami di mana ini terjadi. Jika Anda tidak mempercayai saya , gunakan cara yang sama pada maybe_print callback , yang selesai hampir separuh waktu.


 # ~/inside_asyncio/base_loop.py def maybe_print(msg): if random.randint(0, 1): raise Exception(msg) else: print(msg) def starting_point(loop): #   print('') loop.call_soon(maybe_print, '') def main(loop): loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) loop = Loop() loop.run_until_complete(main, loop) 

Di bawah ini adalah traceback penuh dari contoh sebelumnya. Karena fakta bahwa fungsi maybe_print dimulai oleh loop acara, dan tidak langsung dari starting_point , traceback berakhir di situ, dalam metode run_until_complete . Tidak mungkin untuk menentukan di mana starting_point berada dalam kode menggunakan traceback seperti itu, yang akan sangat menyulitkan debugging jika starting_point terletak di beberapa tempat dalam basis kode.


 $: python3 base_loop.py >>  #    >>  #    >>  #    >> Traceback (most recent call last): >> File "base_loop.py", line 42, in <module> >> loop.run_until_complete(main, loop) >> File "base_loop.py", line 17, in run_until_complete >> callback(*args) >> File "base_loop.py", line 29, in maybe_print >> raise Exception(msg) >> Exception:  #     

Tumpukan panggilan kontinu diperlukan tidak hanya untuk menampilkan traceback penuh, tetapi juga untuk mengimplementasikan fitur bahasa lainnya. Misalnya, penanganan pengecualian didasarkan padanya. Contoh di bawah ini tidak akan berfungsi, karena pada saat starting_point dimulai, fungsi utama sudah dijalankan:


 # ~/inside_asyncio/base_loop.py def main(loop): try: loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) except: pass Loop().run_until_complete(main, loop) 

Contoh berikut juga tidak akan berfungsi. Manajer konteks dalam fungsi utama akan membuka dan menutup file sebelum pemrosesan dimulai.


 # ~/inside_asyncio/base_loop.py def main(loop): with open('file.txt', 'rb') as f: loop.call_soon(process_file, f) Loop().run_until_complete(main, loop) #       ,    =( 

Kurangnya tumpukan panggilan terus menerus membatasi penggunaan fitur bahasa yang dikenal. Untuk menghindari sebagian kelemahan ini, asyncio harus menambahkan banyak kode tambahan yang tidak terkait langsung dengan tugas yang diselesaikannya. Kode ini, sebagian besar, tidak ada dalam contoh - mereka cukup rumit tanpanya.


Dari siklus peristiwa ke dunia luar dan kembali


Lingkaran peristiwa berkomunikasi dengan dunia luar melalui sistem operasi melalui peristiwa. Kode yang tahu cara bekerja dengannya disediakan oleh modul perpustakaan standar yang disebut penyeleksi . Ini memungkinkan Anda untuk memberi tahu sistem operasi bahwa kami sedang menunggu semacam acara, dan kemudian bertanya apakah itu terjadi. Pada contoh di bawah ini, acara yang diharapkan akan membaca ketersediaan soket.


Perulangan acara
 # ~/inside_asyncio/event_loop.py import selectors import socket import collections from future import Future from handle import Handle from task import Task class EventLoop: def __init__(self): self.ready = collections.deque() #   self.selector = selectors.DefaultSelector() def add_reader(self, sock, callback): #       # : # , #         #           self.selector.register( sock, socket.EVENT_READ, (self._accept_conn, sock, callback) ) def _accept_conn(self, sock, callback): #    conn, addr = sock.accept() conn.setblocking(False) #      self.selector.register( conn, socket.EVENT_READ, (callback, conn) ) def run_until_complete(self, callback, *args): self.call_soon(callback, *args) #           -  while self.ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self.ready.popleft() callback(*args) #       for key, events in self.selector.select(timeout=0): #          callback, *args = key.data #       self.call_soon(callback, *args) def call_soon(self, callback, *args): self.ready.append((callback, args)) def print_data(conn): print(conn.recv(1000)) def main(loop): #   sock = socket.socket() #     8086  sock.bind(('localhost', 8086)) sock.listen(100) sock.setblocking(False) #      loop.add_reader(sock, print_data) loop = EventLoop() #    loop.run_until_complete(main, loop) 

Seorang kurir dari dunia luar meninggalkan pesan atau bingkisannya di pemilih, dan pemilih menyerahkannya ke penerima. Sekarang dimungkinkan untuk membaca dari soket menggunakan loop acara. Jika Anda menjalankan kode ini dan terhubung menggunakan netcat, ia akan dengan setia menampilkan semua yang akan dikirim kepadanya.


 $: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n' 

Di awal artikel, dikatakan bahwa asyncio adalah perpustakaan hybrid di mana coroutine bekerja di atas callback. Untuk mengimplementasikan fungsi ini, dua abstraksi utama yang tersisa digunakan: Tugas dan Masa Depan . Selanjutnya, kode abstraksi ini akan ditampilkan, dan kemudian, dengan menggunakan siklus kejadiannya, coroutine dieksekusi.


Masa depan


Di bawah ini adalah kode untuk kelas Future. Diperlukan agar di corutin Anda bisa menunggu hingga callback selesai dan mendapatkan hasilnya.


Masa depan
 # ~/inside_asyncio/future.py import sys from asyncio import events, CancelledError class Future: #        _state = 'PENDING' # FINISHED, CANCELLED #          Future #          _source_traceback = None #            _callbacks = [] #           _exception = None #           _loop = None #     _result = None def __init__(self, loop): self._loop = loop self._source_traceback = events.extract_stack(sys._getframe(1)) def add_done_callback(self, callback): #        self._callbacks.append(callback) def _schedule_callbacks(self): #         for callback in self._callbacks: self._loop.call_soon(callback, self) self._callbacks[:] = [] #            Future #    -     def set_exception(self, exception): #       self._exception = exception #   self._state = 'FINISHED' #      self._schedule_callbacks() def set_result(self, result): #         self._result = result self._state = 'FINISHED' self._schedule_callbacks() def cancel(self): #       self._state = 'CANCELLED' self._schedule_callbacks() def result(self): #     #         if self._state == 'CANCELLED': raise CancelledError #         if self._exception is not None: raise self._exception #    return self._result def __await__(self): #  ,    await #        if self._state == 'PENDING': yield self #     return self.result() 

Tugas


Ini adalah subkelas khusus Masa Depan . Diperlukan untuk menjalankan coroutine pada loop event callback.


Tugas
 # ~/inside_asyncio/task.py from asyncio import futures from future import Future class Task(Future): def __init__(self, coro, *, loop=None): super().__init__(loop=loop) #    self._coro = coro def _step(self, exc=None): #    ,     try: if exc is None: #        None #        result = self._coro.send(None) else: #        self._coro.throw(exc) except StopIteration: result = None except Exception as exc: self.set_exception(exc) else: #   Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) #     step     elif result is None: self._loop.call_soon(self._step) def _wakeup(self, future): #     Future       Task #   try: future.result() except Exception as exc: self._step(exc) #        Future else: self._step() 

Siklus acara yang dapat bekerja dengan Future


EventLoop dengan Futures
 # ~/inside_asyncio/future_event_loop.py import selectors from selectors import EVENT_READ, EVENT_WRITE import socket import collections from future import Future from task import Task class EventLoop: def __init__(self): self._ready = collections.deque() self.selector = selectors.DefaultSelector() def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): callback, *args = key.data self.call_soon(callback, *args) def call_soon(self, callback, *args): self._ready.append((callback, args)) #      Future def sock_accept(self, sock, fut=None): #       #  Future     fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: #    #   Future    fut.set_result((conn, address)) self.selector.unregister(sock) return fut def sock_recv(self, sock, n, fut=None): #       #      , #     ,      fut = fut if fut else Future(loop=self) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.selector.register( sock, EVENT_READ, (self.sock_recv, sock, n, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: fut.set_result(data) self.selector.unregister(sock) return fut async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    conn, addr = await loop.sock_accept(sock) #     result = await loop.sock_recv(conn, 1000) print(result) loop = EventLoop() #    Task task = Task(coro=main(loop), loop=loop) #         loop.run_until_complete(task._step) 

Mari kita lanjutkan


Sekarang mari kita lihat bagaimana coroutine main akan dieksekusi:


Pemenuhan
 __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step    self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #     callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: #  None   result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): #      #   sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #         conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): #   Future fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) #  Future   return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __await__  Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): #   Future         if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: #           None result = self._coro.send(None) # result = fut -------------------------------- else: #  Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) #      -    Task  Future #    #               ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): #    callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: #    conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: #   Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): #   self._result = result #   self._state = 'FINISHED' #      self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: #         task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #      #    task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: #   Future      task._step self._step() def _step(self, exc=None): try: if exc is None: #       None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __awai__   conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self #   Future    return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    Future    conn  addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) 

Dengan cara sederhana ini, asyncio melakukan coroutine.


Ringkasan


Tujuan menciptakan asyncio telah berhasil dicapai. Itu tidak hanya memecahkan masalah kompatibilitas, tetapi juga menyebabkan peningkatan besar minat dalam pemrograman kompetitif di masyarakat. Artikel dan perpustakaan baru mulai muncul, seperti jamur setelah hujan. Selain itu, asyncio memengaruhi bahasa itu sendiri: coroutine asli dan kata kunci async / menunggu baru ditambahkan ke dalamnya. Terakhir kali kata kunci baru ditambahkan kembali pada tahun 2003, itu adalah kata kunci hasil .


Salah satu tujuan menciptakan asyncio adalah untuk menyediakan integrasi yang sangat sederhana ke dalam kerangka asinkron yang sudah ada (Twisted, Tornado, Gevent). Pilihan alat secara logis mengikuti dari tujuan ini: jika tidak ada persyaratan kompatibilitas, coroutine mungkin akan diberi peran utama. Karena kenyataan bahwa ketika pemrograman pada panggilan balik tidak mungkin untuk mempertahankan tumpukan panggilan yang berkelanjutan, sistem tambahan harus dibuat di perbatasan antara mereka dan coroutine untuk mendukung fitur bahasa berdasarkan itu.


Sekarang pertanyaan utamanya. Mengapa pengguna perpustakaan yang sederhana mengetahui semua ini, yang mengikuti rekomendasi dari dokumentasi dan hanya menggunakan coroutine dan API tingkat tinggi?
Berikut adalah dokumentasi kelas StreamWriter



Contohnya dikembalikan oleh fungsi asyncio.open_connection dan merupakan async / menunggu API di atas API panggilan balik. Dan panggilan balik ini keluar darinya. Fungsi tulis dan tulis adalah sinkron, mereka mencoba menulis ke soket, dan jika gagal, mereka membuang data ke buffer yang mendasarinya dan menambahkan panggilan balik ke catatan. Tiriskan corutin diperlukan untuk memberikan kesempatan untuk menunggu sampai jumlah data dalam buffer turun ke nilai yang ditentukan.


Jika Anda lupa memanggil saluran antara panggilan tulis , maka buffer internal dapat tumbuh menjadi ukuran tidak senonoh. Namun, jika Anda mengingat hal ini, beberapa momen tidak menyenangkan tetap ada. Pertama: jika panggilan balik pada catatan "rusak", maka coroutine yang menggunakan API ini tidak akan mengetahuinya dengan cara apa pun, dan karenanya, tidak akan dapat memprosesnya. Kedua: jika coroutine "pecah", maka panggilan balik ke catatan tidak akan mengetahuinya dengan cara apa pun dan akan terus menulis data dari buffer.


Jadi, bahkan hanya menggunakan coroutine, bersiaplah untuk kenyataan bahwa callback akan mengingatkan diri mereka sendiri.


Anda dapat membaca tentang cara bekerja dengan database dari kode asinkron dalam artikel ini dari perangkat lunak blog perusahaan Antida kami .


PS Terima kasih atas informasi tentang kesalahan ketik dan ketidakakuratan kepada pengguna eirnym , kurb , rasswet

Source: https://habr.com/ru/post/id453348/


All Articles