Cassandra Sink para Spark Structured Streaming

Há alguns meses, comecei a estudar o Spark e, em algum momento, tive o problema de salvar os cálculos do Structured Streaming no banco de dados do Cassandra.

Neste post, dou um exemplo simples de criação e uso de Cassandra Sink para o Spark Structured Streaming. Espero que a publicação seja útil para aqueles que recentemente começaram a trabalhar com o Spark Structured Streaming e estão se perguntando como fazer upload dos resultados do cálculo no banco de dados.

A idéia do aplicativo é muito simples - receber e analisar mensagens do Kafka, realizar transformações simples em um par e salvar os resultados no cassandra.

Profissionais de streaming estruturado


Você pode ler mais sobre o Streaming estruturado na documentação . Em resumo, o Structured Streaming é um mecanismo de processamento de informações de streaming bem escalável, baseado no mecanismo Spark SQL. Ele permite que você use um conjunto de dados / DataFrame para agregar dados, calcular funções de janela, conexões etc. Ou seja, o Streaming estruturado permite que você use o bom e velho SQL para trabalhar com fluxos de dados.

Qual é o problema?


A versão estável do Spark Structured Streaming foi lançada em 2017. Ou seja, essa é uma API relativamente nova que implementa a funcionalidade básica, mas algumas coisas terão que ser feitas por nós mesmos. Por exemplo, o Structured Streaming possui funções padrão para gravar a saída em um arquivo, bloco, console ou memória, mas para salvar os dados no banco de dados, você deve usar o receptor foreach disponível no Structured Streaming e implementar a interface ForeachWriter . A partir do Spark 2.3.1, essa funcionalidade pode ser implementada apenas no Scala e Java .

Suponho que o leitor já saiba como o Streaming Estruturado funciona em termos gerais, saiba como implementar as transformações necessárias e agora esteja pronto para carregar os resultados no banco de dados. Se algumas das etapas acima não forem claras, a documentação oficial pode servir como um bom ponto de partida para aprender o Streaming Estruturado. Neste artigo, gostaria de focar na última etapa quando você precisar salvar os resultados em um banco de dados.

Abaixo, descreverei um exemplo de implementação do coletor Cassandra para o Structured Streaming e explicarei como executá-lo em um cluster. O código completo está disponível aqui .

Quando eu encontrei o problema acima, este projeto acabou sendo muito útil. No entanto, pode parecer um pouco complicado se o leitor acabou de começar a trabalhar com o Structured Streaming e está procurando um exemplo simples de como fazer upload de dados para o cassandra. Além disso, o projeto foi gravado para funcionar no modo local e requer algumas alterações para executar no cluster.

Também quero dar exemplos de como salvar dados no MongoDB e em qualquer outro banco de dados usando o JDBC .

Solução simples


Para fazer upload de dados para um sistema externo, você deve usar o receptor foreach . Leia mais sobre isso aqui . Em resumo, a interface ForeachWriter deve ser implementada. Ou seja, é necessário determinar como abrir a conexão, como processar cada parte dos dados e como fechar a conexão no final do processamento. O código fonte é o seguinte:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

Descreverei a definição de CassandraDriver e a estrutura da tabela de saída posteriormente, mas, por enquanto, vamos dar uma olhada em como o código acima funciona. Para conectar-se ao Kasandra a partir do Spark, crio um objeto CassandraDriver que fornece acesso ao CassandraConnector , um conector desenvolvido pela DataStax . O CassandraConnector é responsável por abrir e fechar a conexão com o banco de dados, portanto, apenas mostro mensagens de depuração nos métodos de abertura e fechamento da classe CassandraSinkForeach .

O código acima é chamado do aplicativo principal da seguinte maneira:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

CassandraSinkForeach é criado para cada linha de dados, portanto, cada nó de trabalho insere sua parte das linhas no banco de dados. Ou seja, cada nó de trabalho executa val cassandraDriver = new CassandraDriver (); É assim que o CassandraDriver se parece:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

Vamos dar uma olhada no objeto faísca . O código para SparkSessionBuilder é o seguinte:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

Em cada nó de trabalho, o SparkSessionBuilder fornece acesso ao SparkSession que foi criado no driver. Para tornar esse acesso possível, é necessário serializar o SparkSessionBuilder e usar um valor transitório preguiçoso , que permite ao sistema de serialização ignorar objetos conf e spark quando o programa é inicializado e até que os objetos sejam acessados. Portanto, quando o programa buildSparkSession é iniciado, ele é serializado e enviado para cada nó de trabalho, mas objetos conf e spark são permitidos apenas quando o nó de trabalho os acessa.

Agora vamos ver o código principal do aplicativo:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

Quando o aplicativo é enviado para execução, o buildSparkSession é serializado e enviado aos nós de trabalho, no entanto, os objetos conf e spark permanecem sem solução. Em seguida, o driver cria um objeto spark dentro do KafkaToCassandra e distribui o trabalho entre os nós em funcionamento. Cada nó de trabalho lê dados do Kafka, faz transformações simples na parte recebida dos registros e, quando o nó de trabalho está pronto para gravar os resultados no banco de dados, permite objetos conf e spark , obtendo acesso ao SparkSession criado no driver.

Como criar e executar o aplicativo?


Quando mudei do PySpark para o Scala, demorei um pouco para descobrir como criar o aplicativo. Portanto, incluí o Maven pom.xml no meu projeto. O leitor pode criar o aplicativo usando o Maven executando o comando mvn package . Depois que o aplicativo pode ser enviado para execução usando

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

Para criar e executar o aplicativo, é necessário substituir os nomes das minhas máquinas da AWS por suas próprias (ou seja, substituir tudo que se parece com ec2-xx-xxx-xx-xx.comxxpute-1.amazonaws.com).

O Spark e o Structured Streaming, em particular, são um novo tópico para mim, por isso serei muito grato aos leitores por comentários, discussões e correções.

Source: https://habr.com/ru/post/pt425503/


All Articles