Apache Kafka y Streaming con Spark Streaming

Hola Habr! Hoy crearemos un sistema que utilizará Apark Kafka para procesar secuencias de mensajes utilizando Spark Streaming y escribir el resultado del procesamiento en la base de datos en la nube de AWS RDS.

Imagine que cierta institución de crédito nos ha asignado la tarea de procesar transacciones entrantes sobre la marcha en todas sus sucursales. Esto se puede hacer para calcular rápidamente la posición de moneda abierta para la tesorería, límites o resultados financieros para transacciones, etc.

Cómo implementar este caso sin el uso de magia y hechizos mágicos: ¡leemos debajo del corte! Vamos!


(Fuente de la imagen)

Introduccion


Por supuesto, el procesamiento de una gran matriz de datos en tiempo real ofrece amplias oportunidades para su uso en sistemas modernos. Una de las combinaciones más populares para esto es el tándem Apache Kafka y Spark Streaming, donde Kafka crea un flujo de paquetes de mensajes entrantes, y Spark Streaming procesa estos paquetes en un intervalo de tiempo específico.

Para aumentar la tolerancia a fallas de la aplicación, utilizaremos puntos de control - puntos de control. Usando este mecanismo, cuando el módulo Spark Streaming necesita recuperar datos perdidos, solo necesita regresar al último punto de control y reanudar los cálculos a partir de él.

Arquitectura del sistema en desarrollo.




Componentes utilizados:

  • Apache Kafka es un sistema de mensajería distribuido con publicación y suscripción. Adecuado para el consumo de mensajes fuera de línea y en línea. Para evitar la pérdida de datos, los mensajes de Kafka se almacenan en el disco y se replican dentro del clúster. El sistema Kafka está construido sobre el servicio de sincronización ZooKeeper;
  • Apache Spark Streaming : componente de Spark para procesar datos de transmisión. El módulo Spark Streaming se construye utilizando la arquitectura de micro lotes, cuando el flujo de datos se interpreta como una secuencia continua de pequeños paquetes de datos. Spark Streaming recibe datos de varias fuentes y los combina en pequeños paquetes. Se crean nuevos paquetes a intervalos regulares. Al comienzo de cada intervalo de tiempo, se crea un nuevo paquete y cualquier dato recibido durante este intervalo se incluye en el paquete. Al final del intervalo, el crecimiento del paquete se detiene. El tamaño del intervalo está determinado por un parámetro llamado intervalo de lote;
  • Apache Spark SQL : combina el procesamiento relacional con la programación funcional de Spark. Los datos estructurados se refieren a datos que tienen un esquema, es decir, un conjunto único de campos para todos los registros. Spark SQL admite la entrada de una variedad de fuentes de datos estructurados y, gracias a la disponibilidad de información sobre el esquema, puede recuperar de manera eficiente solo los campos de registro requeridos, y también proporciona las API de DataFrame;
  • AWS RDS es una base de datos relacional basada en la nube relativamente económica, un servicio web que simplifica la configuración, el funcionamiento y el escalado, y que Amazon administra directamente.

Instalar e iniciar el servidor Kafka


Antes de usar Kafka directamente, debe asegurarse de que Java esté disponible, ya que JVM se usa para el trabajo:

sudo apt-get update sudo apt-get install default-jre java -version 

Cree un nuevo usuario para trabajar con Kafka:

 sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo 

A continuación, descargue la distribución del sitio web oficial de Apache Kafka:

 wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz" 

Descomprima el archivo descargado:
 tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka 

El siguiente paso es opcional. El hecho es que la configuración predeterminada no permite el uso completo de todas las características de Apache Kafka. Por ejemplo, elimine un tema, categoría, grupo en el que se pueden publicar mensajes. Para cambiar esto, edite el archivo de configuración:

 vim ~/kafka/config/server.properties 

Agregue lo siguiente al final del archivo:

 delete.topic.enable = true 

Antes de iniciar el servidor Kafka, debe iniciar el servidor ZooKeeper, utilizaremos el script auxiliar que viene con la distribución Kafka:

 Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties 

Después de que ZooKeeper se inició con éxito, en una terminal separada lanzamos el servidor Kafka:

 bin/kafka-server-start.sh config/server.properties 

Cree un nuevo tema llamado Transacción:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction 

Asegúrese de que se haya creado el tema con el número correcto de particiones y replicación:

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 



