"Simplicidade é pré-requisito para confiabilidade" por Edsger DijkstraPrólogo
Os gráficos são uma estrutura de dados tão clara e fácil de entender, desde a época de Leonhard Euler que forçou a quebrar a mente da humanidade em tarefas heterogêneas, como como você pode atravessar todas as sete pontes de Königsberg sem passar por nenhuma delas duas vezes ou como intermediário de viagem. rota rentável.
Muita coisa mudou desde Euler: transistores, linguagens de programação e computação distribuída apareceram. É o último desta lista que simplificou drasticamente o armazenamento e o processamento de gráficos. Na verdade, é isso que será discutido neste artigo.
Se você não estiver familiarizado com os conceitos básicos do Apache Spark, como RDD, programa Driver, nó Worker etc., antes de continuar com este artigo, recomendo que você leia a
documentação do Databricks.
Quanto a mim, a melhor maneira de lidar com uma tecnologia é tentar escrever algo nela. Neste artigo, faremos uma análise da semelhança de uma “rede social” usando os conceitos básicos da teoria dos grafos.
Escrevendo um código
O método de armazenamento de nossa “rede social” que escolhi foi extremamente simples e intuitivo: arquivos tsv em disco, naturalmente esses arquivos podem ser de qualquer outro formato, como Parquet, Avro. O local de armazenamento dos arquivos não importa se pode ser HDFS ou S3, mesmo se precisarmos alterar alguma coisa, o Spark SQL fará o trabalho principal por nós. A estrutura da rede será a seguinte: o primeiro arquivo é um par de ID do usuário e seu nome, o segundo arquivo de ID do usuário e uma lista de pares. O Apache Spark suporta as seguintes linguagens de programação Java, Scala e Python como APIs. Eu escolhi o segundo
Imediatamente, quero responder à pergunta popular sobre se vale a pena usar o Spark GraphX para armazenar gráficos quando você tem muitas operações de inserção / atualização - a resposta é não, todas as operações de alteração de RDD forçam a alteração de todo o RDD no cluster, o que não é a solução ideal, mas as especiais são adequadas para este caso Solução NoSql como Neo4J, Titanium ou mesmo Cassandra, Hbase. Nada impede que você use o Spark GraphX com eles especificamente para processar gráficos, carregando os próprios dados do banco de dados, por exemplo, por sheduler ou em um estilo orientado a eventos.
Bem, então, vamos começar a escrever código. Primeiro, precisamos carregar o gráfico na memória, pegar os arquivos de origem e extrair os vértices e arestas necessários (aqui estão os pontos principais, um link para a lista completa com o código-fonte pode ser encontrado no final do artigo):
def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES) .flatMap(InputDataFlow.parseNames) def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH) .flatMap(InputDataFlow.makeEdges)
Pregel
O principal mecanismo para iteração de gráfico no GraphX é o algoritmo Pregel. O algoritmo foi desenvolvido pelo Google, o modelo Pregel utiliza a
transferência de mensagens entre os vértices no gráfico. A passagem de mensagens através de uma série de iterações chamadas super-passos é a principal idéia por trás desse algoritmo. Além disso, a idéia principal pode ser descrita da seguinte forma:
“pense como um vértice” , ou seja, o estado do vértice atual depende apenas do estado de seus vizinhos.
O Pregel se torna extremamente necessário ao solucionar um problema com um MapReduce regular, se torna um processo extremamente difícil. Curiosamente, o nome Pregel vem do nome do rio, que atravessava as sete pontes de Koenigsberg.
A primitiva principal para percorrer um gráfico é um trigêmeo - consiste nos seguintes componentes: o vértice atual (um vértice de origem), o vértice para o qual passamos (um vértice de destino) e a aresta entre eles (uma aresta de conexão) - tudo é claro: onde ir para onde vamos e para onde vamos. Além disso, para Pregel, você deve especificar a distância padrão entre os vértices, como regra, é uma função PositiveInfinity, UDF (função definida pelo usuário) para cada vértice para processar a mensagem recebida e calcular o próximo vértice, e UDF para mesclar as duas mensagens recebidas, essa função deve ser comutativa e associativo. Como Scala é uma linguagem funcional, as duas últimas funções serão representadas como, duas expressões lambda.
Quando separamos os principais componentes do Pregel, vale a pena praticar. O primeiro algoritmo que iremos implementar é o algoritmo de Dijkstra para encontrar o caminho mais curto de um vértice arbitrário para todos os outros.
def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (_, dist, newDist) => math.min(dist, newDist), triplet => {
Tudo é óbvio aqui: partimos de um determinado vértice, usamos a função mínima para determinar a distância mínima em cada etapa. A primeira função usada por Pregel preserva a menor distância entre a mensagem recebida e o vértice atual. A segunda função distribui mensagens para os vizinhos, mantendo distância. A última função - este é um análogo do estágio Reduzir - seleciona o valor mínimo no caso de várias mensagens recebidas. Em seguida, simplesmente formamos uma saída gráfica conveniente.
Grau de separação
Estou certo de que muitos leitores deste artigo ouviram falar da teoria dos seis apertos de mão (
seis graus de separação ) - essa é uma teoria não comprovada segundo a qual duas pessoas são separadas por não mais que cinco níveis de conhecidos comuns, ou seja, são necessários no máximo 6 apertos de mão para conectar dois contatos arbitrários. homem na terra. Em termos da teoria dos grafos, soa assim: o diâmetro do gráfico de namoro não excede 6 para duas pessoas na Terra.
Vamos começar a escrever o código da seguinte forma: precisamos de uma pesquisa abrangente no gráfico para procurar contatos do vértice indicado; para isso, precisamos modificar o código do algoritmo Dijkstra:
def getBFS(root: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)( (_, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr + 1)) } else { Iterator.empty } }, (a, b) => math.min(a, b)).cache() bfs }
Tudo é muito parecido com o que foi acima, mas já indicamos o número de iterações - para seu gráfico, este pode ser um número diferente - 10 para o meu gráfico que obtive empiricamente. Em seguida, junte-se aos nomes de usuário e obtenha os 100 primeiros valores para um usuário arbitrário:
def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = { getBFS(root).vertices.join(verts).take(100) }
Agora, estamos procurando o grau de separação do vértice fornecido para todos os outros. Você também pode procurar o grau de separação de dois vértices arbitrários:
def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = { getBFS(firstUser) .vertices .filter { case (vertexId, _) => vertexId == secondUser } .collect.map { case (_, degree) => degree } }
O Spark GraphX da caixa oferece a oportunidade de obter muitas informações sobre o gráfico, por exemplo, para obter o componente conectado ao gráfico (componente conectado):
def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = { graph.degrees.join(verts) .sortBy({ case (_, (userName, _)) => userName }, ascending = false) .take(amount) }
Ou obtenha uma métrica como o número de triângulos no gráfico (contagem de triângulos):
def socialGraphTriangleCount = graph.triangleCount()
Page rank
O algoritmo PageRank surgiu graças aos estudantes de Stanford, Larry Page e Sergey Brin. Para cada vértice do gráfico, o algoritmo atribui importância entre todos os outros. Por exemplo, se um usuário do Twitter tiver um grande número de assinaturas de outros usuários, ele terá uma classificação alta; portanto, ele poderá ser facilmente encontrado no mecanismo de pesquisa.
O GraphX possui uma versão estática e dinâmica da implementação do PageRank. A versão estática possui um número fixo de iterações, enquanto a versão dinâmica funcionará até que a classificação comece a convergir para o valor especificado.
Para o nosso gráfico, será o seguinte:
def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.pageRank(tol = tolerance).vertices def staticRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.staticPageRank(numIter = 20).vertices
Conclusão
Um leitor atento observou que o tópico deste artigo é processamento distribuído de gráficos, mas ao escrever código, não fizemos nada para tornar o processamento realmente distribuído. E aqui devemos lembrar a citação de Edsger Dijkstra no início. O Spark simplifica drasticamente nossas vidas, assumindo o fardo e a carga da computação distribuída. Escrever código que será executado em um cluster distribuído não é uma tarefa tão difícil quanto poderia parecer no começo. E aqui existem até
várias opções para gerenciar recursos de cluster: Hadoop YARN, Apache Mesos (pessoalmente, minha opção favorita) e, mais recentemente, há suporte para o Kubernetes. Todo o código fonte analisado neste artigo pode ser encontrado no
github .