ما هو داخل المتزامن

في هذه المقالة ، أدعو القارئ للقيام برحلة مثيرة إلى حد ما في أحشاء asyncio معي لفهم كيف ينفذ تنفيذ التعليمات البرمجية غير المتزامنة. سرج عمليات الاسترجاعات وندفع من خلال سلسلة من الأحداث من خلال اثنين من التجريدات الرئيسية الحق في corutin. إذا لم تتضمن خريطة الثعبان هذه المعالم السياحية ، فمرحباً بك في cat.


بالنسبة للمبتدئين - إشارة موجزة حول المنطقة المنتشرة أمامنا


asyncio هي مكتبة I / O غير متزامنة تم إنشاؤها وفقًا pep3153 لتوفير إطار عمل موحد لإنشاء أطر عمل غير متزامنة. تعزو إليها pep3156 أيضًا الحاجة إلى توفير دمج بسيط للغاية في الأطر غير المتزامنة الموجودة بالفعل (Twisted، Tornado، Gevent). كما يمكننا أن نلاحظ الآن ، تم تحقيق هذه الأهداف بنجاح - ظهر إطار جديد غير متزامن: aiohttp ، في Tornado AsyncioMainLoop هي حلقة الحدث الافتراضية من الإصدار 5.0 ، في Twisted asyncioreactor يتوفر من الإصدار 16.5.0 ، وهناك مكتبة aiogevent تابعة لجهة خارجية .


asyncio هي مكتبة هجينة تستخدم في الوقت نفسه طريقتين لتنفيذ تنفيذ التعليمات البرمجية غير المتزامنة: كلاسيكية في عمليات الاسترجاعات الحديثة وجديدة نسبيًا (على الأقل للثعبان) على coroutines. يعتمد على ثلاثة أشكال تجريدية رئيسية ، وهي نظائر تجريدية موجودة في أطر خارجية:


  • قابل للتوصيل حلقة الحدث
    حلقة الحدث للتوصيل. يعني Plableable أنه يمكن استبداله في سطرين من التعليمات البرمجية بسطر آخر ينفذ نفس الواجهة. الآن هناك تطبيقات cython على قمة libuv ( uvloop ) وفي Rust ( asyncio-tokio ) .
  • مستقبل
    نتيجة العملية ، والتي ستكون متاحة في المستقبل. من الضروري تلقي نتيجة تنفيذ عمليات الاسترجاعات في coroutines.
  • مهمة
    فئة فرعية خاصة من المستقبل لتشغيل coroutine على حلقة الحدث.

دعنا نذهب!


تمثل دورة الأحداث العنصر الرئيسي للمكتبة ، على طول الطرق التي تمر من خلالها ، يتم تسليم البيانات إلى أي من مكوناتها. إنها كبيرة ومعقدة ، لذا فكر أولاً في نسختها المجردة.


# ~/inside_asyncio/base_loop.py import collections import random class Loop: def __init__(self): #     self.ready = collections.deque() def call_soon(self, callback, *args): #          self.ready.append((callback, args)) def run_until_complete(self, callback, *args): #         self.call_soon(callback, *args) #    -   #       while self.ready: ntodo = len(self.ready) #      #          for _ in range(ntodo): #       #        callback, args = self.ready.popleft() callback(*args) def callback(loop): print('') loop.call_soon(print, '') loop = Loop() loop.run_until_complete(callback, loop) 

ركوبنا رد الاتصال قليلا ، وصلنا إلى الطريق عبر call_soon ، والحصول على قائمة الانتظار وبعد انتظار قصير سوف يتم عرضها.


حلقة رد اتصال سيئة


تجدر الإشارة إلى أن عمليات الاستدعاء هي خيول خطيرة - إذا قاموا برميها في منتصف الطريق ، فلن يتمكن مترجم بايثون من المساعدة في فهم مكان حدوث ذلك. إذا كنت لا تصدقني ، فركب بنفس الطريقة على رد الاتصال ربما_الصحيح ، والذي ينتهي حتى حوالي نصف الوقت.


 # ~/inside_asyncio/base_loop.py def maybe_print(msg): if random.randint(0, 1): raise Exception(msg) else: print(msg) def starting_point(loop): #   print('') loop.call_soon(maybe_print, '') def main(loop): loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) loop = Loop() loop.run_until_complete(main, loop) 

