كيفية جعل autoscaler الخاص بك لمجموعة

تحية! نحن ندرب الناس على العمل مع البيانات الضخمة. من المستحيل تخيل برنامج تعليمي على البيانات الضخمة بدون مجموعته الخاصة ، والتي يعمل عليها جميع المشاركين معًا. لهذا السبب ، لدينا دائمًا برنامجنا :) نحن منخرطون في توليفه وتوليفه وإدارته ، ويقوم الشباب مباشرة بتشغيل وظائف MapReduce هناك واستخدام Spark.


في هذا المنشور ، سنصف كيف قمنا بحل مشكلة التحميل غير المتكافئ للكتلة عن طريق كتابة autoscaler لدينا باستخدام سحابة Mail.ru Cloud Solutions .


المشكلة


المجموعة التي نستخدمها ليست نموذجية تماما. التخلص غير متكافئ للغاية. على سبيل المثال ، هناك تمارين عملية عندما يدخل جميع الأشخاص البالغ عددهم 30 شخصًا المجموعة وتبدأ استخدامها. أو مرة أخرى ، هناك أيام قبل الموعد النهائي ، عندما يزيد الحمل بشكل كبير. بقية الوقت ، تعمل المجموعة في وضع التحميل الزائد.


الحل # 1 هو الحفاظ على كتلة يمكنها تحمل أحمال الذروة ، لكنها ستبقى خاملاً في الوقت الحالي.


الحل رقم 2 هو الحفاظ على مجموعة صغيرة لإضافة العقد يدوياً قبل الفئات وأثناء تحميل الذروة.


الحل # 3 هو الحفاظ على مجموعة صغيرة وكتابة autoscaler التي ستراقب الحمل الحالي للكتلة ، وباستخدام واجهات برمجة التطبيقات المختلفة ، قم بإضافة وإزالة العقد من المجموعة.


في هذا المنشور سنتحدث عن القرار رقم 3. يعتمد هذا autoscaler اعتمادًا كبيرًا على العوامل الخارجية ، وليس على العوامل الداخلية ، وغالبًا ما لا يوفرها المزوّدون. نحن نستخدم البنية التحتية السحابية Mail.ru Cloud Solutions وقد كتبنا autoscaler باستخدام MCS API. ونظرًا لأننا نتدرب على التعامل مع البيانات ، فقد قررنا أن نوضح كيف يمكنك كتابة autoscaler مماثلة لأغراضك واستخدامها مع السحابة الخاصة بك


المتطلبات الأساسية


أولاً ، يجب أن يكون لديك كتلة Hadoop. على سبيل المثال ، نستخدم توزيع HDP.


بحيث يمكن إضافة العقد وإزالتها بسرعة ، يجب أن يكون لديك توزيع معين للأدوار بين العقد.


  1. عقدة رئيسية. حسنًا ، لا يوجد شيء ضروري بشكل خاص للتوضيح هنا: العقدة الرئيسية للمجموعة ، والتي ، على سبيل المثال ، يتم تشغيل برنامج تشغيل Spark ، إذا كنت تستخدم الوضع التفاعلي.
  2. عقدة التاريخ. هذه هي العقدة التي تقوم بتخزين البيانات عليها على HDFS ويتم إجراء العمليات الحسابية عليها.
  3. عقدة الحوسبة. هذه عقدة لا تخزن عليها أي شيء على HDFS ، ولكن يتم إجراء العمليات الحسابية عليها.

نقطة مهمة. سوف يحدث الفحص الذاتي بسبب النوع الثالث من العقد. إذا بدأت في التقاط وإضافة العقد من النوع الثاني ، فستكون سرعة الاستجابة منخفضة جدًا - يتم فك ضغطها وسيستغرق الأمر ساعات في مجموعتك. هذا ، بالطبع ، ليس ما تتوقعه من عملية الاستكمال التلقائي. أي أننا لا نلمس العقد من النوع الأول والثاني. ستكون مجموعة قابلة للحياة إلى الحد الأدنى وستكون موجودة في جميع أنحاء البرنامج.


لذلك ، يتم كتابة برنامج الإحضار التلقائي الخاص بنا في Python 3 ، ويستخدم API Ambari لإدارة خدمات الكتلة ، ويستخدم Mail.ru Cloud Solutions (MCS) API لتشغيل الأجهزة وإيقافها.


هندسة الحلول


  1. وحدة autoscaler.py . يتم تسجيل ثلاث فئات فيه: 1) وظائف للعمل مع Ambari ، 2) وظائف للعمل مع MCS ، 3) وظائف مرتبطة مباشرة بمنطق autoscaler.
  2. النصي observer.py . في الواقع ، يتكون من قواعد مختلفة: متى وفي أي لحظات استدعاء وظائف autoscaler.
  3. الملف مع معلمات التكوين config.py . يحتوي ، على سبيل المثال ، على قائمة بالعقد المسموح بها لإجراء التعيين الذاتي وغيرها من المعلمات التي تؤثر ، على سبيل المثال ، على مقدار الوقت الذي يجب الانتظار من لحظة إضافة عقدة جديدة. هناك أيضًا طوابع زمنية لبداية الفصول الدراسية ، بحيث يتم قبل بدء الجلسة تحديد الحد الأقصى لتكوين الكتلة المسموح به.

دعونا نلقي نظرة على أجزاء الكود داخل أول ملفين.


1. وحدة autoscaler.py


سفاري الطبقة


هذا هو جزء التعليمات البرمجية الذي يحتوي على فئة Ambari :


 class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message 

