Classifique grandes quantidades de dados no Apache Spark usando modelos arbitrários de aprendizado de máquina

Parte 1: Declaração do Problema


Olá Habr! Sou arquiteto de soluções na CleverDATA. Hoje vou falar sobre como classificamos grandes quantidades de dados usando modelos criados usando quase qualquer biblioteca de aprendizado de máquina disponível. Nesta série de duas partes, consideraremos as seguintes perguntas.

  • Como apresentar um modelo de aprendizado de máquina como serviço (Modelo como Serviço)?
  • Como as tarefas de processamento distribuído de grandes quantidades de dados são realizadas fisicamente usando o Apache Spark?
  • Quais problemas surgem quando o Apache Spark interage com serviços externos?
  • Como a interação do Apache Spark com serviços externos pode ser organizada usando as bibliotecas akka-streams e akka-http, bem como a abordagem do Reactive Streams?

Inicialmente, planejei escrever um artigo, mas como o volume de material era bastante grande, decidi dividi-lo em duas partes. Hoje, na primeira parte, consideraremos a declaração geral do problema, bem como os principais problemas que precisam ser resolvidos durante a implementação. Na segunda parte, falaremos sobre a implementação prática da solução para esse problema usando a abordagem de Fluxos Reativos.


Nossa empresa CleverDATA possui uma equipe de analistas de dados que, com a ajuda de uma ampla gama de ferramentas (como scikit-learn, facebook fastText, xgboost, tensorFlow, etc.), treinam modelos de aprendizado de máquina. A linguagem de programação básica de fato usada pelos analistas é o Python. Quase todas as bibliotecas para aprendizado de máquina, mesmo implementadas originalmente em outras linguagens, têm uma interface Python e são integradas às principais bibliotecas Python (principalmente com o NumPy).

Por outro lado, o ecossistema Hadoop é amplamente usado para armazenar e processar grandes quantidades de dados não estruturados. Nele, os dados são armazenados no sistema de arquivos HDFS na forma de blocos replicados distribuídos de um determinado tamanho (geralmente 128 MB, mas é possível configurar). Os algoritmos de processamento de dados distribuídos mais eficientes tentam minimizar a interação da rede entre as máquinas de cluster. Para fazer isso, os dados devem ser processados ​​nas mesmas máquinas em que estão armazenados.

Obviamente, em muitos casos, a interação da rede não pode ser completamente evitada, mas, no entanto, você precisa tentar executar todas as tarefas localmente e minimizar a quantidade de dados que precisarão ser transmitidos pela rede.

Esse princípio de processamento de dados distribuídos é chamado de "mover cálculos perto dos dados". Todas as principais estruturas, principalmente o Hadoop MapReduce e o Apache Spark, aderem a esse princípio. Eles determinam a composição e a sequência de operações específicas que precisam ser executadas em máquinas nas quais os blocos de dados necessários estão armazenados.

Figura 1. O cluster HDFS consiste em várias máquinas, uma das quais é um nó de nome e o restante é um nó de dados. O Nó de nome armazena informações sobre os arquivos que compõem seus blocos e sobre as máquinas nas quais eles estão localizados fisicamente. Os próprios blocos são armazenados no Nó de Dados, que são replicados para várias máquinas para aumentar a confiabilidade. O Nó de Dados também executa tarefas de processamento de dados. As tarefas consistem no processo principal (Mestre, M), que coordena o lançamento dos processos de trabalho (Trabalhador, W) nas máquinas em que os blocos de dados necessários estão armazenados.

Quase todos os componentes do ecossistema Hadoop são lançados usando a Java Virtual Machine (JVM) e estão intimamente integrados entre si. Por exemplo, para executar tarefas gravadas usando o Apache Spark para trabalhar com dados armazenados no HDFS, quase nenhuma manipulação adicional é necessária: a estrutura fornece essa funcionalidade imediatamente.

Infelizmente, a maior parte das bibliotecas projetadas para aprendizado de máquina pressupõe que os dados são armazenados e processados ​​localmente. Ao mesmo tempo, existem bibliotecas totalmente integradas ao ecossistema Hadoop, por exemplo, Spark ML ou Apache Mahout. No entanto, eles têm uma série de desvantagens significativas. Primeiro, eles fornecem muito menos implementações de algoritmos de aprendizado de máquina. Em segundo lugar, nem todos os analistas de dados podem trabalhar com eles. As vantagens dessas bibliotecas incluem o fato de poderem ser usadas para treinar modelos em grandes volumes de dados usando computação distribuída.

