Parte 2: Solução
Olá novamente! Hoje, continuarei minha história sobre como classificamos grandes quantidades de dados no Apache Spark usando modelos arbitrários de aprendizado de máquina. Na
primeira parte do artigo, examinamos a declaração do problema em si, bem como os principais problemas que surgem ao organizar a interação entre o cluster no qual os dados iniciais são armazenados e processados e o serviço de classificação externa. Na segunda parte, consideraremos uma das opções para resolver esse problema usando a abordagem de Fluxos Reativos e sua implementação usando a biblioteca akka-streams.
Conceito de fluxos reativos
Para resolver os problemas descritos na primeira parte, você pode usar a abordagem, chamada
Fluxos Reativos . Ele permite controlar o processo de transferência de fluxos de dados entre os estágios do processamento, operando em velocidades diferentes e independentemente um do outro, sem a necessidade de buffer. Se um dos estágios de processamento for mais lento que o anterior, será necessário sinalizar o estágio mais rápido sobre a quantidade de dados de entrada que está pronto para processar no momento. Essa interação é chamada de contrapressão. Consiste no fato de que os estágios mais rápidos processam exatamente quantos elementos são necessários para o estágio mais lento, e não mais, e liberam recursos de computação.
Em geral, o Reactive Streams é uma especificação para implementar o modelo
Publisher-Subscriber . Esta especificação define um conjunto de quatro interfaces (Publicador, Assinante, Processador e Assinatura) e um contrato para seus métodos.
Vamos considerar essas interfaces com mais detalhes:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Existem dois lados no modelo do Publisher-Subscriber: transmitir e receber. Ao implementar o Reactive Streams, a classe que implementa a interface do Publisher é responsável pela transferência de dados e o Assinante é responsável pelo recebimento. Para estabelecer uma comunicação entre eles, o Assinante deve ser registrado no Publisher chamando seu método de assinatura. De acordo com a especificação, após registrar um Assinante, o Publisher deve chamar seus métodos na seguinte ordem:
- onSubscribe. Esse método é chamado imediatamente após o registro do Assinante no Publisher. Como parâmetro, um objeto de Assinatura é passado a ele pelo qual o Assinante solicitará dados do Publisher. Este objeto deve ser armazenado e chamado apenas no contexto deste Assinante.
- Depois que o Assinante solicitou dados do Publisher chamando o método de solicitação no objeto de Assinatura correspondente, o Publisher pode chamar o método Subscriber onNext, passando o próximo elemento.
- O Assinante pode chamar periodicamente o método de solicitação na Assinatura, mas o Publisher não pode chamar o método onNext mais do que o total solicitado pelo método de solicitação.
- Se o fluxo de dados for finito, depois de passar todos os elementos pelo método onNext, o Publisher deverá chamar o método onComplete.
- Se um erro ocorreu no Publisher e o processamento posterior dos elementos não for possível, ele deverá chamar o método onError
- Após chamar os métodos onComplete ou onError, a interação adicional do Publisher com o Assinante deve ser excluída.
As chamadas de método podem ser consideradas como o envio de sinais entre o Publicador e o Assinante. O Assinante sinaliza ao Publisher quantos elementos ele está pronto para processar e o Publisher, por sua vez, sinaliza para ele que existe o próximo elemento, ou que não há mais elementos ou que ocorreu algum erro.
Para excluir outra influência do Publicador e do Assinante, as chamadas para todos os métodos que implementam interfaces do Reactive Streams devem ser não-bloqueantes. Nesse caso, a interação entre eles será completamente assíncrona.
Mais detalhes sobre a especificação para interfaces de fluxos reativos podem ser encontrados
aqui .
Assim, vinculando os iteradores originais e resultantes através da conversão para o Publisher e o Subscriber, respectivamente, podemos resolver os problemas identificados na parte anterior do artigo. O problema do estouro de buffer entre os estágios é resolvido solicitando um certo número de elementos pelo Assinante. O problema da conclusão com ou sem êxito é resolvido enviando sinais para o Assinante através dos métodos onComplete ou onError, respectivamente. O Publisher se torna responsável pelo envio desses sinais, que no nosso caso devem controlar quantas solicitações HTTP foram enviadas e quantas delas receberam respostas. Depois de receber a última resposta e processar todos os resultados que vierem nela, ele deverá enviar um sinal onComplete. Caso uma das solicitações falhe, ela deve enviar um sinal onError e parar de enviar outros elementos para o Assinante, além de subtrair elementos do iterador original.
O iterador resultante deve ser implementado como um Assinante. Nesse caso, não podemos ficar sem um buffer no qual os elementos serão gravados quando o método onNext for chamado da interface do Assinante e subtraído usando os métodos hasNext e next da interface do Iterator. Como uma implementação de buffer, você pode usar uma fila de bloqueio, por exemplo, LinkedBlockedQueue.
Um leitor atento fará a pergunta imediatamente: por que a fila de bloqueio é porque, de acordo com a especificação de Fluxos Reativos, a implementação de todos os métodos deve ser sem bloqueio? Mas está tudo bem aqui: como estamos solicitando ao Publisher um número estritamente definido de elementos, o método onNext será chamado não mais que esse número de vezes, e a fila sempre poderá adicionar um novo elemento sem bloquear.
Por outro lado, o bloqueio pode ocorrer quando o método hasNext é chamado no caso de uma fila vazia. No entanto, está tudo bem: o método hasNext não faz parte do contrato da interface do Assinante, é definido na interface do Iterator, que, como explicamos anteriormente, é uma estrutura de dados de bloqueio. Ao chamar o próximo método, subtraímos o próximo elemento da fila e, quando seu tamanho se torna menor que um determinado limite, precisaremos solicitar a próxima parte dos elementos através de uma chamada para o método de solicitação.
Figura 7. Interação assíncrona com um serviço externo usando a abordagem Reactive StreamsObviamente, nesse caso, não vamos nos livrar completamente do bloqueio de chamadas. Isso é causado por uma incompatibilidade de paradigmas entre fluxos Reativos, que assumem interação completamente assíncrona, e um iterador, que deve chamar trueN ou false ao chamar o método hasNext. No entanto, diferentemente da interação síncrona com um serviço externo, o tempo de inatividade devido a bloqueios pode ser reduzido significativamente, aumentando a carga geral dos núcleos do processador.
Seria conveniente se os desenvolvedores do Apache Spark em versões futuras implementassem um análogo do método mapPartitions, que funciona com o Publisher e o Subscriber. Isso permitiria uma interação completamente assíncrona, eliminando assim a possibilidade de bloquear threads.
Akka-streams e akka-http como uma implementação da especificação Reactive Streams
Atualmente, já existem mais de uma dúzia de implementações da especificação Reactive Streams. Uma dessas implementações é o módulo akka-streams da biblioteca
akka . No mundo da JVM, a akka se estabeleceu como um dos meios mais eficazes para escrever sistemas paralelos e distribuídos. Isso é alcançado devido ao fato de que o princípio básico estabelecido em sua base é
o modelo de ator , que permite escrever aplicativos altamente competitivos sem controle direto dos encadeamentos e de seus pools.
Muita literatura foi escrita sobre a implementação do conceito de atores em akka, então não vamos parar por aqui (o
site oficial da
akka é uma fonte muito boa de informações, eu também recomendo o
akka no livro de
ação ). Aqui, examinaremos mais de perto o lado tecnológico da implementação sob a JVM.
Em geral, os atores não existem por si mesmos, mas formam um sistema hierárquico. Para criar um sistema ator, você precisa alocar recursos para ele. Portanto, a primeira etapa ao trabalhar com akka é criar uma instância do objeto ActorSystem. Quando o ActorSystem é iniciado, é criado um pool de encadeamentos separado, chamado expedidor, no qual todo o código definido nos atores é executado. Normalmente, um único encadeamento executa o código de vários atores; no entanto, se necessário, você pode configurar um expedidor separado para um grupo específico de atores (por exemplo, para atores interagindo diretamente com uma API de bloqueio).
Uma das tarefas mais comuns resolvidas usando atores é o processamento seqüencial de fluxos de dados. Anteriormente, para isso, era necessário criar cadeias de atores manualmente e garantir que não houvesse gargalos entre eles (por exemplo, se um ator processar mensagens mais rapidamente que o seguinte, ele poderá ter um estouro da fila de mensagens recebidas, levando a um erro OutOfMemoryError).
A partir da versão 2.4, o módulo akka-streams foi adicionado ao akka, o que permite definir declarativamente o processo de processamento de dados e, em seguida, criar os atores necessários para sua execução. O Akka-streams também implementa o princípio da contrapressão, que elimina a possibilidade de transbordar a fila de mensagens recebidas para todos os atores envolvidos no processamento.
Os principais elementos para definir o esquema de processamento de fluxo de dados nos akka-streams são Source, Flow e Sink. Ao combiná-los, obtemos um Gráfico Executável. Para iniciar o processo de processamento, é usado um materializador, que cria atores trabalhando de acordo com o gráfico definido por nós (a interface do Materializer e sua implementação ActorMaterializer).
Vamos considerar os estágios Fonte, Fluxo e Pia com mais detalhes. Fonte define a fonte de dados. O Akka-streams suporta mais de uma dúzia de maneiras diferentes de criar fontes, incluindo a partir de um iterador:
val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator }
A fonte também pode ser obtida convertendo uma fonte existente:
val newSource: Source[String, NotUsed] = source.map(item => transform(item))
Se a transformação for uma operação não trivial, poderá ser representada como uma entidade de Fluxo. O Akka-streams suporta muitas maneiras diferentes de criar o Flow. A maneira mais fácil é criar a partir de uma função:
val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length)
Combinando Fonte e Fluxo, obtemos uma nova Fonte.
val newSource: Source[Int, NotUsed] = oldSource.via(someFlow)
O coletor é usado como o estágio final do processamento de dados. Como no caso do Source, o akka-streams fornece mais de uma dúzia de opções diferentes de coletor, por exemplo, o Sink.foreach executa uma determinada operação para cada elemento, o Sink.seq coleta todos os elementos em uma coleção, etc.
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)
Source, Flow e Sink são parametrizados pelos tipos de elementos de entrada e / ou saída, respectivamente. Além disso, cada estágio do processamento pode ter algum resultado do seu trabalho. Para isso, Source, Flow e Sink também são parametrizados por um tipo adicional que determina o resultado da operação. Esse tipo é chamado de tipo de valor materializado. Se a operação não implica a presença de um resultado adicional de seu trabalho, por exemplo, quando definimos o Flow por meio de uma função, o tipo NotUsed é usado como o valor materializado.
Combinando a fonte, o fluxo e o coletor necessários, obtemos o RunnableGraph. É parametrizado por um tipo, que determina o tipo de valor obtido como resultado da execução deste gráfico. Se necessário, ao combinar os estágios, é possível especificar o resultado de qual dos estágios será o resultado de todo o gráfico de operações. Por padrão, o resultado do estágio Origem é obtido:
val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println))
No entanto, se o resultado do estágio Sink for mais importante para nós, devemos indicar explicitamente isso:
val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right)
Depois de definirmos o gráfico de operações, devemos executá-lo. Para fazer isso, runnableGraph precisa chamar o método run. Como parâmetro, esse método utiliza um objeto ActorMaterializer (que também pode estar em um escopo implícito), responsável pela criação de atores que executam operações. Normalmente, um ActorMaterializer é criado imediatamente após a criação de um ActorSystem, anexado ao seu ciclo de vida, e o utiliza para criar atores. Considere um exemplo:
No caso de combinações simples, você pode fazer isso sem criar um RunnableGraph separado, mas simplesmente conectar o Source ao Sink e iniciá-los chamando o método runWith no Source. Este método também pressupõe que um objeto ActorMaterializer esteja presente no escopo implícito. Além disso, neste caso, será utilizado o valor materializado definido no Sink. Por exemplo, usando o código a seguir, podemos converter Origem em Publisher da especificação de Fluxos Reativos:
val source: Source[Score, NotUsed] = Source.fromIterator(() => sourceIterator).map(item => transform(item)) val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false))
Portanto, agora mostramos como você pode obter o Reactive Streams Publisher criando uma Origem a partir do iterador de origem e realizando algumas transformações em seus elementos. Agora podemos associá-lo a um Assinante que fornece dados ao iterador resultante. Resta considerar a última pergunta: como organizar a interação HTTP com um serviço externo.
A estrutura do akka inclui o módulo
akka-http , que permite organizar a comunicação assíncrona e sem bloqueio por HTTP. Além disso, este módulo é construído com base nos fluxos akka, que permitem adicionar a interação HTTP como uma etapa adicional no gráfico das operações de processamento de fluxo de dados.
Para conectar-se a serviços externos, o akka-http fornece três interfaces diferentes.
- API em nível de solicitação - é a opção mais simples para solicitações únicas a uma máquina arbitrária. Nesse nível, as conexões HTTP são gerenciadas completamente automaticamente e, em cada solicitação, é necessário transferir o endereço completo da máquina para a qual a solicitação está indo.
- API em nível de host - adequada quando sabemos qual porta em qual máquina estaremos acessando. Nesse caso, o akka-http assume o controle do conjunto de conexões HTTP e, em solicitações, basta especificar o caminho relativo para o recurso solicitado.
- API no nível da conexão - permite obter controle total sobre o gerenciamento de conexões HTTP, ou seja, abrir, fechar e distribuir solicitações nas conexões.
No nosso caso, o endereço do serviço de classificação é conhecido por nós com antecedência, portanto, é necessário organizar a interação HTTP somente com esta máquina específica. Portanto, a API em nível de host é melhor para nós. Agora, vamos ver como o pool de conexões HTTP é criado ao usá-lo:
val httpFlow: Flow[(HttpRequest,Id), (Try[HttpResponse],Id), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Id](hostAddress, portNumber)
Ao chamar Http (). CachedHostConnectionPool [T] (hostAddress, portNumber) no ActorSystem, que está em um escopo implícito, os recursos são alocados para criar um pool de conexões, mas as conexões em si não são estabelecidas. Como resultado dessa chamada, o Flow é retornado, que recebe um par de uma solicitação HTTP e algum objeto de identificação de ID como entrada. O objeto de identificação é necessário para corresponder a solicitação com a resposta correspondente devido ao fato de a chamada HTTP no akka-http ser uma operação assíncrona e a ordem na qual as respostas são recebidas não corresponde necessariamente à ordem na qual as solicitações são enviadas. Portanto, na saída, o Flow fornece alguns resultados da consulta e o objeto de identificação correspondente.
Diretamente, as conexões HTTP são estabelecidas quando um gráfico (incluindo este fluxo) é iniciado (materializado). O Akka-http é implementado de forma que, não importa quantas vezes os gráficos que contêm o httpFlow tenham sido materializados, em um ActorSystem sempre haverá um pool comum de conexões HTTP que será usado por todas as materializações. Isso permite que você controle melhor o uso dos recursos de rede e evite sobrecarregá-los.
Portanto, o ciclo de vida do conjunto de conexões HTTP está vinculado ao ActorSystem. Como já mencionado, o ciclo de vida do conjunto de encadeamentos também é anexado a ele, no qual as operações definidas nos atores são executadas (ou, no nosso caso, definidas como os estágios akka-streams e akka-http). Portanto, para obter a máxima eficiência, precisamos reutilizar uma instância do ActorSystem no mesmo processo da JVM.
Juntando tudo isso: um exemplo de implementação de interação com o serviço de classificação
Portanto, agora podemos avançar para o processo de classificação de grandes volumes de dados distribuídos no Apache Spark usando interação assíncrona com serviços externos. O esquema geral dessa interação já foi mostrado na Figura 7.
Suponha que tenhamos alguns conjuntos de dados [Recursos] definidos. Aplicando a operação mapPartitions a ele, devemos obter um Dataset, no qual cada ID do conjunto de origem é carimbado com um determinado valor obtido como resultado da classificação (Dataset [Score]). Para organizar o processamento assíncrono nos executores, devemos agrupar a fonte e os iteradores resultantes no Publisher e Subscriber, respectivamente, da especificação de Fluxos reativos e vinculá-los.
case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) val batchesRequestCount = config.getInt(“scoreService. batchesRequestCount”)
Nesta implementação, é levado em consideração que o serviço de classificação de uma chamada pode processar um grupo de vetores de recursos de uma só vez; portanto, o resultado da classificação após uma chamada também estará disponível imediatamente para todo o grupo. Portanto, como um tipo de parâmetro para o Publisher, não temos apenas a Pontuação, como seria de esperar, mas também Iterável [Pontuação]. Assim, enviamos os resultados da classificação para esse grupo ao iterador resultante (que também é um Assinante) por uma única chamada ao método onNext. Isso é muito mais eficiente do que chamar onNext para cada elemento. Agora vamos analisar esse código com mais detalhes.
- Determinamos a estrutura dos dados de entrada e saída. Como entrada, teremos um monte de algum identificador de identificação com um vetor de recurso e, como saída, teremos um monte de identificador com um valor numérico obtido como resultado da classificação.
- Determinamos o número de grupos que o Assinante solicitará ao Publicador por vez. Como supõe-se que esses valores estejam no buffer e esperem até serem lidos no iterador resultante, esse valor depende da quantidade de memória alocada ao executor.
- Crie o Publisher a partir do iterador de origem. Ele será responsável por interagir com o serviço de classificação. A função createPublisher é discutida abaixo.
- Crie um Assinante, que será o iterador resultante. O código da classe IteratorSubscriber também é fornecido abaixo.
- Registrando o Assinante no Publisher.
- Retorne IteratorSubscriber como resultado da operação mapPartitions.
Agora considere a implementação da função createPublisher.
type Ids = Seq[String]
- - , . httpFlow, .
- : , (batchSize) (parallelismLevel).
- implicit scope ActorSystem, ActorMaterializer httpFlow. Spark-. ActorSystemHolder .
- akka-streams . Source[Features] .
- batchSize .
- HttpRequest . HttpRequest createHttpRequest. createPublisher. feature-, , ( predict). , HTTP-. , HTTP-, HTTP-, URI .
- httpFlow.
- , . flatMapMerge, akka-http Source[ByteString], , . . parallelismLevel , ( ). HTTP-: , , , .
- : . akka ByteString. , ByteString O(1), ByteString . , , . , .
- HTTP- , Stream . , discardEntityBytes , , .
- . akka-http , .
- , Publisher, . , . false Sink.asPublisher , Publisher Subscriber-.
, akka ActorSystem, . , Spark , . Spark JVM , , , ActorSystem ActorMatrializer httpFlow.
object ActorSystemHolder { implicit lazy val actorSystem: ActorSystem = {
- , , , .
- ActorSystem .
- , , ActorSystem, terminate, , , , . , JVM-.
- ActorMaterializer, akka-streams, ActorSystem.
- , httpFlow . , HTTP- ActorSystem.
Subscriber- HTTP-.
sealed trait QueueItem[+T] case class Item[+T](item: T) extends QueueItem[T] case object Done extends QueueItem[Nothing] case class Failure(cause: Throwable) extends QueueItem[Nothing] //(1) class StreamErrorCompletionException(cause: Throwable) extends Exception(cause) //(2) class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] {
IteratorSubscriber Producer-Consumer. , Subscriber, Producer-, , Iterator, – Consumer-. , . Iterator Apache Spark, Subscriber – , ActorSystem.
IteratorSubscriber .
- . , Done, , Throwable, .
- , hasNext .
- , , Publisher-.
- , . LinkedBlockingQueue, . , .
- , . , , Publisher-. , , Publisher- . hasNext next ( requestNextBatches hasNext), , .
- subscriptionPromise subscription Subscription, Publisher onSubscribe. , Reactive Streams Subscriber- Publisher- , , hasNext , onSubscribe. , subscription, Publisher-. lazy subscription, Promise.
- . hasNext next, , .
- , , hasNext false . hasNext, .
- onSubscribe Publisher- Subscription Promise, subscription.
- onNext Publisher-, . .
- Publisher onComplete, Done.
- Publisher onError. .
- hasNext , . , true, . , .
- , false.
- , , requestSize, Publisher. , , , Publisher- , HTTP- .
- . , , , . , , ( , , subscription), , , , .
- , currentIterator. , . , hasNext , ( , ), .
- , false hasNext. , isDone, , . - , hasNext , false. , hasNext , false , . , .
- , , , .
- next . , hasNext, next .
- Publisher- , , subscription, Publisher-. requestSize. .
, , , :
8. .:
, , . , HTTP , . .
– . , , Hadoop , . , , - . , , hdfs, , , , .
, . , akka-http , . , -, - Apache Spark , , , -.
, , . , , http-, , .
, . , , . , . , .
, . , , Hadoop , , .
, , Hadoop- , , .
, ,
CleverDATA . . , , , , , . , .