Hai Kami melatih orang untuk bekerja dengan data besar. Tidak mungkin membayangkan program pendidikan tentang data besar tanpa cluster sendiri, di mana semua peserta bekerja bersama. Untuk alasan ini, kami selalu memilikinya di program kami :) Kami terlibat dalam penyetelan, penyetelan dan administrasi, dan orang-orang langsung meluncurkan pekerjaan MapReduce di sana dan menggunakan Spark.
Dalam posting ini, kami akan menjelaskan bagaimana kami memecahkan masalah pemuatan cluster yang tidak merata dengan menulis autoscaler kami menggunakan cloud Mail.ru Cloud Solutions .
Masalah
Cluster yang kami gunakan tidak terlalu khas. Pembuangan sangat tidak merata. Misalnya, ada latihan praktis ketika semua 30 orang dan guru memasuki gugus dan mulai menggunakannya. Atau, sekali lagi, ada hari sebelum batas waktu, ketika beban meningkat secara dramatis. Sisa waktu, cluster beroperasi dalam mode underload.
Solusi # 1 adalah untuk menjaga cluster yang akan menahan beban puncak, tetapi akan tetap menganggur selama sisa waktu.
Solusi No. 2 adalah untuk menjaga cluster kecil di mana secara manual menambahkan node sebelum kelas dan selama beban puncak.
Solusi No. 3 adalah untuk menjaga cluster kecil dan menulis autoscaler yang akan memantau beban cluster saat ini dan menambah dan menghapus node dari cluster menggunakan berbagai API.
Dalam posting ini kita akan berbicara tentang keputusan No. 3. Autoscaler seperti itu sangat tergantung pada faktor-faktor eksternal, dan bukan pada faktor-faktor internal, dan penyedia sering tidak menyediakannya. Kami menggunakan infrastruktur cloud Mail.ru Cloud Solutions dan telah menulis autoscaler menggunakan MCS API. Dan karena kami sedang berlatih bekerja dengan data, kami memutuskan untuk menunjukkan bagaimana Anda dapat menulis autoscaler serupa untuk keperluan Anda dan digunakan dengan cloud Anda
Prasyarat
Pertama, Anda harus memiliki cluster Hadoop. Sebagai contoh, kami menggunakan distribusi HDP.
Agar node Anda dapat dengan cepat ditambahkan dan dihapus, Anda harus memiliki distribusi peran tertentu di antara node.
- Master node. Ya, tidak ada yang perlu dijelaskan di sini: simpul utama gugus, di mana, misalnya, driver Spark diluncurkan, jika Anda menggunakan mode interaktif.
- Node tanggal. Ini adalah simpul tempat Anda menyimpan data pada HDFS dan perhitungan dilakukan di sana.
- Komputasi node. Ini adalah simpul di mana Anda tidak menyimpan apa pun di HDFS, tetapi perhitungan dilakukan di sana.
Poin penting. Autoscaling akan terjadi karena jenis node ketiga. Jika Anda mulai mengambil dan menambahkan node dari tipe kedua, maka kecepatan respons akan sangat rendah - terkompresi dan direkomendasikan akan memakan waktu berjam-jam di cluster Anda. Ini, tentu saja, bukan yang Anda harapkan dari autoscaling. Artinya, kita tidak menyentuh node dari tipe pertama dan kedua. Mereka akan menjadi cluster yang layak minimal yang akan ada di seluruh program.
Jadi, autoscoiler kami ditulis dalam Python 3, menggunakan Ambari API untuk mengelola layanan kluster, menggunakan API Mail.ru Cloud Solutions (MCS) untuk memulai dan menghentikan mesin.
Arsitektur Solusi
- Modul
autoscaler.py
. Tiga kelas terdaftar di dalamnya: 1) fungsi untuk bekerja dengan Ambari, 2) fungsi untuk bekerja dengan MCS, 3) fungsi yang terkait langsung dengan logika autoscaler. - Observer skrip
observer.py
. Bahkan, itu terdiri dari aturan yang berbeda: kapan dan pada saat apa untuk memanggil fungsi autoscaler. - File dengan parameter konfigurasi
config.py
. Ini berisi, misalnya, daftar node yang diizinkan untuk penskalaan otomatis dan parameter lain yang memengaruhi, misalnya, berapa banyak waktu untuk menunggu sejak node baru ditambahkan. Ada juga stempel waktu dimulainya kelas, sehingga sebelum sesi konfigurasi cluster maksimum yang diizinkan diluncurkan.
Mari kita lihat potongan-potongan kode di dalam dua file pertama.
1. Modul autoscaler.py
Kelas ambari
Ini adalah bagian dari kode yang berisi kelas Ambari
:
class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message
Untuk contoh di atas, Anda dapat melihat implementasi fungsi stop_all_services
, yang menghentikan semua layanan pada node cluster yang diinginkan.
Untuk kelas Ambari
Anda lulus:
ambari_url
, misalnya, dari bentuk 'http://localhost:8080/api/v1/clusters/'
,cluster_name
adalah nama cluster Anda di Ambari,headers = {'X-Requested-By': 'ambari'}
- dan di dalam
auth
terdapat nama pengguna dan kata sandi Anda dari Ambari: auth = ('login', 'password')
.
Fungsi itu sendiri tidak lebih dari beberapa panggilan melalui REST API ke Ambari. Dari sudut pandang logika, pertama-tama kita mendapatkan daftar layanan yang berjalan pada node, dan kemudian kita bertanya pada cluster ini, pada node ini, untuk mentransfer layanan dari daftar ke keadaan INSTALLED
. Fungsi untuk meluncurkan semua layanan, untuk menempatkan node dalam status Maintenance
, dll. Terlihat serupa - mereka hanya beberapa permintaan melalui API.
MC kelas
Ini adalah bagian dari kode yang berisi kelas Mcs
:
class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code
Untuk kelas Mcs
, kami melewati id proyek di dalam cloud dan ID pengguna, serta kata sandi-nya. Dalam fungsi vm_turn_on
kami ingin mengaktifkan salah satu mesin. Logikanya di sini sedikit lebih rumit. Pada awal kode, tiga fungsi lain disebut: 1) kita perlu mendapatkan token, 2) kita perlu mengubah nama host menjadi nama mesin di MCS, 3) mendapatkan id dari mesin ini. Selanjutnya, kami membuat post-request sederhana dan menjalankan mesin ini.
Ini adalah bagaimana fungsi penerima token terlihat seperti:
def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token
Autoscaler kelas
Kelas ini berisi fungsi yang terkait dengan logika kerja itu sendiri.
Beginilah tampilan kode dari kelas ini:
class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message
Untuk input kita mengambil kelas Ambari
dan Mcs
, daftar node yang diizinkan untuk scaling, serta parameter konfigurasi dari node: memori dan cpu yang dialokasikan ke node di BENANG. Ada juga 2 parameter internal q_ram, q_cpu, yang merupakan antrian. Dengan menggunakannya, kami menyimpan nilai-nilai beban cluster saat ini. Jika kita melihat bahwa selama 5 menit terakhir telah terjadi peningkatan beban yang stabil, maka kami memutuskan bahwa kami perlu menambahkan simpul +1 ke kluster. Hal yang sama juga berlaku untuk keadaan underload cluster.
Kode di atas menunjukkan contoh fungsi yang menghapus mesin dari sebuah cluster dan menghentikannya di cloud. Pertama, YARN Nodemanager
, lalu mode Maintenance
dihidupkan, maka kami menghentikan semua layanan pada mesin dan mematikan mesin virtual di cloud.
2. Script observer.py
Kode contoh dari sana:
if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60)
Di dalamnya, kami memeriksa apakah ada kondisi untuk meningkatkan kapasitas cluster dan apakah ada mesin di cadangan sama sekali, kami mendapatkan nama host salah satunya, menambahkannya ke cluster dan mempublikasikan pesan tentang ini di Slack tim kami. Setelah itu, cooldown_period
diluncurkan, ketika kita tidak menambah atau menghapus apa pun dari cluster, tetapi cukup memantau beban. Jika telah stabil dan berada di dalam koridor nilai beban optimal, maka kami hanya melanjutkan pemantauan. Jika satu node tidak cukup, tambahkan satu lagi.
Untuk kasus ketika kita memiliki pelajaran di depan, kita sudah tahu pasti bahwa satu node tidak cukup, jadi kami segera memulai semua node gratis dan tetap aktif sampai akhir pelajaran. Ini terjadi dengan daftar kelas cap waktu.
Kesimpulan
Autoscaler adalah solusi yang baik dan nyaman untuk kasus-kasus ketika Anda memuat cluster tidak merata. Anda secara bersamaan mencapai konfigurasi gugus yang diinginkan untuk beban puncak, dan pada saat yang sama tidak menahan gugus ini selama kurang muatan, menghemat uang. Nah, ditambah lagi, semuanya terjadi secara otomatis tanpa partisipasi Anda. Autoscoiler itu sendiri tidak lebih dari satu set permintaan ke API manajer kluster dan API penyedia cloud, terdaftar menurut logika tertentu. Apa yang sebenarnya perlu diingat adalah pemisahan node menjadi 3 jenis, seperti yang kita tulis sebelumnya. Dan kamu akan bahagia.