Olá Habr! Hoje, construiremos um sistema que usará o Apark Kafka para processar fluxos de mensagens usando o Spark Streaming e gravar o resultado do processamento no banco de dados em nuvem da AWS RDS.
Imagine que uma determinada instituição de crédito nos tenha encarregado de processar as transações recebidas em tempo real em todas as suas agências. Isso pode ser feito para calcular rapidamente a posição em moeda aberta do Tesouro, limites ou resultados financeiros das transações, etc.
Como implementar este caso sem o uso de magias e feitiços mágicos - lemos sob o corte! Vamos lá!
(Fonte da imagem)1. Introdução
Obviamente, o processamento de uma grande variedade de dados em tempo real oferece amplas oportunidades para uso em sistemas modernos. Uma das combinações mais populares para isso é o tandem Apache Kafka e Spark Streaming, onde o Kafka cria um fluxo de pacotes de mensagens recebidas e o Spark Streaming processa esses pacotes em um intervalo de tempo especificado.
Para aumentar a tolerância a falhas do aplicativo, usaremos pontos de verificação - pontos de verificação. Usando esse mecanismo, quando o módulo Spark Streaming precisa recuperar dados perdidos, ele precisa retornar ao último ponto de controle e retomar os cálculos a partir dele.
Arquitetura do sistema em desenvolvimento
Componentes Utilizados:
- O Apache Kafka é um sistema de mensagens distribuído com publicação e assinatura. Adequado para consumo de mensagens offline e online. Para evitar a perda de dados, as mensagens Kafka são armazenadas no disco e replicadas no cluster. O sistema Kafka é construído sobre o serviço de sincronização do ZooKeeper;
- Apache Spark Streaming - componente Spark para processar dados de streaming. O módulo Spark Streaming é construído usando a arquitetura de micr lote, quando o fluxo de dados é interpretado como uma sequência contínua de pequenos pacotes de dados. O Spark Streaming recebe dados de várias fontes e os combina em pacotes pequenos. Novos pacotes são criados em intervalos regulares. No início de cada intervalo de tempo, um novo pacote é criado e todos os dados recebidos durante esse intervalo são incluídos no pacote. No final do intervalo, o crescimento do pacote para. O tamanho do intervalo é determinado por um parâmetro chamado intervalo de lote;
- Apache Spark SQL - Combina processamento relacional com programação funcional Spark. Dados estruturados referem-se a dados que possuem um esquema, ou seja, um único conjunto de campos para todos os registros. O Spark SQL suporta entrada de uma variedade de fontes de dados estruturados e, graças à disponibilidade de informações sobre o esquema, ele pode recuperar eficientemente apenas os campos de registro necessários e também fornece APIs do DataFrame;
- O AWS RDS é um banco de dados relacional baseado em nuvem, relativamente barato, um serviço da Web que simplifica a configuração, operação e dimensionamento e é administrado diretamente pela Amazon.
Instale e inicie o servidor Kafka
Antes de usar o Kafka diretamente, é necessário garantir que o Java esteja disponível, como A JVM é usada para o trabalho:
sudo apt-get update sudo apt-get install default-jre java -version
Crie um novo usuário para trabalhar com Kafka:
sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo
Em seguida, faça o download da distribuição no site oficial do Apache Kafka:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Descompacte o arquivo baixado:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
O próximo passo é opcional. O fato é que as configurações padrão não permitem o uso completo de todos os recursos do Apache Kafka. Por exemplo, exclua um tópico, categoria, grupo no qual as mensagens podem ser publicadas. Para mudar isso, edite o arquivo de configuração:
vim ~/kafka/config/server.properties
Adicione o seguinte ao final do arquivo:
delete.topic.enable = true
Antes de iniciar o servidor Kafka, você precisa iniciar o servidor ZooKeeper, usaremos o script auxiliar que acompanha a distribuição Kafka:
Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties
Após o ZooKeeper ser iniciado com êxito, em um terminal separado, lançamos o servidor Kafka:
bin/kafka-server-start.sh config/server.properties
Crie um novo tópico chamado Transação:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Verifique se o tópico com o número certo de partições e replicação foi criado:
bin/kafka-topics.sh --describe --zookeeper localhost:2181

