"Kesederhanaan adalah prasyarat untuk keandalan" oleh Edsger DijkstraProlog
Grafik adalah struktur data yang begitu jelas dan mudah dipahami, karena pada zaman Leonhard Euler, ia terpaksa menghancurkan pikiran umat manusia atas tugas-tugas heterogen, seperti bagaimana Anda dapat melewati ketujuh jembatan Königsberg tanpa melalui salah satu dari dua kali atau sebagai perantara perjalanan, temukan yang paling rute yang menguntungkan.
Banyak yang telah berubah sejak Euler: transistor, bahasa pemrograman, dan komputasi terdistribusi telah muncul. Ini adalah yang terakhir dari daftar ini yang secara dramatis menyederhanakan penyimpanan dan pemrosesan grafik. Sebenarnya, inilah yang akan dibahas dalam artikel ini.
Jika Anda tidak terbiasa dengan konsep-konsep dasar Apache Spark seperti RDD, program Driver, simpul Pekerja dll, maka sebelum melanjutkan dengan artikel ini, saya akan merekomendasikan Anda membaca
dokumentasi dari Databricks.
Bagi saya, cara terbaik untuk berurusan dengan teknologi adalah mencoba menulis sesuatu di atasnya. Pada artikel ini kita akan melakukan analisis kesamaan “jejaring sosial” menggunakan konsep dasar teori graf.
Menulis kode
Metode penyimpanan "jaringan sosial" kami yang saya pilih sangat sederhana dan intuitif: file tsv pada disk, tentu saja ini bisa berupa file dengan format lain seperti Parket, Avro. Lokasi penyimpanan file tidak masalah apakah itu HDFS atau S3, bahkan jika kita perlu mengubah sesuatu, maka Spark SQL akan melakukan pekerjaan utama bagi kita. Struktur jaringan adalah sebagai berikut: file pertama adalah sepasang Id pengguna dan namanya, file ID pengguna kedua dan daftar rekan. Apache Spark mendukung bahasa pemrograman Java, Scala, dan Python berikut sebagai API. Saya memilih yang kedua.
Segera saya ingin menjawab pertanyaan populer apakah layak menggunakan Spark GraphX untuk menyimpan grafik ketika Anda memiliki banyak operasi insert / update - jawabannya tidak, semua operasi perubahan RDD memaksa mengubah seluruh RDD dalam cluster, yang bukan solusi optimal, yang khusus cocok untuk kasus ini Solusi NoSql seperti Neo4J, Titanium atau bahkan Cassandra, Hbase. Tidak ada yang mencegah Anda menggunakan Spark GraphX dengan mereka secara khusus untuk memproses grafik, memuat data itu sendiri dari database, misalnya, dengan sheduler atau dalam gaya event driven.
Kalau begitu, mari kita mulai menulis kode. Pertama, kita perlu memuat grafik ke dalam memori, mengambil file sumber dan mengeluarkan simpul dan tepi yang diperlukan (di sini adalah poin utama, tautan ke daftar lengkap dengan kode sumber dapat ditemukan di akhir artikel):
def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES) .flatMap(InputDataFlow.parseNames) def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH) .flatMap(InputDataFlow.makeEdges)
Pregel
Mekanisme utama untuk iterasi grafik di GraphX adalah algoritma Pregel. Algoritma dikembangkan oleh Google, model Pregel menggunakan
transfer pesan antara simpul dalam grafik. Pesan yang melewati serangkaian iterasi yang disebut supersteps adalah ide utama di balik algoritma ini. Juga, ide utama dapat dijelaskan sebagai berikut:
"Berpikir seperti titik" , yaitu, keadaan saat ini hanya bergantung pada tingkat negara tetangganya.
Pregel menjadi sangat diperlukan ketika menyelesaikan masalah dengan MapReduce biasa menjadi proses yang sangat sulit. Menariknya, nama Pregel berasal dari nama sungai, yang membentang tujuh jembatan Koenigsberg.
Primitif utama untuk melintasi grafik adalah triplet - terdiri dari komponen berikut: simpul saat ini (simpul sumber), simpul yang kami lewati (titik tujuan) dan tepi di antara mereka (ujung yang menghubungkan) - semuanya jelas: di mana pergi ke mana kita pergi dan ke mana kita pergi. Juga, untuk Pregel, Anda harus menentukan jarak default antara simpul, sebagai aturan, itu adalah fungsi PositiveInfinity, UDF (fungsi yang ditentukan pengguna) untuk setiap simpul untuk memproses pesan yang masuk dan menghitung simpul berikutnya, dan UDF untuk menggabungkan dua pesan yang masuk, fungsi ini harus komutatif dan asosiatif. Karena Scala adalah bahasa fungsional, dua fungsi terakhir akan direpresentasikan sebagai, dua ekspresi lambda.
Saat kami memisahkan komponen utama Pregel, ada baiknya untuk berlatih. Algoritme pertama yang akan kami terapkan adalah algoritma Dijkstra untuk menemukan jalur terpendek dari titik arbitrer ke semua jalur lainnya.
def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (_, dist, newDist) => math.min(dist, newDist), triplet => {
Semuanya jelas di sini: kita mulai dari titik tertentu, gunakan fungsi minimum untuk menentukan jarak minimum pada setiap langkah. Fungsi pertama yang digunakan oleh Pregel mempertahankan jarak terpendek antara pesan yang masuk dan titik saat ini. Fungsi kedua mendistribusikan pesan ke tetangga sambil menjaga jarak. Fungsi terakhir - ini adalah analog dari tahap Reduce - memilih nilai minimum untuk beberapa pesan yang masuk. Selanjutnya, kita cukup membentuk output grafik yang nyaman.
Tingkat pemisahan
Saya yakin bahwa banyak pembaca artikel ini telah mendengar tentang teori enam jabat tangan (
Enam derajat pemisahan ) - ini adalah teori yang belum terbukti yang menurutnya setiap dua orang dipisahkan oleh tidak lebih dari lima tingkat kenalan umum, yaitu, diperlukan maksimum 6 jabat tangan untuk menghubungkan dua secara sewenang-wenang manusia di bumi. Dalam hal teori grafik, kedengarannya seperti ini: diameter grafik penanggalan tidak melebihi 6 untuk setiap dua orang di Bumi.
Mari kita mulai menulis kode dengan yang berikut ini, kita perlu pencarian luas pertama pada grafik untuk mencari kontak dari vertex yang ditunjukkan, untuk ini kita perlu memodifikasi kode algoritma Dijkstra:
def getBFS(root: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)( (_, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr + 1)) } else { Iterator.empty } }, (a, b) => math.min(a, b)).cache() bfs }
Semuanya sangat mirip dengan apa yang ada di atas, tetapi kami sudah menunjukkan jumlah iterasi - untuk grafik Anda ini mungkin angka yang berbeda - 10 untuk grafik saya, saya mendapat secara empiris. Selanjutnya, bergabunglah dengan nama pengguna dan ambil 100 nilai pertama untuk pengguna yang sewenang-wenang:
def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = { getBFS(root).vertices.join(verts).take(100) }
Sekarang kami sedang mencari tingkat pemisahan dari titik yang diberikan kepada yang lainnya, Anda juga dapat mencari tingkat pemisahan untuk dua titik yang berubah-ubah:
def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = { getBFS(firstUser) .vertices .filter { case (vertexId, _) => vertexId == secondUser } .collect.map { case (_, degree) => degree } }
Spark GraphX dari kotak memberi Anda peluang untuk mendapatkan banyak informasi tentang grafik, misalnya, untuk mendapatkan komponen terhubung dari grafik (komponen terhubung):
def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = { graph.degrees.join(verts) .sortBy({ case (_, (userName, _)) => userName }, ascending = false) .take(amount) }
Atau dapatkan metrik seperti jumlah segitiga dalam grafik (jumlah segitiga):
def socialGraphTriangleCount = graph.triangleCount()
Peringkat halaman
Algoritma PageRank muncul berkat mahasiswa pascasarjana Stanford, Larry Page dan Sergey Brin. Untuk setiap simpul grafik, algoritma memberikan arti penting di antara yang lainnya. Misalnya, jika pengguna Twitter memiliki banyak langganan dari pengguna lain, maka ia akan memiliki peringkat tinggi, oleh karena itu, ia dapat dengan mudah ditemukan di mesin pencari.
GraphX memiliki versi statis dan dinamis dari implementasi PageRank. Versi statis memiliki jumlah iterasi yang tetap, sementara versi dinamis akan bekerja sampai peringkat mulai konvergen ke nilai yang diberikan.
Untuk grafik kami, ini adalah sebagai berikut:
def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.pageRank(tol = tolerance).vertices def staticRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.staticPageRank(numIter = 20).vertices
Kesimpulan
Pembaca yang penuh perhatian mencatat bahwa topik artikel ini didistribusikan pemrosesan grafik, tetapi ketika menulis kode, kami tidak melakukan apa pun untuk membuat pemrosesan benar-benar didistribusikan. Dan di sini kita harus mengingat kutipan oleh Edsger Dijkstra di awal. Spark secara dramatis menyederhanakan hidup kita dengan menanggung beban dan beban komputasi terdistribusi. Menulis kode yang akan dieksekusi pada cluster terdistribusi bukanlah tugas yang sulit seperti yang mungkin tampak pada awalnya. Dan di sini bahkan ada
beberapa opsi untuk mengelola sumber daya cluster: Hadoop YARN, Apache Mesos (secara pribadi, opsi favorit saya), dan baru-baru ini, ada dukungan untuk Kubernetes. Semua kode sumber yang diuraikan dalam artikel ini dapat ditemukan di
github .