مهمة نشر نماذج التعلم الآلي في الإنتاج هي دائمًا الألم والمعاناة ، لأنه من غير المريح جدًا الخروج من دفتر ملاحظات مريح في عالم المراقبة والتسامح مع الأخطاء.
لقد كتبنا بالفعل عن
التكرار الأول لإعادة بناء نظام التوصية للسينما الإلكترونية ivi. على مدار العام الماضي ، لم ننتهي من وضع بنية التطبيق تقريبًا (من العالمية - فقط من الانتقال من python 2.7 و python 3.4 إلى python 3.6) ، لكننا أضفنا بعض طرز ML الجديدة وواجهنا على الفور مشكلة طرح خوارزميات جديدة في الإنتاج. في المقال ، سوف أخبرنا عن تجربتنا في تنفيذ أداة إدارة تدفق المهام مثل Apache Airflow: لماذا كان لدى الفريق هذه الحاجة ، وما الذي لا يتناسب مع الحل الحالي ، الذي يجب قطع العكازات على طول الطريق وماذا جاء منه.
→ يمكن مشاهدة نسخة الفيديو من التقرير على YouTube (بدءًا من الساعة 03:00:00)
هنا .
فريق هيدرا
سوف أخبركم قليلاً عن المشروع: ivi عبارة عن عشرات الآلاف من وحدات المحتوى ، ولدينا واحدة من أكبر الأدلة القانونية في RuNet. الصفحة الرئيسية لإصدار الويب ivi عبارة عن مقطع مخصص من النشرة المصورة ، وهو مصمم لتزويد المستخدم بالمحتوى الأكثر ثراءً والأكثر صلة استنادًا إلى ملاحظاته (المشاهدات ، التصنيفات ، وما إلى ذلك).
الجزء عبر الإنترنت من نظام التوصية هو تطبيق Flask backend مع حمولة تصل إلى 600 RPS. في وضع عدم الاتصال ، يتم تدريب النموذج على أكثر من 250 مليون مشاهدة محتوى شهريًا. يتم تنفيذ خطوط أنابيب إعداد البيانات للتدريب على Spark ، والتي تعمل على أعلى مستودع Hive.
لدى الفريق الآن 7 مطورين يشاركون في إنشاء النماذج وطرحها في الإنتاج - هذا فريق كبير إلى حد ما يتطلب أدوات ملائمة لإدارة تدفقات المهام.
الهندسة المعمارية حاليا
ترى أدناه مخطط البنية التحتية لتدفق البيانات لنظام التوصية.
يتم عرض مخزنين للبيانات هنا - Hive لملاحظات المستخدم (المشاهدات والتقييمات) و Postgres لمعلومات العمل المختلفة (أنواع تسييل المحتوى ، إلخ) ، بينما يتم ضبط النقل من Postgres إلى Hive. تمتص حزمة من تطبيقات Spark البيانات من Hive: وتدرب نماذجنا على هذه البيانات (ALS للتوصيات الشخصية ، والنماذج التعاونية المختلفة لتشابه المحتوى).
تتم إدارة تطبيقات Spark تقليديًا من جهاز افتراضي مخصص ، والذي نسميه hydra-updater باستخدام مجموعة من البرامج النصية cron + shell. تم إنشاء هذه الحزمة في قسم عمليات IVI في زمن سحيق وعملت بشكل رائع. كان Shell-script نقطة دخول واحدة لإطلاق تطبيقات الشرارة - أي ، بدأ كل طراز جديد في الدوران في المنتج فقط بعد أن ينهي المسؤولون هذا البرنامج النصي.
يتم تخزين بعض القطع الأثرية للتدريب النموذجي في HDFS للتخزين الأبدي (وينتظر قيام شخص ما بتنزيلها من هناك ونقلها إلى الخادم حيث يدور الجزء عبر الإنترنت) ، وبعضها مكتوب مباشرةً من برنامج التشغيل Spark إلى التخزين السريع Redis ، والذي نستخدمه بشكل عام الذاكرة لعدة عشرات عمليات الثعبان من الجزء على الانترنت.
تراكمت مثل هذه البنية عددًا من العيوب بمرور الوقت:
يوضح الرسم البياني أن تدفق البيانات له بنية معقدة ومعقدة إلى حد ما - وبدون وجود أداة بسيطة وواضحة لإدارة هذا الصالح ، سيتحول التطوير والتشغيل إلى رعب وانحطاط ومعاناة.
بالإضافة إلى إدارة تطبيقات الشرارة ، يقوم البرنامج النصي للمشرف بالكثير من الأشياء المفيدة: إعادة تشغيل الخدمات في المعركة وتفريغ Redis وأشياء أخرى في النظام. من الواضح ، على مدى فترة طويلة من التشغيل ، امتلأ البرنامج النصي بالعديد من الوظائف ، حيث أن كل نموذج جديد خاص بنا قد أنتج بضع عشرات من الأسطر. بدأ البرنامج النصي يبدو مثقلًا جدًا من حيث الوظيفة ، لذلك ، كفريق من نظام الموصي ، أردنا إزالة جزء من الوظيفة في مكان ما يتعلق ببدء تشغيل تطبيقات Spark وإدارتها. لهذه الأغراض ، قررنا استخدام Airflow.
العكازات لتدفق الهواء
بالإضافة إلى حل جميع هذه المشاكل ، بالطبع ، على الطريقة التي أنشأنا بها مشاكل جديدة لأنفسنا - نشر Airflow لتشغيل تطبيقات Spark ومراقبتها ، أصبح أمرًا صعبًا.
كانت الصعوبة الرئيسية هي أن لا أحد سيعيد تشكيل البنية التحتية بأكملها بالنسبة لنا ، لأن الموارد devops هو شيء نادر. لهذا السبب ، لم يكن علينا فقط تطبيق Airflow ، ولكن دمجها في النظام الحالي ، وهو أمر يصعب رؤيته من البداية.
أريد أن أتحدث عن الآلام التي واجهناها أثناء عملية التنفيذ ، والعكازات التي اضطررنا لإزالتها من أجل الحصول على Airflow.
الألم الأول والرئيسي : كيفية دمج Airflow في برنامج نصي كبير لقسم العمليات.
الحل هنا هو الأكثر وضوحا - بدأنا في تشغيل الرسوم البيانية مباشرة من البرنامج النصي shell باستخدام تدفق الهواء الثنائي مع مفتاح trigger_dag. مع هذا النهج ، نحن لا نستخدم sheduler Airflow ، وفي الواقع يتم تشغيل تطبيق Spark بنفس التاج - وهذا غير صحيح دينياً. لكن حصلنا على تكامل سلس مع حل موجود. إليك الشكل الذي تبدو عليه البداية من خلال برنامج shell الخاص بتطبيق Spark الرئيسي ، والذي يسمى تاريخياً hydramatrices.
log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE
الألم: يجب أن يحدد البرنامج النصي shell لإدارة العمليات بطريقة ما حالة رسم Airflow من أجل التحكم في تدفق التنفيذ الخاص به.
عكاز: قمنا بتمديد واجهة برمجة تطبيقات Airflow REST بنقطة نهاية لمراقبة DAG داخل نصوص الصدمات مباشرة. الآن يحتوي كل رسم بياني على ثلاث حالات: التشغيل ، والنجاح ، والفشل.
في الواقع ، بعد بدء العمليات الحسابية في Airflow ، نقوم ببساطة باستطلاع الرسم البياني الجاري: لقد حددنا طلب GET لتحديد ما إذا كانت DAG قد أكملت أم لا. عندما تستجيب نقطة النهاية للرصد حول التنفيذ الناجح للرسم البياني ، يستمر البرنامج النصي shell في تنفيذ التدفق.
أريد أن أقول إن واجهة برمجة تطبيقات Airflow REST هي مجرد شيء ملتهب يسمح لك بتكوين خطوط الأنابيب بمرونة - على سبيل المثال ، يمكنك إعادة توجيه معلمات POST إلى الرسوم البيانية.
امتداد Airflow API هو مجرد فئة بيثون تبدو مثل هذا:
import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """ GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message)
نستخدم واجهة برمجة التطبيقات (API) في البرنامج النصي shell - نقوم باستطلاع نقطة النهاية كل 10 دقائق:
TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL
ألم : إذا قمت بتشغيل مهمة Spark باستخدام شرارة إرسال في وضع نظام المجموعة ، فأنت تعلم أن السجلات في STDOUT هي ورقة غير معلوماتية تحتوي على السطور "SPARK APPLICATION_ID IS RUNNING". يمكن الاطلاع على سجلات تطبيق Spark نفسه ، على سبيل المثال ، باستخدام أمر سجلات الغزل. في البرنامج النصي shell ، تم حل هذه المشكلة ببساطة: تم فتح نفق SSH إلى أحد أجهزة الكتلة وتم تنفيذ spark-submit في وضع العميل لهذا الجهاز. في هذه الحالة ، سيكون لدى STDOUT سجلات قابلة للقراءة ومفهومة. في Airflow ، قررنا دائمًا استخدام نظام المجموعة ، ولن يعمل هذا الرقم.
عكاز: بعد نجاح شرارة التقديم ، نقوم بسحب سجلات برنامج التشغيل من HDFS بواسطة application_id ونعرضها في واجهة Airflow ببساطة من خلال مشغل Python print (). السلبية الوحيدة - في واجهة Airflow ، تظهر السجلات فقط بعد أن تعمل شرارة التقديم ، يجب عليك مراقبة الوقت الفعلي في أماكن أخرى - على سبيل المثال ، كمامة الويب YARN.
def get_logs(config: BaseConfig, app_id: str) -> None: """ :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line)
الألم : بالنسبة للمختبرين والمطورين ، سيكون من الجيد الحصول على مقعد اختبار Airflow ، لكننا نقوم بتوفير موارد devops ، لذلك فكرنا في كيفية نشر بيئة الاختبار لفترة طويلة.
عكاز: لقد قمنا بتعبئة Airflow في حاوية الإرساء ، ووضعها Dockerfile في المستودع بوظائف شرارة. وبالتالي ، يمكن لكل مطور أو اختبار رفع تدفق الهواء الخاص بهم على جهاز محلي. نظرًا لحقيقة أن التطبيقات تعمل في وضع نظام المجموعة ، فإن الموارد المحلية لجهاز الإرساء غير مطلوبة تقريبًا.
تم إخفاء التثبيت المحلي للشرارة داخل حاوية عامل النقل وتكوينها بالكامل عبر متغيرات البيئة - لم تعد بحاجة إلى قضاء عدة ساعات في إعداد البيئة. أدناه ، قدمت مثالًا مع جزء ملف عامل ميناء لحاوية مع Airflow ، حيث يمكنك معرفة كيفية تكوين Airflow باستخدام متغيرات البيئة:
FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography
نتيجة لتنفيذ Airflow ، حققنا النتائج التالية:
- تقليل دورة الإصدار: طرح نموذج جديد (أو خط أنابيب لإعداد البيانات) الآن لكتابة رسم بياني جديد لـ Airflow ، يتم تخزين الرسوم البيانية نفسها في المستودع ونشرها مع الكود. هذه العملية هي تماما في أيدي المطور. المسؤولون سعداء ، لم نعد نسحبهم على تفاهات.
- يتم الآن تخزين سجلات تطبيق Spark التي اعتادت الانتقال مباشرة إلى الجحيم في Aiflow مع واجهة وصول مريحة. يمكنك رؤية السجلات لأي يوم دون اختيار أدلة HDFS.
- يمكن إعادة تشغيل الحساب الفاشل باستخدام زر واحد في الواجهة ، وهو مريح للغاية ، وحتى يونيو يمكن معالجته.
- يمكنك نشر وظائف الشرارة من الواجهة دون الحاجة إلى تشغيل إعدادات Spark على الجهاز المحلي. المختبرون سعداء - جميع إعدادات شرارة التقديم للعمل بشكل صحيح مصنوعة بالفعل في Dockerfile
- الكعك القياسية Aiflow - الجداول الزمنية ، إعادة تشغيل الوظائف الساقطة ، الرسوم البيانية الجميلة (على سبيل المثال ، وقت تنفيذ التطبيق ، إحصائيات عمليات الإطلاق الناجحة وغير الناجحة)
إلى أين تذهب بعد ذلك؟ الآن لدينا عدد كبير من مصادر البيانات والمصارف ، والتي سوف ينمو عددها. يمكن أن تتعطل التغييرات في أي فئة من مستودعات تخزين المواد المائية في خط أنابيب آخر (أو حتى في الجزء المتصل بالإنترنت):
- الفيضانات Clickhouse → خلية
- معالجة البيانات الأولية: Hive → Hive
- نشر نماذج c2c: Hive → Redis
- إعداد الدلائل (مثل نوع تسييل المحتوى): Postgres → Redis
- إعداد النموذج: محلي FS → HDFS
في مثل هذه الحالة ، نحتاج حقًا إلى موقف للاختبار التلقائي لخطوط الأنابيب في إعداد البيانات. سيقلل هذا إلى حد كبير تكلفة اختبار التغييرات في المستودع ، ويسرع في طرح الطرز الجديدة في الإنتاج ويزيد بشكل كبير من مستوى الإندورفين في الفاحصات. ولكن بدون Airflow ، سيكون من المستحيل نشر موقف لهذا النوع من الاختبار التلقائي!
كتبت هذا المقال لأتحدث عن تجربتنا في تطبيق Airflow ، والتي قد تكون مفيدة للفرق الأخرى في وضع مماثل - لديك بالفعل نظام عمل كبير ، وتريد تجربة شيء جديد وعصري وشبابي. لا داعي للخوف من أي تحديثات لنظام العمل ، فأنت بحاجة إلى المحاولة والتجربة - مثل هذه التجارب عادةً ما تفتح آفاقًا جديدة لمزيد من التطوير.