Halo, Habr!
Bukan rahasia lagi bahwa bank menggunakan data dari berbagai sumber (biro kredit, operator seluler, dll.) Untuk menilai solvabilitas pelanggan. Jumlah mitra eksternal dapat mencapai beberapa lusin, dan analis di tim kami hanya akan merekrut beberapa orang. Masalah muncul dari mengoptimalkan kerja tim kecil dan mentransfer tugas rutin ke sistem komputasi.
Kami akan menganalisis bagaimana data ini masuk ke bank, dan bagaimana tim analis memantau proses ini.

Mari kita mulai.
Sistem terdistribusi kami berdasarkan Hadoop, dan semua proses yang terkait dengannya, kami secara singkat memanggil SmartData. SmartData menerima data API dari agen eksternal. (Selain itu, agen untuk itu adalah mitra eksternal dan sistem internal bank). Tentu saja, akan berguna untuk mengumpulkan "profil saat ini" tertentu untuk setiap klien, yang kami lakukan. Data yang diperbarui dari sumber termasuk dalam Operprofil. Operprofile mengimplementasikan ide Pelanggan 360 dan disimpan sebagai tabel Hbase. Lebih mudah untuk bekerja lebih lanjut dengan klien.
Pelanggan 360Pelanggan 360 - pendekatan untuk menerapkan penyimpanan operasional dengan semua jenis atribut data klien yang digunakan dalam semua proses dalam organisasi yang bekerja dengan klien dan datanya, dapat diakses dengan kunci klien.
Bekerja dengan agen sedang berlangsung dan perlu dikendalikan. Untuk dengan cepat memeriksa kualitas interaksi dan tingkat hit, serta kemudahan mentransfer informasi ini ke tim lain, kami menggunakan visualisasi, misalnya, laporan di Tableau.
Sumber data dikirim ke
Kafka , pra-diproses dan ditempatkan di DataLake yang dibangun berdasarkan
HDFS . Itu perlu untuk datang dengan solusi bagaimana mengatur parsing file log dari HDFS, pemrosesan dan pengunggahan harian ke sistem analitik dan visualisasi. Dan juga kombinasikan ini dengan kecintaan para analis terhadap laptop Python.
Akhiri dengan dapur internal dan lanjutkan berlatih.
Solusi kami adalah menggunakan API Livy. Livy memungkinkan Anda untuk mengirim kode ke sebuah cluster langsung dari Jupyter. Permintaan HTTP yang berisi kode yang ditulis dengan Python (atau Scala) dan meta data dikirim ke Livy. Livy memulai peluncuran sesi Spark di kluster, yang dikelola oleh manajer sumber daya Benang. Modul permintaan cocok untuk mengirim permintaan HTTP. Mereka yang suka mem-parsing situs mungkin sudah mengenalnya (dan jika tidak, inilah kesempatan untuk belajar sedikit tentangnya).
Kami mengimpor modul yang diperlukan dan membuat sesi. (Kami juga segera mengetahui alamat sesi kami, di masa depan akan berguna). Dalam parameter kami meneruskan data untuk otorisasi pengguna dan nama bahasa skrip yang akan dieksekusi cluster.
import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers)
Kami sedang menunggu status sesi untuk diam. Jika batas waktu melebihi batas waktu yang ditentukan - kirim pesan kesalahan.
timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st)
Sekarang Anda dapat mengirim kode ke Livy.
statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id)
Dalam loop, kita menunggu akhir eksekusi kode, kita mendapatkan hasil pemrosesan:
r.get('data').get('text/plain')
Metode hapus akan menghapus sesi.
requests.delete(session_url, headers=headers)
Untuk pembongkaran harian, Anda dapat menggunakan beberapa opsi, mereka sudah menulis tentang cron di hub, tetapi tentang modul jadwal yang mudah digunakan - tidak. Cukup tambahkan ke kode, itu tidak akan memerlukan penjelasan. Dan, untuk kenyamanan, saya akan mengumpulkan semua perhitungan di satu tempat.
Kode import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain'))
Kesimpulan:
Mungkin solusi ini tidak mengklaim sebagai yang terbaik, tetapi transparan bagi tim analis. Pro yang saya lihat di dalamnya:
- kemampuan untuk menggunakan Jupyter yang familier untuk otomatisasi
- interaksi visual
- anggota tim memiliki hak untuk memilih bagaimana ia akan bekerja dengan file (spark-zoo), akibatnya, tidak perlu menulis ulang skrip yang ada
Tentu saja, ketika memulai sejumlah besar tugas, Anda harus memantau sumber daya yang dibebaskan, mengkonfigurasi komunikasi antar pembongkaran. Masalah-masalah ini diselesaikan secara individual dan disepakati dengan rekan kerja.
Akan sangat bagus jika setidaknya satu tim mencatat keputusan ini.
Referensi
Dokumentasi Livy