Há alguns meses, comecei a estudar o Spark e, em algum momento, tive o problema de salvar os cálculos do Structured Streaming no banco de dados do Cassandra.
Neste post, dou um exemplo simples de criação e uso de Cassandra Sink para o Spark Structured Streaming. Espero que a publicação seja útil para aqueles que recentemente começaram a trabalhar com o Spark Structured Streaming e estão se perguntando como fazer upload dos resultados do cálculo no banco de dados.
A idéia do aplicativo é muito simples - receber e analisar mensagens do Kafka, realizar transformações simples em um par e salvar os resultados no cassandra.
Profissionais de streaming estruturado
Você pode ler mais sobre o Streaming estruturado na
documentação . Em resumo, o Structured Streaming é um mecanismo de processamento de informações de streaming bem escalável, baseado no mecanismo Spark SQL. Ele permite que você use um conjunto de dados / DataFrame para agregar dados, calcular funções de janela, conexões etc. Ou seja, o Streaming estruturado permite que você use o bom e velho SQL para trabalhar com fluxos de dados.
Qual é o problema?
A versão estável do Spark Structured Streaming foi lançada em 2017. Ou seja, essa é uma API relativamente nova que implementa a funcionalidade básica, mas algumas coisas terão que ser feitas por nós mesmos. Por exemplo, o Structured Streaming possui funções padrão para gravar a saída em um arquivo, bloco, console ou memória, mas para salvar os dados no banco de dados, você deve usar o receptor
foreach disponível no Structured Streaming e implementar a interface
ForeachWriter .
A partir do Spark 2.3.1, essa funcionalidade pode ser implementada apenas no Scala e Java .
Suponho que o leitor já saiba como o Streaming Estruturado funciona em termos gerais, saiba como implementar as transformações necessárias e agora esteja pronto para carregar os resultados no banco de dados. Se algumas das etapas acima não forem claras, a documentação oficial pode servir como um bom ponto de partida para aprender o Streaming Estruturado. Neste artigo, gostaria de focar na última etapa quando você precisar salvar os resultados em um banco de dados.
Abaixo, descreverei um exemplo de implementação do coletor Cassandra para o Structured Streaming e explicarei como executá-lo em um cluster. O código completo está disponível
aqui .
Quando eu encontrei o problema acima,
este projeto acabou sendo muito útil. No entanto, pode parecer um pouco complicado se o leitor acabou de começar a trabalhar com o Structured Streaming e está procurando um exemplo simples de como fazer upload de dados para o cassandra. Além disso, o projeto foi gravado para funcionar no modo local e requer algumas alterações para executar no cluster.
Também quero dar exemplos de como salvar dados no
MongoDB e em qualquer outro banco de dados usando o
JDBC .
Solução simples
Para fazer upload de dados para um sistema externo, você deve usar o receptor
foreach . Leia mais sobre isso
aqui . Em resumo, a interface
ForeachWriter deve ser implementada. Ou seja, é necessário determinar como abrir a conexão, como processar cada parte dos dados e como fechar a conexão no final do processamento. O código fonte é o seguinte:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
Descreverei a definição de
CassandraDriver e a estrutura da tabela de saída posteriormente, mas, por enquanto, vamos dar uma olhada em como o código acima funciona. Para conectar-se ao Kasandra a partir do Spark, crio um objeto
CassandraDriver que fornece acesso ao
CassandraConnector , um conector desenvolvido pela
DataStax . O CassandraConnector é responsável por abrir e fechar a conexão com o banco de dados, portanto, apenas mostro mensagens de depuração nos métodos de
abertura e
fechamento da classe
CassandraSinkForeach .
O código acima é chamado do aplicativo principal da seguinte maneira:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
CassandraSinkForeach é criado para cada linha de dados, portanto, cada nó de trabalho insere sua parte das linhas no banco de dados. Ou seja, cada nó de trabalho executa
val cassandraDriver = new CassandraDriver (); É assim que o CassandraDriver se parece:
class CassandraDriver extends SparkSessionBuilder {
Vamos dar uma olhada no objeto
faísca . O código para
SparkSessionBuilder é o seguinte:
class SparkSessionBuilder extends Serializable {
Em cada nó de trabalho, o
SparkSessionBuilder fornece acesso ao
SparkSession que foi criado no driver. Para tornar esse acesso possível, é necessário serializar o
SparkSessionBuilder e usar um valor
transitório preguiçoso , que permite ao sistema de serialização ignorar objetos
conf e
spark quando o programa é inicializado e até que os objetos sejam acessados. Portanto, quando o programa
buildSparkSession é
iniciado, ele é serializado e enviado para cada nó de trabalho, mas objetos
conf e
spark são permitidos apenas quando o nó de trabalho os acessa.
Agora vamos ver o código principal do aplicativo:
object KafkaToCassandra extends SparkSessionBuilder {
Quando o aplicativo é enviado para execução, o
buildSparkSession é serializado e enviado aos nós de trabalho, no entanto, os objetos
conf e
spark permanecem sem solução. Em seguida, o driver cria um objeto spark dentro do
KafkaToCassandra e distribui o trabalho entre os nós em funcionamento. Cada nó de trabalho lê dados do Kafka, faz transformações simples na parte recebida dos registros e, quando o nó de trabalho está pronto para gravar os resultados no banco de dados, permite objetos
conf e
spark , obtendo acesso ao
SparkSession criado no driver.
Como criar e executar o aplicativo?
Quando mudei do PySpark para o Scala, demorei um pouco para descobrir como criar o aplicativo. Portanto, incluí o Maven
pom.xml no meu projeto. O leitor pode criar o aplicativo usando o Maven executando o comando
mvn package . Depois que o aplicativo pode ser enviado para execução usando
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
Para criar e executar o aplicativo, é necessário substituir os nomes das minhas máquinas da AWS por suas próprias (ou seja, substituir tudo que se parece com ec2-xx-xxx-xx-xx.comxxpute-1.amazonaws.com).
O Spark e o Structured Streaming, em particular, são um novo tópico para mim, por isso serei muito grato aos leitores por comentários, discussões e correções.