يوجد أدناه التتبع الكامل للمثال السابق. نظرًا لحقيقة أنه قد تم بدء تشغيل دالة ربما من خلال حلقة حدث ، وليس مباشرة من start_point ، ينتهي التتبع في ذلك ، في طريقة run_until_complete . من المستحيل تحديد مكان start_point في التعليمة البرمجية باستخدام مثل هذا التتبع ، مما سيعقد عملية تصحيح الأخطاء بشكل كبير إذا كان start_point موجودًا في عدة أماكن في قاعدة البيانات.


 $: python3 base_loop.py >>  #    >>  #    >>  #    >> Traceback (most recent call last): >> File "base_loop.py", line 42, in <module> >> loop.run_until_complete(main, loop) >> File "base_loop.py", line 17, in run_until_complete >> callback(*args) >> File "base_loop.py", line 29, in maybe_print >> raise Exception(msg) >> Exception:  #     

هناك حاجة إلى مكدس اتصال مستمر ليس فقط لعرض التتبع الكامل ، ولكن أيضًا لتطبيق ميزات اللغة الأخرى. على سبيل المثال ، يستند الاستثناء معالجة عليه. لن يعمل المثال أدناه ، لأنه بحلول الوقت الذي يبدأ فيه start_point ، سيتم تنفيذ الوظيفة الرئيسية بالفعل:


 # ~/inside_asyncio/base_loop.py def main(loop): try: loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) except: pass Loop().run_until_complete(main, loop) 

المثال التالي لن يعمل أيضًا. سيقوم مدير السياق في الوظيفة الرئيسية بفتح الملف وإغلاقه قبل بدء المعالجة.


 # ~/inside_asyncio/base_loop.py def main(loop): with open('file.txt', 'rb') as f: loop.call_soon(process_file, f) Loop().run_until_complete(main, loop) #       ,    =( 

يحد عدم وجود مكدس اتصال مستمر من استخدام ميزات اللغة المألوفة. للتحايل جزئياً على هذا العيب في تزامن ، اضطررت إلى إضافة الكثير من التعليمات البرمجية الإضافية التي لم تكن ذات صلة مباشرة بالمهمة التي كانت تقوم بحلها. هذه الشفرة ، في معظمها ، مفقودة في الأمثلة - فهي معقدة للغاية بدونها.


من دورة الأحداث إلى العالم الخارجي والعودة


تتواصل حلقة الحدث مع العالم الخارجي من خلال نظام التشغيل من خلال الأحداث. يتم توفير التعليمات البرمجية التي تعرف كيفية التعامل معها بواسطة وحدة نمطية للمكتبة القياسية تسمى المحددات . يتيح لك إخبار نظام التشغيل بأننا ننتظر حدثًا ما ، ثم نسأل ما إذا كان قد حدث. في المثال أدناه ، سيتم قراءة الحدث المتوقع توفر مأخذ التوصيل.


حلقة الحدث
 # ~/inside_asyncio/event_loop.py import selectors import socket import collections from future import Future from handle import Handle from task import Task class EventLoop: def __init__(self): self.ready = collections.deque() #   self.selector = selectors.DefaultSelector() def add_reader(self, sock, callback): #       # : # , #         #           self.selector.register( sock, socket.EVENT_READ, (self._accept_conn, sock, callback) ) def _accept_conn(self, sock, callback): #    conn, addr = sock.accept() conn.setblocking(False) #      self.selector.register( conn, socket.EVENT_READ, (callback, conn) ) def run_until_complete(self, callback, *args): self.call_soon(callback, *args) #           -  while self.ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self.ready.popleft() callback(*args) #       for key, events in self.selector.select(timeout=0): #          callback, *args = key.data #       self.call_soon(callback, *args) def call_soon(self, callback, *args): self.ready.append((callback, args)) def print_data(conn): print(conn.recv(1000)) def main(loop): #   sock = socket.socket() #     8086  sock.bind(('localhost', 8086)) sock.listen(100) sock.setblocking(False) #      loop.add_reader(sock, print_data) loop = EventLoop() #    loop.run_until_complete(main, loop) 