No entanto, os analistas de dados costumam usar métodos alternativos para treinar modelos, em particular bibliotecas que permitem o uso de GPUs. Não vou considerar os problemas dos modelos de treinamento neste artigo, porque quero focar no uso de modelos prontos criados usando qualquer biblioteca de aprendizado de máquina disponível para classificar grandes quantidades de dados.

Portanto, a principal tarefa que estamos tentando resolver aqui é aplicar modelos de aprendizado de máquina a grandes quantidades de dados armazenados no HDFS. Se pudéssemos usar o módulo SparkML da biblioteca Apache Spark, que implementa os algoritmos básicos de aprendizado de máquina, classificar grandes quantidades de dados seria uma tarefa trivial:

val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset) 

Infelizmente, essa abordagem funciona apenas para algoritmos implementados no módulo SparkML (uma lista completa pode ser encontrada aqui ). Além disso, no caso de usar outras bibliotecas implementadas não na JVM, tudo se torna muito mais complicado.

Para resolver esse problema, decidimos agrupar o modelo em um serviço REST. Portanto, ao iniciar a tarefa de classificar os dados armazenados no HDFS, é necessário organizar a interação entre as máquinas nas quais os dados estão armazenados e a máquina (ou cluster de máquinas) em que o serviço de classificação está sendo executado.

Figura 2. O conceito de modelo como serviço

Descrição do serviço de classificação Python


Para apresentar o modelo como um serviço, é necessário resolver as seguintes tarefas:

  1. implementar acesso eficiente ao modelo via HTTP;
  2. garantir o uso mais eficiente dos recursos da máquina (principalmente todos os núcleos e memória do processador);
  3. fornecer resistência a altas cargas;
  4. fornecer a capacidade de dimensionar horizontalmente.

O acesso ao modelo via HTTP é bastante simples de implementar: um grande número de bibliotecas foi desenvolvido para Python que permite implementar um ponto de acesso REST usando uma pequena quantidade de código. Um desses microframes é o Flask . A implementação do serviço de classificação no Flask é a seguinte:

 from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run() 

Aqui, quando o serviço é iniciado, carregamos o modelo na memória e o usamos quando chamamos o método de classificação. A função load_model carrega o modelo de alguma fonte externa, seja no sistema de arquivos, no armazenamento de valores-chave, etc.

Um modelo é um objeto que possui um método de previsão. No caso de classificação, ele recebe uma entrada para algum vetor de recurso de um determinado tamanho e produz um valor booleano indicando se o vetor especificado é adequado para este modelo ou algum valor de 0 a 1, ao qual você pode aplicar o limite de corte: tudo acima do limite, é um resultado positivo da classificação, o resto não é.

O vetor de recurso que precisamos classificar é passado em forma binária e desserializado em uma matriz numpy. Seria sobrecarregado fazer uma solicitação HTTP para cada vetor. Por exemplo, no caso de um vetor 100-dimensional e usando valores do tipo float32, uma solicitação HTTP completa, incluindo cabeçalhos, seria algo como isto:

 PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data] 

Como você pode ver, a eficiência dessa solicitação é muito baixa (400 bytes de carga útil / (cabeçalho de 133 bytes + corpo de 400 bytes) = 75%). Felizmente, em quase todas as bibliotecas, o método de previsão permite que você receba não o vetor [1 xn], mas a matriz [mxn] e, consequentemente, produza o resultado imediatamente para m valores de entrada.

Além disso, a biblioteca numpy é otimizada para trabalhar com matrizes grandes, permitindo que você use efetivamente todos os recursos disponíveis da máquina. Assim, podemos enviar não um, mas um número bastante grande de vetores de recurso em uma solicitação, desserializá-los em uma matriz numpy de tamanho [mxn], classificar e retornar o vetor [mx 1] a partir dos valores booleanos ou float32. Como resultado, a eficiência da interação HTTP ao usar uma matriz de 1000 linhas se torna quase igual a 100%. O tamanho dos cabeçalhos HTTP nesse caso pode ser negligenciado.

Para testar o serviço Flask na máquina local, você pode executá-lo na linha de comando. No entanto, esse método é completamente inadequado para uso industrial. O fato é que o Flask é de thread único e, se observarmos o diagrama de carga do processador enquanto o serviço estiver em execução, veremos que um núcleo está 100% carregado e o restante está inativo. Felizmente, existem maneiras de usar todos os kernels da máquina: para isso, o Flask precisa ser executado no servidor de aplicativos da web uwsgi. Ele permite que você configure da melhor maneira o número de processos e threads, para garantir uma carga uniforme em todos os núcleos do processador. Mais detalhes sobre todas as opções para configurar o uwsgi podem ser encontrados aqui .