Perderemos os momentos de testar o produtor e o consumidor para o tópico recém-criado. Para obter mais detalhes sobre como testar o envio e o recebimento de mensagens, consulte a documentação oficial -
Enviar algumas mensagens . Bem, passamos a escrever um produtor em Python usando a API KafkaProducer.
Redação do Produtor
O produtor irá gerar dados aleatórios - 100 mensagens por segundo. Por dados aleatórios, entendemos um dicionário composto por três campos:
- Sucursal - nome do ponto de venda da instituição de crédito
- Moeda - moeda de transação;
- Valor - valor da transação. O valor será um número positivo se for uma compra de moeda pelo Banco e negativo se for uma venda.
O código para o produtor é o seguinte:
from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict
Em seguida, usando o método send, enviamos uma mensagem para o servidor, no tópico que precisamos, no formato JSON:
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush()
Ao executar o script, recebemos as seguintes mensagens no terminal:
Isso significa que tudo funciona como queríamos - o produtor gera e envia mensagens para o tópico de que precisamos.
A próxima etapa é instalar o Spark e processar esse fluxo de mensagens.
Instale o Apache Spark
O Apache Spark é uma plataforma de computação em cluster versátil e de alto desempenho.
Em termos de desempenho, o Spark supera as implementações populares do modelo MapReduce, fornecendo simultaneamente suporte para uma ampla variedade de tipos de cálculos, incluindo consultas interativas e processamento de fluxo. A velocidade desempenha um papel importante no processamento de grandes quantidades de dados, pois é a velocidade que permite que você trabalhe interativamente sem gastar minutos ou horas esperando. Um dos maiores pontos fortes do Spark em alta velocidade é a capacidade de realizar cálculos na memória.
Essa estrutura é escrita no Scala, portanto você deve instalá-lo primeiro:
sudo apt-get install scala
Faça o download da distribuição Spark no site oficial:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
Descompacte o arquivo:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Adicione o caminho ao Spark no arquivo bash:
vim ~/.bashrc
Adicione as seguintes linhas através do editor:
SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH
Execute o comando abaixo depois de fazer alterações no bashrc:
source ~/.bashrc
Implantação do AWS PostgreSQL
Resta implantar o banco de dados, onde carregaremos as informações processadas dos fluxos. Para isso, usaremos o serviço AWS RDS.
Vá para o console AWS -> AWS RDS -> Bancos de dados -> Criar banco de dados:
Selecione PostgreSQL e clique no botão Avançar:
Porque Este exemplo é entendido apenas para fins educacionais, usaremos um servidor gratuito "no mínimo" (Nível Grátis):
Em seguida, marque o bloco Free Tier e, depois disso, ofereceremos automaticamente uma instância da classe t2.micro - embora fraca, é gratuita e bastante adequada para a nossa tarefa:
Coisas muito importantes seguem: o nome da instância do banco de dados, o nome do usuário mestre e sua senha. Vamos nomear a instância: myHabrTest, o usuário principal:
habr , a senha:
habr12345 e clique no botão Avançar:
A próxima página contém os parâmetros responsáveis pela disponibilidade externa do servidor de banco de dados (acessibilidade pública) e disponibilidade da porta:
Vamos criar uma nova configuração para o grupo de segurança VPC, que nos permitirá acessar nosso servidor de banco de dados de fora pela porta 5432 (PostgreSQL).
Em uma janela separada do navegador, acesse o console da AWS na seção Painel da VPC -> Grupos de segurança -> Criar grupo de segurança:
Defina o nome do grupo Segurança - PostgreSQL, uma descrição, indique a qual VPC esse grupo deve ser associado e clique no botão Criar:
Preencha o grupo de regras de entrada recém-criado para a porta 5432, conforme mostrado na figura abaixo. Você não precisa especificar uma porta manual, mas selecione PostgreSQL na lista suspensa Tipo.
Estritamente falando, o valor :: / 0 significa a disponibilidade de tráfego de entrada para um servidor de todo o mundo, o que é canonicamente não totalmente verdadeiro, mas para analisar o exemplo, vamos usar esta abordagem:
Voltamos à página do navegador, onde temos “Definir configurações avançadas” aberto e selecionamos na seção Grupos de segurança VPC -> Escolha grupos de segurança VPC existentes -> PostgreSQL:
Em seguida, na seção Opções do banco de dados -> Nome do banco de dados -> defina o nome -
habrDB .
Podemos deixar o restante dos parâmetros, com a exceção de desativar o backup (período de retenção de backup - 0 dias), o monitoramento e o Performance Insights, por padrão. Clique no botão
Criar banco de dados :
Manipulador de fluxo
A etapa final será o desenvolvimento dos trabalhos do Spark, que processarão a cada dois segundos novos dados provenientes do Kafka e inserirão o resultado no banco de dados.
Como observado acima, os pontos de verificação são o mecanismo principal no SparkStreaming que deve ser configurado para fornecer tolerância a falhas. Usaremos pontos de controle e, no caso de uma queda de procedimento, o módulo Spark Streaming precisará retornar ao último ponto de controle e retomar os cálculos para restaurar os dados perdidos.
Você pode ativar o ponto de verificação configurando o diretório em um sistema de arquivos confiável e tolerante a falhas (por exemplo, HDFS, S3 etc.), no qual as informações do ponto de verificação serão salvas. Isso é feito usando, por exemplo:
streamingContext.checkpoint(checkpointDirectory)
No nosso exemplo, usaremos a seguinte abordagem, a saber, se o checkpointDirectory existir, o contexto será recriado a partir dos dados do ponto de controle. Se o diretório não existir (ou seja, for executado pela primeira vez), a função functionToCreateContext será chamada para criar um novo contexto e configurar o DStreams:
from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Crie um objeto DirectStream para conectar-se ao tópico "transação" usando o método createDirectStream da biblioteca KafkaUtils:
from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list})
Analisando dados recebidos no formato JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream")
Usando o Spark SQL, fazemos um agrupamento simples e imprimimos o resultado no console:
select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency
Obtendo o texto da consulta e executando-o através do Spark SQL:
sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5)
E, em seguida, salvamos os dados agregados recebidos em uma tabela no AWS RDS. Para salvar os resultados da agregação em uma tabela de banco de dados, usaremos o método write do objeto DataFrame:
testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save()
Algumas palavras sobre como configurar uma conexão com o AWS RDS. Criamos o usuário e a senha para ele na etapa "Implantando o AWS PostgreSQL". Para o URL do servidor de banco de dados, use o Endpoint, que é exibido na seção Conectividade e segurança:
Para conectar corretamente o Spark e o Kafka, você deve executar o trabalho através do smark-submit usando o
artefato spark-streaming-kafka-0-8_2.11 . Além disso, também aplicamos o artefato para interagir com o banco de dados PostgreSQL, e os transferiremos através de --packages.
Para a flexibilidade do script, também retiramos o nome do servidor de mensagens e o tópico do qual queremos receber dados como parâmetros de entrada.
Então, é hora de iniciar e testar o sistema:
spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction
Tudo deu certo! Como você pode ver na figura abaixo, durante o trabalho do aplicativo, novos resultados de agregação são exibidos a cada 2 segundos, porque definimos o intervalo de lotes para 2 segundos ao criar o objeto StreamingContext:
Em seguida, fazemos uma consulta simples ao banco de dados para verificar os registros na tabela
transaction_flow :
Conclusão
Este artigo examinou um exemplo de processamento de informações de streaming usando o Spark Streaming em conjunto com o Apache Kafka e o PostgreSQL. Com o crescimento de dados de várias fontes, é difícil superestimar o valor prático do Spark Streaming para a criação de aplicativos de streaming e aplicativos que operam em tempo real.
Você pode encontrar o código fonte completo no meu repositório no
GitHub .
Estou pronto para discutir este artigo com prazer, aguardo seus comentários e também espero críticas construtivas de todos os leitores interessados.
Desejo-lhe sucesso!
PS: Foi originalmente planejado o uso de um banco de dados PostgreSQL local, mas, devido ao meu amor pela AWS, decidi colocar o banco de dados na nuvem. No próximo artigo sobre este tópico, mostrarei como implementar todo o sistema descrito acima na AWS usando o AWS Kinesis e o AWS EMR. Acompanhe as novidades!