الرسول من العالم الخارجي يترك رسالته أو لا يتجزأ في محدد ، ويمررها محدد إلى المتلقي. من الممكن الآن القراءة من المقبس باستخدام حلقة حدث. إذا قمت بتشغيل هذا الرمز وقمت بالاتصال باستخدام netcat ، فسوف يعرض بأمانة كل ما سيتم إرساله إليه.


 $: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n' 

في بداية المقال ، قيل إن asyncio هي مكتبة مختلطة تعمل فيها coroutines أعلى عمليات الاسترجاعات. لتنفيذ هذه الوظيفة ، يتم استخدام التجريدين الرئيسيين المتبقيين: المهام والمستقبل . بعد ذلك ، سيتم عرض رمز هذه التجريدات ، وبعد ذلك ، باستخدام دورة الأحداث الخاصة بهم ، يتم تنفيذ coroutines.


مستقبل


أدناه هو رمز لفئة المستقبل. هناك حاجة لذلك في corutin يمكنك الانتظار حتى اكتمال رد الاتصال والحصول على نتيجته.


مستقبل
 # ~/inside_asyncio/future.py import sys from asyncio import events, CancelledError class Future: #        _state = 'PENDING' # FINISHED, CANCELLED #          Future #          _source_traceback = None #            _callbacks = [] #           _exception = None #           _loop = None #     _result = None def __init__(self, loop): self._loop = loop self._source_traceback = events.extract_stack(sys._getframe(1)) def add_done_callback(self, callback): #        self._callbacks.append(callback) def _schedule_callbacks(self): #         for callback in self._callbacks: self._loop.call_soon(callback, self) self._callbacks[:] = [] #            Future #    -     def set_exception(self, exception): #       self._exception = exception #   self._state = 'FINISHED' #      self._schedule_callbacks() def set_result(self, result): #         self._result = result self._state = 'FINISHED' self._schedule_callbacks() def cancel(self): #       self._state = 'CANCELLED' self._schedule_callbacks() def result(self): #     #         if self._state == 'CANCELLED': raise CancelledError #         if self._exception is not None: raise self._exception #    return self._result def __await__(self): #  ,    await #        if self._state == 'PENDING': yield self #     return self.result() 

مهمة


هذا هو فئة فرعية خاصة من المستقبل . هناك حاجة لتشغيل coroutines على حلقة حدث رد الاتصال.


مهمة
 # ~/inside_asyncio/task.py from asyncio import futures from future import Future class Task(Future): def __init__(self, coro, *, loop=None): super().__init__(loop=loop) #    self._coro = coro def _step(self, exc=None): #    ,     try: if exc is None: #        None #        result = self._coro.send(None) else: #        self._coro.throw(exc) except StopIteration: result = None except Exception as exc: self.set_exception(exc) else: #   Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) #     step     elif result is None: self._loop.call_soon(self._step) def _wakeup(self, future): #     Future       Task #   try: future.result() except Exception as exc: self._step(exc) #        Future else: self._step() 

حلقة من الأحداث التي يمكن أن تعمل مع المستقبل


EventLoop مع العقود الآجلة
 # ~/inside_asyncio/future_event_loop.py import selectors from selectors import EVENT_READ, EVENT_WRITE import socket import collections from future import Future from task import Task class EventLoop: def __init__(self): self._ready = collections.deque() self.selector = selectors.DefaultSelector() def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): callback, *args = key.data self.call_soon(callback, *args) def call_soon(self, callback, *args): self._ready.append((callback, args)) #      Future def sock_accept(self, sock, fut=None): #       #  Future     fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: #    #   Future    fut.set_result((conn, address)) self.selector.unregister(sock) return fut def sock_recv(self, sock, n, fut=None): #       #      , #     ,      fut = fut if fut else Future(loop=self) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.selector.register( sock, EVENT_READ, (self.sock_recv, sock, n, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: fut.set_result(data) self.selector.unregister(sock) return fut async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    conn, addr = await loop.sock_accept(sock) #     result = await loop.sock_recv(conn, 1000) print(result) loop = EventLoop() #    Task task = Task(coro=main(loop), loop=loop) #         loop.run_until_complete(task._step) 

دعنا ننتقل


الآن دعونا نرى كيف سيتم تنفيذ coroutine الرئيسي :


