Memperkenalkan Aliran Udara untuk mengelola Spark Jobs di ivi: harapan dan tongkat penyangga

Tugas menyebarkan model pembelajaran mesin dalam produksi selalu menyakitkan dan menderita, karena sangat tidak nyaman untuk keluar dari notebook jupyter yang nyaman ke dunia pemantauan dan toleransi kesalahan.

Kami sudah menulis tentang iterasi pertama refactoring sistem rekomendasi ivi bioskop online. Selama setahun terakhir, kami hampir tidak menyelesaikan arsitektur aplikasi (dari global - hanya bergerak dari python usang 2.7 dan python 3.4 ke python 3.6 segar), tetapi kami menambahkan beberapa model ML baru dan segera mengalami masalah meluncurkan algoritma baru dalam produksi. Dalam artikel tersebut, saya akan berbicara tentang pengalaman kami dalam mengimplementasikan alat manajemen aliran tugas seperti Apache Airflow: mengapa tim memiliki kebutuhan ini, apa yang tidak sesuai dengan solusi yang ada, kruk mana yang harus dipotong sepanjang jalan dan apa yang terjadi.

→ Versi video dari laporan dapat ditonton di YouTube (mulai pukul 03:00:00) di sini .




Tim Hydra


Saya akan memberi tahu Anda sedikit tentang proyek ini: ivi adalah beberapa puluh ribu unit konten, kami memiliki salah satu direktori hukum terbesar di RuNet. Halaman utama dari versi web ivi adalah potongan yang dipersonalisasi dari katalog, yang dirancang untuk menyediakan pengguna dengan konten yang paling kaya, paling relevan berdasarkan umpan baliknya (pandangan, peringkat, dan sebagainya).


Bagian online dari sistem rekomendasi adalah aplikasi backend Flask dengan beban hingga 600 RPS. Offline, model ini dilatih pada lebih dari 250 juta tampilan konten per bulan. Pipa persiapan data untuk pelatihan diimplementasikan pada Spark, yang berjalan di atas repositori Hive.

Tim sekarang memiliki 7 pengembang yang terlibat dalam pembuatan model dan meluncurkannya ke dalam produksi - ini adalah tim yang agak besar yang membutuhkan alat yang mudah untuk mengelola aliran tugas.

Arsitektur Offline


Di bawah ini Anda melihat diagram infrastruktur aliran data untuk sistem pemberi rekomendasi.


Dua penyimpanan data digambarkan di sini - Hive untuk umpan balik pengguna (tampilan, peringkat) dan Postgres untuk berbagai informasi bisnis (jenis monetisasi konten, dll.), Sementara transfer dari Postgres ke Hive disesuaikan. Satu pak aplikasi Spark menghisap data dari Hive: dan melatih model kami pada data ini (ALS untuk rekomendasi pribadi, berbagai model kolaboratif kesamaan konten).

Aplikasi Spark secara tradisional telah dikelola dari mesin virtual khusus, yang kami sebut hydra-updater menggunakan sekelompok skrip shell + cron. Bundel ini dibuat di departemen operasi ivi pada jaman dahulu dan bekerja dengan sangat baik. Shell-script adalah titik masuk tunggal untuk meluncurkan aplikasi-percikan - yaitu, setiap model baru mulai berputar di prod hanya setelah administrator menyelesaikan skrip ini.

Beberapa artefak pelatihan model disimpan dalam HDFS untuk penyimpanan abadi (dan menunggu seseorang untuk mengunduhnya dari sana dan transfer ke server tempat bagian online berputar), dan beberapa ditulis langsung dari driver Spark ke penyimpanan cepat Redis, yang kami gunakan sebagai umum memori untuk beberapa lusin proses python dari bagian online.

Arsitektur semacam itu telah mengakumulasi sejumlah kerugian dari waktu ke waktu:


Diagram menunjukkan bahwa aliran data memiliki struktur yang agak rumit dan rumit - tanpa alat yang sederhana dan jelas untuk mengelola barang ini, pengembangan dan operasi akan berubah menjadi horor, pembusukan, dan penderitaan.

Selain mengelola aplikasi percikan, skrip admin melakukan banyak hal berguna: memulai kembali layanan dalam pertempuran, dump Redis, dan hal-hal sistem lainnya. Jelas, selama periode operasi yang panjang, skrip telah ditumbuhi banyak fungsi, karena setiap model baru kita menghasilkan beberapa lusin baris di dalamnya. Script mulai terlihat terlalu kelebihan dalam hal fungsionalitas, oleh karena itu, sebagai tim dari sistem yang merekomendasikan, kami ingin mengambil suatu bagian dari fungsi yang menyangkut peluncuran dan pengelolaan aplikasi Spark. Untuk tujuan ini, kami memutuskan untuk menggunakan Airflow.