على سبيل المثال أعلاه ، يمكنك إلقاء نظرة على تطبيق وظيفة stop_all_services ، التي توقف جميع الخدمات على عقدة نظام المجموعة المرغوب.


إلى فئة Ambari تمر:


  • ambari_url ، على سبيل المثال ، للنموذج 'http://localhost:8080/api/v1/clusters/' ،
  • cluster_name هو اسم المجموعة الخاصة بك في Ambari ،
  • headers = {'X-Requested-By': 'ambari'}
  • ويوجد داخل auth اسم المستخدم وكلمة المرور الخاصين بك من Ambari: auth = ('login', 'password') .

الوظيفة نفسها ليست أكثر من مكالمات عبر API REST إلى Ambari. من وجهة نظر المنطق ، نحصل أولاً على قائمة بالخدمات قيد التشغيل على العقدة ، ثم نطلب من هذه المجموعة ، على هذه العقدة ، نقل الخدمات من القائمة إلى حالة INSTALLED . تبدو وظائف تشغيل جميع الخدمات ، ووضع العقد في حالة Maintenance ، وما شابه ذلك - فهي مجرد طلبات قليلة من خلال واجهة برمجة التطبيقات.


فئة mcs


هذا هو جزء التعليمات البرمجية الذي يحتوي على فئة Mcs :


 class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code 

إلى فئة Mcs ، نقوم بتمرير معرف المشروع داخل السحابة ومعرف المستخدم ، وكذلك كلمة المرور الخاصة به. في وظيفة vm_turn_on نريد تمكين أحد الأجهزة. المنطق هنا هو أكثر تعقيدا قليلا. في بداية الكود ، تسمى ثلاث وظائف أخرى: 1) نحتاج إلى الحصول على الرمز المميز ، 2) نحتاج إلى تحويل اسم المضيف إلى اسم الجهاز في MCS ، 3) الحصول على معرف هذا الجهاز. بعد ذلك ، نقدم طلبًا بسيطًا بعد التشغيل ونقوم بتشغيل هذا الجهاز.


هذه هي الطريقة التي تبدو بها وظيفة تلقي الرمز المميز:


 def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token 

فئة autoscaler


يحتوي هذا الفصل على وظائف متعلقة بمنطق العمل نفسه.


هذه هي الطريقة التي يبدو بها رمز من هذه الفئة:


 class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message 

نحن نقبل الفئات Ambari و Mcs ، وقائمة العقد المسموح بها للتحجيم ، ومعلمات التكوين الخاصة بالعُقد: الذاكرة ووحدة المعالجة المركزية المخصصة للعقدة في YARN. هناك أيضا 2 المعلمات الداخلية q_ram ، q_cpu ، والتي هي قوائم الانتظار. باستخدامها ، نقوم بتخزين قيم تحميل الكتلة الحالي. إذا رأينا أنه خلال الـ 5 دقائق الماضية ، كان هناك حمل متزايد ومستقر ، فإننا نقرر أننا بحاجة إلى إضافة عقدة +1 إلى الكتلة. وينطبق الشيء نفسه بالنسبة لحالة underload الكتلة.


يُظهر الرمز أعلاه مثالًا لوظيفة تزيل جهازًا من كتلة وتوقفه في السحابة. أولاً ، YARN Nodemanager ، ثم يتم تشغيل وضع Maintenance ، ثم نوقف جميع الخدمات على الجهاز ونوقف تشغيل الجهاز الظاهري في السحابة.


2. النصي observer.py


نموذج التعليمة البرمجية من هناك:


 if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60) 

في ذلك ، نتحقق مما إذا كانت الظروف موجودة لزيادة سعة المجموعة وما إذا كان هناك جهاز في الاحتياط على الإطلاق ، نحصل على اسم المضيف لأحدهم ، ونضيفه إلى المجموعة وننشر رسالة حول هذا في Slack من فريقنا. بعد ذلك ، cooldown_period ، عندما لا نقوم بإضافة أو إزالة أي شيء من الكتلة ، ولكن ببساطة مراقبة الحمل. إذا استقرت ووجدت داخل ممر قيم الحمل المثلى ، فسنستمر في المتابعة. إذا لم تكن العقدة كافية ، فقم بإضافة واحدة أخرى.


بالنسبة للحالات التي يكون فيها درسًا أمامنا ، نعلم بالفعل بالتأكيد أن عقدة واحدة ليست كافية ، لذلك سنبدأ على الفور جميع العقد المجانية ونبقيها نشطة حتى نهاية الدرس. يحدث هذا مع قائمة فئات الطابع الزمني.


استنتاج


يعد autoscaler حلاً جيدًا ومريحًا لتلك الحالات عندما يكون لديك نظام تحميل غير متكافئ. يمكنك في الوقت نفسه تحقيق تكوين الكتلة المطلوب لأحمال الذروة وفي نفس الوقت لا تحتفظ بهذه المجموعة أثناء التحميل الزائد وتوفير المال. حسنًا ، بالإضافة إلى ذلك ، يحدث كل ذلك تلقائيًا دون مشاركتك. جهاز autoscaler نفسه ليس أكثر من مجموعة من الطلبات لواجهة برمجة تطبيقات إدارة نظام المجموعة وواجهة برمجة تطبيقات موفر الخدمة السحابية ، والتي يتم تسجيلها وفقًا لمنطق معين. ما يجب تذكره بالضبط هو فصل العقد إلى ثلاثة أنواع ، كما كتبنا سابقًا. وسوف تكون سعيدا.

Source: https://habr.com/ru/post/ar482198/


All Articles