É melhor usar o nginx como um ponto de entrada HTTP, pois o uwsgi pode funcionar de maneira instável em caso de altas cargas. O Nginx, por outro lado, leva todo o fluxo de entrada de solicitações para si mesmo, filtra solicitações inválidas e dosa a carga no uwsgi. O Nginx se comunica com o uwsgi por soquetes do linux usando um arquivo de processo. Um exemplo de configuração do nginx é mostrado abaixo:

 server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } } 

Como podemos ver, acabou sendo uma configuração bastante complicada para uma máquina. Se precisarmos classificar grandes quantidades de dados, um grande número de solicitações chegará a esse serviço e isso poderá se tornar um gargalo. A solução para esse problema é a escala horizontal.

Por conveniência, empacotamos o serviço em um contêiner do Docker e o implantamos no número necessário de máquinas. Se desejar, você pode usar ferramentas de implantação automatizada, como o Kubernetes. Um exemplo de estrutura do Dockerfile para criar um contêiner com um serviço é fornecido abaixo.

 FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 RUN update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 RUN pip install uwsgi flask scipy scikit-learn #copying script files WORKDIR /etc/score COPY score.py . COPY score.ini . COPY start.sh . RUN chmod +x start.sh RUN rm /etc/nginx/sites-enabled/default COPY score.nginx /etc/nginx/sites-enabled/ EXPOSE 80 ENTRYPOINT ["./start.sh"] 

Portanto, a estrutura do serviço para classificação é a seguinte:

Figura 3. Esquema de serviço para classificação

Um breve resumo do trabalho do Apache Spark no ecossistema Hadoop


Agora considere o processo de processamento de dados armazenados no HDFS. Como observei anteriormente, o princípio de transferir cálculos para dados é usado para isso. Para iniciar o processamento de tarefas, você precisa saber em quais máquinas os blocos de dados que precisamos estão armazenados para executar processos diretamente envolvidos no processamento deles. Também é necessário coordenar o lançamento desses processos, reiniciá-los em caso de emergência, se necessário, agregar os resultados de várias subtarefas, etc.

Todas essas tarefas são realizadas por uma variedade de estruturas que trabalham com o ecossistema Hadoop. Um dos mais populares e convenientes é o Apache Spark. O principal conceito em torno do qual toda a estrutura é criada é RDD (Conjunto de dados distribuídos resilientes). Em geral, o RDD pode ser considerado uma coleção distribuída resistente a quedas. O RDD pode ser obtido de duas maneiras principais:

  1. criação de uma fonte externa, como uma coleção na memória, um arquivo ou diretório no sistema de arquivos, etc;
  2. conversão de outro RDD aplicando operações de transformação. O RDD suporta todas as operações básicas de trabalho com coleções, como map, flatMap, filter, groupBy, join, etc.

É importante entender que o RDD, diferentemente das coleções, não é diretamente dados, mas uma sequência de operações que devem ser executadas nos dados. Portanto, quando as operações de transformação são chamadas, nenhum trabalho realmente acontece e apenas obtemos um novo RDD, que conterá mais uma operação do que a anterior. O trabalho em si começa quando as chamadas operações ou ações do terminal são chamadas. Isso inclui salvar em um arquivo, salvar em uma coleção na memória, contar o número de elementos etc.

Ao iniciar uma operação do terminal, o Spark cria um gráfico de operação acíclica (DAG, Directed Acyclic Graph) com base no RDD resultante e os executa sequencialmente no cluster de acordo com o gráfico recebido. Ao criar um DAG baseado em RDD, o Spark realiza várias otimizações, por exemplo, se possível, combina várias transformações sucessivas em uma operação.

RDD foi a principal unidade de interação com a API Spark nas versões do Spark 1.x. No Spark 2.x, os desenvolvedores disseram que agora o principal conceito de interação é o Dataset. O conjunto de dados é um complemento para RDD com suporte para interação semelhante a SQL. Ao usar a API do conjunto de dados, o Spark permite que você use uma ampla gama de otimizações, incluindo as de nível bastante baixo. Mas, em geral, os princípios básicos que se aplicam aos RDDs também se aplicam ao Dataset.

Mais detalhes sobre o trabalho do Spark podem ser encontrados na documentação no site oficial .