Kruk untuk Aliran Udara


Selain menyelesaikan semua masalah ini, tentu saja, dalam perjalanan kami membuat yang baru untuk diri kami sendiri - menyebarkan Airflow untuk meluncurkan dan memantau aplikasi Spark ternyata sulit.

Kesulitan utama adalah bahwa tidak ada yang akan merombak seluruh infrastruktur untuk kita, karena sumber daya devops adalah hal yang langka. Untuk alasan ini, kami tidak hanya harus mengimplementasikan Airflow, tetapi mengintegrasikannya ke dalam sistem yang ada, yang jauh lebih sulit dilihat dari awal.

Saya ingin berbicara tentang rasa sakit yang kami temui selama proses implementasi, dan kruk yang harus kami tebas untuk mendapatkan Airflow.

Rasa sakit pertama dan utama : bagaimana mengintegrasikan Airflow ke dalam skrip shell besar dari departemen operasi.

Di sini solusinya adalah yang paling jelas - kami mulai memicu grafik langsung dari skrip shell menggunakan biner aliran udara dengan kunci trigger_dag. Dengan pendekatan ini, kami tidak menggunakan penjadwal Airflow, dan pada kenyataannya aplikasi Spark diluncurkan dengan mahkota yang sama - ini secara agama tidak terlalu benar. Tetapi kami mendapat integrasi yang mulus dengan solusi yang ada. Beginilah awalnya dari skrip shell aplikasi Spark utama kami, yang secara historis disebut hydramatrices.

log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE 

Nyeri: Skrip shell dari departemen operasi entah bagaimana harus menentukan status grafik Aliran Udara untuk mengontrol aliran eksekusi sendiri.

Crutch: kami memperluas AirEST REST API dengan titik akhir untuk pemantauan DAG tepat di dalam skrip shell. Sekarang setiap grafik memiliki tiga status: RUNNING, SUCCEED, FAILED.

Faktanya, setelah memulai perhitungan di Airflow, kami cukup melakukan polling secara teratur pada grafik yang sedang berjalan: kami menerima permintaan GET untuk menentukan apakah DAG telah selesai atau belum. Ketika titik akhir pemantauan merespons tentang keberhasilan eksekusi grafik, skrip shell terus menjalankan alirannya.
Saya ingin mengatakan bahwa REST API Aliran Udara hanyalah hal yang berapi-api yang memungkinkan Anda mengkonfigurasi secara fleksibel saluran pipa Anda - misalnya, Anda dapat meneruskan parameter POST ke grafik.

Ekstensi Airflow API hanyalah kelas Python yang terlihat seperti ini:

 import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """    GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message) 

Kami menggunakan API dalam skrip shell - kami menyurvei titik akhir setiap 10 menit:

  TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL 

Nyeri : jika Anda pernah menjalankan pekerjaan Spark menggunakan percikan-kirim dalam mode kluster, maka Anda tahu bahwa log di STDOUT adalah lembar tidak informasi dengan baris "SPARK APPLICATION_ID IS RUNNING". Log aplikasi Spark itu sendiri dapat dilihat, misalnya, menggunakan perintah log benang. Dalam skrip shell, masalah ini diselesaikan secara sederhana: terowongan SSH dibuka ke salah satu mesin cluster dan percikan-kirim dieksekusi dalam mode klien untuk mesin ini. Dalam hal ini, STDOUT akan memiliki log yang dapat dibaca dan dimengerti. Di Airflow, kami memutuskan untuk selalu menggunakan cluster-putuskan, dan nomor seperti itu tidak akan berfungsi.

Crutch: setelah spark-submit bekerja, kami menarik log driver dari HDFS oleh application_id dan menampilkannya di antarmuka Airflow hanya melalui operator Python print (). Satu-satunya negatif - di antarmuka Airflow, log muncul hanya setelah percikan-kirim telah berfungsi, Anda harus mengikuti realtime di tempat lain - misalnya, moncong web YARN.

 def get_logs(config: BaseConfig, app_id: str) -> None: """   :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line) 

Nyeri : untuk penguji dan pengembang, alangkah baiknya memiliki bangku tes Airflow, tetapi kami menghemat sumber daya devops, jadi kami telah memikirkan tentang bagaimana menerapkan lingkungan pengujian untuk waktu yang lama.

Crutch: kami mengemas Airflow dalam wadah buruh pelabuhan, dan Dockerfile meletakkannya di repositori dengan pekerjaan percikan. Dengan demikian, setiap pengembang atau penguji dapat meningkatkan Aliran Udara mereka sendiri pada mesin lokal. Karena kenyataan bahwa aplikasi berjalan dalam mode cluster, sumber daya lokal untuk buruh pelabuhan hampir tidak diperlukan.