إعدام
 __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step    self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #     callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: #  None   result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): #      #   sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #         conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): #   Future fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) #  Future   return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __await__  Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): #   Future         if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: #           None result = self._coro.send(None) # result = fut -------------------------------- else: #  Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) #      -    Task  Future #    #               ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): #    callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: #    conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: #   Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): #   self._result = result #   self._state = 'FINISHED' #      self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: #         task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #      #    task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: #   Future      task._step self._step() def _step(self, exc=None): try: if exc is None: #       None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __awai__   conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self #   Future    return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    Future    conn  addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) 

بهذه الطريقة البسيطة ، ينفذ asyncio coroutines.


النتائج


تم تحقيق هدف إنشاء التزامن. لم يحل مشكلة التوافق فحسب ، بل تسبب أيضًا في زيادة كبيرة في الاهتمام بالبرمجة التنافسية في المجتمع. بدأت المقالات والمكتبات الجديدة في الظهور ، مثل الفطر بعد المطر. بالإضافة إلى ذلك ، أثرت المزامنة على اللغة نفسها: تمت إضافة كلمات مفتاحية أصلية وكلمات أساسية متزامنة / انتظار جديدة إليها. في المرة الأخيرة التي تمت فيها إضافة كلمة رئيسية جديدة في عام 2003 ، كانت الكلمة الرئيسية ذات العائد .


كان أحد أهداف إنشاء التزامن هو توفير دمج بسيط للغاية في الأطر غير المتزامنة الموجودة بالفعل (Twisted، Tornado، Gevent). إن اختيار الأدوات يتبع منطقيا من هذا الهدف: إذا لم يكن هناك شرط توافق ، فمن المحتمل أن يُعطى الدور الرئيسي للكوروتينات. نظرًا لحقيقة أنه عند البرمجة على عمليات الاسترجاعات ، من المستحيل الحفاظ على تكدس مستمر للمكالمات ، يجب إنشاء نظام إضافي على الحدود بينها وبين coroutines لدعم ميزات اللغة المستندة إليها.


الآن السؤال الرئيسي. لماذا يجب أن يعرف المستخدم البسيط للمكتبة كل هذا ، والذي يتبع التوصيات الواردة في الوثائق ويستخدم فقط coroutines و API رفيع المستوى؟
فيما يلي جزء من وثائق الفئة StreamWriter



يتم إرجاع مثيله بواسطة الدالة asyncio.open_connection وهو API async / await أعلى واجهة برمجة تطبيقات رد الاتصال. وتعود هذه الاسترجاعات منها. تكون وظائف الكتابة والكتابة متزامنة ، وتحاول الكتابة إلى المقبس ، وإذا فشل ذلك ، فإنها تفريغ البيانات في المخزن المؤقت الأساسي وإضافة عمليات الاسترجاعات إلى السجل. هناك حاجة إلى استنزاف Corutin لتوفير الفرصة للانتظار حتى تنخفض كمية البيانات الموجودة في المخزن المؤقت إلى القيمة المحددة.


إذا نسيت استدعاء التصريف بين مكالمات الكتابة ، فيمكن أن يزيد حجم المخزن المؤقت الداخلي إلى أحجام غير لائقة. ومع ذلك ، إذا وضعت ذلك في الاعتبار ، تبقى بضع لحظات غير سارة. أولاً: إذا "انقطع الاتصال" في السجل ، فلن يعرف coroutine باستخدام واجهة برمجة التطبيقات هذه بأي طريقة ، وبالتالي ، لن يكون قادرًا على معالجته. ثانيًا: إذا تعطل coroutine ، فلن يعرف رد الاتصال بالسجل عنه بأي طريقة وسيواصل كتابة البيانات من المخزن المؤقت.


وبالتالي ، حتى باستخدام coroutines فقط ، كن مستعدًا لحقيقة أن عمليات الاستدعاء ستذكر نفسها.


يمكنك قراءة كيفية التعامل مع قواعد البيانات من التعليمات البرمجية غير المتزامنة في هذه المقالة من برنامج Antida الخاص بشركتنا .


سكرتير خاص شكرا على المعلومات حول الأخطاء المطبعية وعدم الدقة لمستخدمي eirnym ، kurb ، rasswet

Source: https://habr.com/ru/post/ar453348/


All Articles