Vamos considerar um exemplo da classificação mais simples no Spark sem usar serviços externos. Um algoritmo sem sentido é implementado aqui, que considera a proporção de cada uma das letras latinas no texto e, em seguida, considera o desvio padrão. Aqui, antes de tudo, é importante prestar atenção diretamente às etapas básicas usadas ao trabalhar com o Spark.

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ??? //(2) val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] //(3) val result: Dataset[Score] = ds.map {d: Data => //(4) val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) }.map {f: Features => Score(f.id, std(f.vector)) //(5) } result.write.parquet("/path/to/result") //(6) 

Neste exemplo, nós:

  1. determinamos a estrutura dos dados de entrada, intermediários e de saída (os dados de entrada são definidos como algum texto ao qual um determinado identificador está associado, os dados intermediários correspondem ao identificador com o vetor de característica e a saída corresponde ao identificador com algum valor numérico);
  2. definimos uma função para calcular o valor resultante por um vetor de característica (por exemplo, desvio padrão, implementação não mostrada);
  3. definir o conjunto de dados original como dados armazenados no HDFS no formato parquet ao longo do caminho / caminho / para / dados;
  4. Defina um conjunto de dados intermediário como um mapa de bitmap do conjunto de dados original.
  5. Da mesma forma, determinamos o conjunto de dados resultante por meio de uma transformação bit a bit do intermediário;
  6. salve o conjunto de dados resultante no HDFS no formato parquet ao longo do caminho / caminho / para / resultado. Como salvar em um arquivo é uma operação do terminal, os próprios cálculos são iniciados precisamente nesse estágio.

O Apache Spark trabalha com o princípio de mestre-trabalhador. Quando o aplicativo é iniciado, o processo principal, chamado de driver, é iniciado. Ele executa o código responsável pela formação do RDD, com base no qual os cálculos serão realizados.

Quando uma operação do terminal é chamada, o driver gera um DAG com base no RDD resultante. Em seguida, o driver inicia o lançamento de fluxos de trabalho chamados executores, nos quais os dados serão processados ​​diretamente. Após iniciar os fluxos de trabalho, o driver transmite a eles o bloco executável que precisa ser executado e também indica a qual parte dos dados precisa ser aplicada.