Instalasi lokal percikan disembunyikan di dalam wadah buruh pelabuhan dan seluruh konfigurasinya melalui variabel lingkungan - Anda tidak perlu lagi menghabiskan beberapa jam menyiapkan lingkungan. Di bawah ini saya memberikan contoh dengan fragmen file buruh pelabuhan untuk sebuah wadah dengan Airflow, di mana Anda dapat melihat bagaimana Airflow dikonfigurasi menggunakan variabel lingkungan:

 FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography #   ,       ENV PROJECT_HYDRAMATRICES_DOCKER_PATH=${PROJECT_HYDRAMATRICES_DOCKER_PATH} ENV PIPENV_PATH=${PIPENV_PATH} ENV SPARK_HOME=/usr/lib/spark2 ENV HADOOP_CONF_DIR=$PROJECT_HYDRAMATRICES_DOCKER_PATH/etc/hadoop-conf-preprod ENV PYTHONPATH=${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib ENV PIP_NO_BINARY=numpy ENV AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW_DAGS=${AIRFLOW_HOME}/dags ENV AIRFLOW_LOGS=${AIRFLOW_HOME}/logs ENV AIRFLOW_PLUGINS=${AIRFLOW_HOME}/plugins #      Airflow (log url) BASE_URL="http://${AIRFLOW_CURRENT_HOST}:${AIRFLOW_PORT}" ; #   Airflow ENV AIRFLOW__WEBSERVER__BASE_URL=${BASE_URL} ENV AIRFLOW__WEBSERVER__ENDPOINT_URL=${BASE_URL} ENV AIRFLOW__CORE__AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW__CORE__DAGS_FOLDER=${AIRFLOW_DAGS} ENV AIRFLOW__CORE__BASE_LOG_FOLDER=${AIRFLOW_LOGS} ENV AIRFLOW__CORE__PLUGINS_FOLDER=${AIRFLOW_PLUGINS} ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=${AIRFLOW_LOGS}/scheduler 

Sebagai hasil dari penerapan Aliran Udara, kami mencapai hasil berikut:

  • Mengurangi siklus rilis: meluncurkan model baru (atau pipa persiapan data) sekarang turun untuk menulis grafik Airflow baru, grafik itu sendiri disimpan dalam repositori dan disebarkan dengan kode. Proses ini sepenuhnya berada di tangan pengembang. Admin senang, kami tidak lagi menarik mereka pada hal sepele.
  • Log aplikasi Spark yang dulu langsung menuju neraka sekarang disimpan di Aiflow dengan antarmuka akses yang nyaman. Anda dapat melihat log untuk hari apa pun tanpa memilih di direktori HDFS.
  • Perhitungan yang gagal dapat dimulai kembali dengan satu tombol di antarmuka, itu sangat nyaman, bahkan Juni dapat menanganinya.
  • Anda dapat menjalankan tugas percikan dari antarmuka tanpa harus mengalami pengaturan Spark pada mesin lokal. Penguji senang - semua pengaturan agar spark-submit berfungsi dengan benar sudah dibuat di Dockerfile
  • Aiflow roti standar - jadwal, memulai kembali pekerjaan yang jatuh, grafik yang indah (misalnya, waktu eksekusi aplikasi, statistik peluncuran yang berhasil dan tidak berhasil).

Ke mana harus pergi selanjutnya? Sekarang kita memiliki sejumlah besar sumber data dan sink, yang jumlahnya akan bertambah. Perubahan dalam kelas repositori hydramatrices dapat macet di pipa lain (atau bahkan di bagian online):

  • Clickhouse meluap → Sarang
  • preprocessing data: Sarang → Sarang
  • menggunakan model c2c: Hive → Redis
  • persiapan direktori (seperti jenis monetisasi konten): Postgres → Redis
  • persiapan model: FS Lokal → HDFS

Dalam situasi seperti itu, kita benar-benar memerlukan dukungan pengujian otomatis pipa dalam persiapan data. Ini akan sangat mengurangi biaya pengujian perubahan dalam repositori, mempercepat peluncuran model-model baru dalam produksi dan secara dramatis meningkatkan tingkat endorfin pada penguji. Tetapi tanpa Airflow, tidak mungkin untuk memasang dudukan untuk pengujian otomatis semacam ini!

Saya menulis artikel ini untuk menceritakan pengalaman kami dalam mengimplementasikan Airflow, yang mungkin berguna bagi tim lain dalam situasi yang sama - Anda sudah memiliki sistem kerja yang besar, dan Anda ingin mencoba sesuatu yang baru, modis dan awet muda. Tidak perlu takut dengan pembaruan pada sistem kerja, Anda perlu mencoba dan bereksperimen - eksperimen semacam itu biasanya membuka cakrawala baru untuk pengembangan lebih lanjut.

Source: https://habr.com/ru/post/id456630/


All Articles