Echaremos de menos los momentos de probar al productor y al consumidor para el tema recién creado. Para obtener más detalles sobre cómo probar el envío y la recepción de mensajes, consulte la documentación oficial: envíe algunos mensajes . Bueno, pasamos a escribir un productor en Python usando la API KafkaProducer.

Escritor de Productor


El productor generará datos aleatorios: 100 mensajes por segundo. Por datos aleatorios nos referimos a un diccionario que consta de tres campos:

  • Sucursal : nombre del punto de venta de la entidad de crédito;
  • Moneda - moneda de transacción;
  • Monto - monto de la transacción. El monto será un número positivo si se trata de una compra de divisas por parte del Banco, y negativo si se trata de una venta.

El código para el productor es el siguiente:

 from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict 

A continuación, utilizando el método de envío, enviamos un mensaje al servidor, en el tema que necesitamos, en formato JSON:

 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush() 

Cuando ejecutamos el script, recibimos los siguientes mensajes en el terminal:


Esto significa que todo funciona como queríamos: el productor genera y envía mensajes al tema que necesitamos.

El siguiente paso es instalar Spark y procesar este flujo de mensajes.

Instalar Apache Spark


Apache Spark es una plataforma de computación en clúster versátil y de alto rendimiento.

En términos de rendimiento, Spark supera las implementaciones populares del modelo MapReduce, proporcionando simultáneamente soporte para una gama más amplia de tipos de cálculos, incluidas consultas interactivas y procesamiento de flujo. La velocidad juega un papel importante en el procesamiento de grandes cantidades de datos, ya que es la velocidad la que le permite trabajar de manera interactiva sin perder minutos u horas de espera. Una de las mayores fortalezas de Spark a una velocidad tan alta es su capacidad para realizar cálculos en memoria.

Este marco está escrito en Scala, por lo que debe instalarlo primero:

 sudo apt-get install scala 

Descargue la distribución de Spark del sitio web oficial:

 wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz" 

Descomprima el archivo:

 sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark 

Agregue la ruta al Spark en el archivo bash:

 vim ~/.bashrc 

Agregue las siguientes líneas a través del editor:

 SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH 

Ejecute el siguiente comando después de realizar cambios en bashrc:

 source ~/.bashrc 

Implementación de AWS PostgreSQL


Queda por implementar la base de datos, donde cargaremos la información procesada de las transmisiones. Para esto utilizaremos el servicio AWS RDS.

Vaya a la consola AWS -> AWS RDS -> Bases de datos -> Crear base de datos:


Seleccione PostgreSQL y haga clic en el botón Siguiente:


Porque Este ejemplo se entiende únicamente con fines educativos, utilizaremos un servidor gratuito "como mínimo" (Nivel gratuito):


A continuación, marque el bloque Free Tier, y luego se nos ofrecerá automáticamente una instancia de la clase t2.micro, aunque débil, es gratis y bastante adecuada para nuestra tarea:

Siguen cosas muy importantes: el nombre de la instancia de la base de datos, el nombre del usuario maestro y su contraseña. Pongamos nombre a la instancia: myHabrTest, el usuario maestro: habr , la contraseña: habr12345 y haga clic en el botón Siguiente:



La siguiente página contiene los parámetros responsables de la disponibilidad de nuestro servidor de bases de datos desde el exterior (Accesibilidad pública) y la disponibilidad de puertos:


Creemos una nueva configuración para el grupo de seguridad VPC, que nos permitirá acceder a nuestro servidor de bases de datos desde el exterior a través del puerto 5432 (PostgreSQL).

En una ventana separada del navegador, vaya a la consola de AWS en el Panel de VPC -> Grupos de seguridad -> sección Crear grupo de seguridad:

Establezca el nombre para el grupo de Seguridad: PostgreSQL, una descripción, indique a qué VPC debe asociarse este grupo y haga clic en el botón Crear:


Complete el grupo de reglas de entrada recién creado para el puerto 5432, como se muestra en la imagen a continuación. No tiene que especificar un puerto manual, pero seleccione PostgreSQL de la lista desplegable Tipo.

Hablando estrictamente, el valor :: / 0 significa la disponibilidad de tráfico entrante para un servidor de todo el mundo, lo que canónicamente no es del todo cierto, pero para analizar el ejemplo, usemos este enfoque:


Regresamos a la página del navegador, donde tenemos abierto "Configurar opciones avanzadas" y seleccionamos en la sección Grupos de seguridad de VPC -> Elegir grupos de seguridad de VPC existentes -> PostgreSQL:


A continuación, en la sección Opciones de la base de datos -> Nombre de la base de datos -> establezca el nombre - habrDB .

Podemos dejar el resto de los parámetros, con la excepción de deshabilitar la copia de seguridad (período de retención de la copia de seguridad - 0 días), la supervisión y las Estadísticas de rendimiento, de forma predeterminada. Haga clic en el botón Crear base de datos :


Controlador de flujo


El paso final será el desarrollo de Spark-jobs, que procesará cada dos segundos nuevos datos procedentes de Kafka e ingresará el resultado en la base de datos.

Como se señaló anteriormente, los puntos de control son el mecanismo principal en SparkStreaming que debe configurarse para proporcionar tolerancia a fallas. Usaremos puntos de control y, en caso de una caída del procedimiento, el módulo Spark Streaming solo necesitará regresar al último punto de control y reanudar los cálculos para restaurar los datos perdidos.

Puede habilitar el punto de interrupción configurando el directorio en un sistema de archivos confiable y tolerante a fallas (por ejemplo, HDFS, S3, etc.), en el que se guardará la información del punto de interrupción. Esto se hace usando, por ejemplo:

 streamingContext.checkpoint(checkpointDirectory) 

En nuestro ejemplo, utilizaremos el siguiente enfoque, es decir, si checkpointDirectory existe, entonces el contexto se recreará a partir de los datos del punto de control. Si el directorio no existe (es decir, se ejecuta por primera vez), se llama a la función functionToCreateContext para crear un nuevo contexto y configurar DStreams:

 from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) 

Cree un objeto DirectStream para conectarse al tema "transacción" utilizando el método createDirectStream de la biblioteca KafkaUtils:

 from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list}) 

Análisis de datos entrantes en formato JSON:

 rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream") 

Usando Spark SQL, hacemos una agrupación simple e imprimimos el resultado en la consola:

 select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency 

Obtener texto de consulta y ejecutarlo a través de Spark SQL:

 sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5) 

Y luego guardamos los datos agregados recibidos en una tabla en AWS RDS. Para guardar los resultados de la agregación en una tabla de base de datos, utilizaremos el método de escritura del objeto DataFrame:

 testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save() 

Algunas palabras sobre la configuración de una conexión a AWS RDS. Creamos el usuario y la contraseña en el paso "Implementación de AWS PostgreSQL". Para la URL del servidor de la base de datos, use Endpoint, que se muestra en la sección Conectividad y seguridad:


Para conectar correctamente Spark y Kafka, debe ejecutar el trabajo mediante smark-submit utilizando el artefacto spark-streaming-kafka-0-8_2.11 . Además, también aplicamos el artefacto para interactuar con la base de datos PostgreSQL, los transferiremos a través de --packages.

Para la flexibilidad de la secuencia de comandos, también sacamos el nombre del servidor de mensajes y el tema del que queremos recibir datos como parámetros de entrada.

Entonces, es hora de comenzar y probar el sistema:

 spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction 

¡Todo salió bien! Como puede ver en la imagen a continuación, durante el trabajo de la aplicación, se muestran nuevos resultados de agregación cada 2 segundos, porque establecemos el intervalo de procesamiento por lotes en 2 segundos al crear el objeto StreamingContext:


A continuación, hacemos una consulta simple a la base de datos para verificar los registros en la tabla transaction_flow :


Conclusión


Este artículo examinó un ejemplo de procesamiento de transmisión de información usando Spark Streaming junto con Apache Kafka y PostgreSQL. Con el crecimiento de los datos de varias fuentes, es difícil sobreestimar el valor práctico de Spark Streaming para crear aplicaciones de transmisión y aplicaciones que operan en tiempo real.

Puede encontrar el código fuente completo en mi repositorio en GitHub .

Estoy listo para discutir este artículo con placer, espero sus comentarios y también espero críticas constructivas de todos los lectores interesados.

¡Te deseo éxito!

PD: originalmente se planeó usar una base de datos local de PostgreSQL, pero dado mi amor por AWS, decidí poner la base de datos en la nube. En el próximo artículo sobre este tema, mostraré cómo implementar todo el sistema descrito anteriormente en AWS utilizando AWS Kinesis y AWS EMR. Sigue las noticias!

Source: https://habr.com/ru/post/451160/


All Articles