Abaixo está o código do nosso exemplo, no qual as seções de código executadas no executor (entre as linhas iniciador e final do executor) são destacadas. O restante do código é executado no driver.

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map { // --------------- EXECUTOR PART BEGIN ----------------------- d: Data => val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) // --------------- EXECUTOR PART END ----------------------- }.map { // --------------- EXECUTOR PART BEGIN ----------------------- f: Features => Score(f.id, std(f.vector)) // --------------- EXECUTOR PART END ----------------------- } result.write.parquet(“/path/to/result”) 

No ecossistema Hadoop, todos os aplicativos são executados em contêineres. Um contêiner é um processo em execução em uma das máquinas em um cluster ao qual é alocada uma certa quantidade de recursos. O lançamento dos contêineres é tratado pelo YARN Resource Manager. Ele determina qual das máquinas possui um número suficiente de núcleos de processador e RAM, bem como se contém os blocos de dados necessários para o processamento.

Ao iniciar o aplicativo Spark, o YARN cria e executa o contêiner em uma das máquinas de cluster nas quais inicia o driver. Então, quando o driver prepara o DAG a partir de operações que precisam ser executadas nos executores, o YARN lança contêineres adicionais nas máquinas desejadas.

Como regra, basta que o driver aloque um núcleo e uma pequena quantidade de memória (a menos que, é claro, o resultado do cálculo não seja agregado no driver na memória). Para os executores, para otimizar recursos e reduzir o número total de processos no sistema, mais de um núcleo pode ser distinguido: nesse caso, o executor poderá executar várias tarefas simultaneamente.

Mas aqui é importante entender que, no caso de uma falha de uma das tarefas em execução no contêiner ou no caso de recursos insuficientes, o YARN pode decidir parar o contêiner e, em seguida, todas as tarefas executadas nele terão que ser reiniciadas novamente em outro artista. Além disso, se alocarmos um número suficientemente grande de núcleos por contêiner, é provável que o YARN não consiga iniciá-lo. Por exemplo, se tivermos duas máquinas nas quais dois núcleos são deixados sem uso, podemos iniciar em cada contêiner que requer dois núcleos, mas não podemos iniciar um contêiner que requer quatro núcleos.

Agora vamos ver como o código do nosso exemplo será executado diretamente no cluster. Imagine que o tamanho dos dados de origem seja 2 Terabytes. Portanto, se o tamanho do bloco no HDFS for de 128 megabytes, haverá 16384 blocos no total. Cada bloco é replicado para várias máquinas para garantir a confiabilidade. Para simplificar, consideramos o fator de replicação igual a dois, ou seja, haverá 32768 blocos disponíveis no total. Suponha que usemos um cluster de 16 máquinas para armazenamento. Consequentemente, em cada uma das máquinas, no caso de distribuição uniforme, haverá aproximadamente 2048 blocos, ou 256 Gigabytes por máquina. Em cada uma das máquinas, temos 8 núcleos de processador e 64 gigabytes de RAM.

Para nossa tarefa, o driver não precisa de muitos recursos, portanto, alocaremos 1 núcleo e 1 GB de memória para ele. Daremos aos artistas 2 núcleos e 4 GB de memória. Suponha que desejamos maximizar o uso de recursos de cluster. Assim, temos 64 contêineres: um para o motorista e 63 para os artistas.

Figura 4. Processos em execução no nó de dados e os recursos que eles usam.

Como no nosso caso, usamos apenas operações de mapa, nosso DAG consistirá em uma operação. Consiste nas seguintes ações:

  1. pegue um bloco de dados do disco rígido local,
  2. Converter dados
  3. salve o resultado em um novo bloco no seu próprio disco local.

No total, precisamos processar 16384 blocos, para que cada executor execute 16384 / (63 executores * 2 núcleos) = 130 operações. Assim, o ciclo de vida do executor como um processo separado (no caso de tudo acontecer sem quedas) terá a seguinte aparência.

  1. Lançamento de contêiner.
  2. Receber do motorista uma tarefa na qual haverá um identificador de bloco e a operação necessária. Como alocamos dois núcleos para o contêiner, o executor recebe duas tarefas ao mesmo tempo.
  3. Executando uma tarefa e enviando o resultado ao motorista.
  4. Obtendo a próxima tarefa do driver e repetindo as etapas 2 e 3 até que todos os blocos desta máquina local sejam processados.
  5. Parada de contêiner

Nota : DAGs mais complexos são obtidos se for necessário redistribuir dados intermediários entre máquinas, geralmente para operações de agrupamento (groupBy, reduzemByKey etc.) e conexões (junção), cuja consideração está além do escopo deste artigo.

Os principais problemas de interação entre o Apache Spark e serviços externos


Se, dentro da estrutura da operação do mapa, precisarmos acessar algum serviço externo, a tarefa se tornará menos trivial. Suponha que um objeto da classe ExternalServiceClient seja responsável por interagir com um serviço externo. Em geral, antes de iniciar o trabalho, precisamos inicializá-lo e chamá-lo conforme necessário:

 val client = ExternalServiceClient.create() // val score = client.score(featureVector) // . 

Geralmente, a inicialização do cliente leva algum tempo; portanto, como regra, é inicializada na inicialização do aplicativo e, em seguida, é usada para obter uma instância do cliente de algum contexto ou pool global. Portanto, quando um contêiner com o executor Spark recebe uma tarefa que requer interação com um serviço externo, seria bom obter um cliente já inicializado antes de iniciar o trabalho na matriz de dados e reutilizá-lo para cada elemento.

Existem duas maneiras de fazer isso no Spark. Primeiro, se o cliente é serializável (o próprio cliente e todos os seus campos devem estender a interface java.io.Serializable), ele pode ser inicializado no driver e depois passado aos executores pelo mecanismo de variável de transmissão .

 val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) } 

Caso o cliente não seja serializável ou a inicialização do cliente seja um processo que depende das configurações da máquina específica na qual está sendo executado (por exemplo, para equilibrar, solicitações de uma parte das máquinas devem ir para a primeira máquina de serviço e, para a outra, para a segunda), então o cliente pode ser inicializado diretamente no executor.

Para fazer isso, o RDD (e o conjunto de dados) possui uma operação mapPartitions, que é uma versão generalizada da operação de mapa (se você observar o código-fonte da classe RDD, a operação de mapa será implementada por meio de mapPartitions). A função passada para a operação mapPartitions é executada uma vez para cada bloco. , , , :

 ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } } 

. , , , , . , , , .

. , hasNext next:

 while (i.hasNext()) { val item = i.next() … } 

, , . , 8 , YARN 4 2 , , 8 . , . .

. , , , , . : , , . , hasNext , . (, , ) , , , . , .

5. , , mapPartitions, . .

, , . , , , .

6.

, , , -, , , , -, , .


, . , . , . , . , , , , , , .

.

  1. , , , .
  2. , , . , . , .
  3. , hasNext false, , , , . : hasNext = false, , , . , , , .

, . Stay tuned!

Source: https://habr.com/ru/post/pt413137/


All Articles