Multiprocessamento e reconciliação de dados de várias fontes

Olá Habr!

Dada a variedade de sistemas distribuídos, a disponibilidade de informações verificadas no armazenamento de destino é um critério importante para a consistência dos dados.

Existem muitas abordagens e métodos para esse efeito, e vamos nos concentrar na reconciliação, cujos aspectos teóricos foram discutidos aqui neste artigo. Proponho considerar a implementação prática desse sistema, escalonável e adaptada a uma grande quantidade de dados.

Como implementar este caso no bom e velho Python - leia-o por baixo! Vamos lá!


(Fonte da imagem)

1. Introdução


Vamos imaginar que uma instituição financeira tenha vários sistemas distribuídos e somos confrontados com a tarefa de verificar as transações nesses sistemas e carregar os dados reconciliados no armazenamento de destino.

Como fonte de dados, pegue um arquivo de texto grande e uma tabela em um banco de dados PostgreSQL. Suponha que os dados nessas fontes tenham as mesmas transações, mas possam ter diferenças e, portanto, precisam ser verificados e gravados nos dados verificados no armazenamento final para análise.

Além disso, é necessário prever o lançamento paralelo de várias reconciliações no mesmo banco de dados e adaptar o sistema a um grande volume usando o multiprocessamento.

O módulo de multiprocessamento é ótimo para operações paralelas em Python e, de certa forma, contorna certas falhas do GIL. Usaremos os recursos desta biblioteca abaixo.

Arquitetura do sistema em desenvolvimento



Componentes Utilizados:

  • Gerador de dados aleatórios - um script Python que gera um arquivo CSV e, em sua base, preenche uma tabela em um banco de dados;
  • Fontes de dados - arquivo e tabela CSV no banco de dados PostgreSQL;
  • Adaptadores - nesse caso, usamos dois adaptadores que extraem dados de suas fontes (CSV ou banco de dados) e inserem informações no banco de dados intermediário;
  • Bancos de dados - na quantidade de três partes: dados brutos, um banco de dados intermediário que armazena informações obtidas pelos adaptadores e um banco de dados "limpo" contendo transações reconciliadas de ambas as fontes.

Treinamento inicial


Como ferramenta de armazenamento de dados, usaremos o banco de dados PostgreSQL no contêiner Docker e interagiremos com nosso banco de dados através do pgAdmin em execução no contêiner :

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

Executando o pgAdmin:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

Depois que tudo começar, não esqueceremos de especificar no arquivo de configuração (conf / db.ini) a string de conexão com o banco de dados (para um exemplo de treinamento, você pode!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

Em princípio, o uso de um contêiner é opcional e você pode usar o servidor de banco de dados.

Geração de Entrada


O script Python generate_test_data é responsável pela geração de dados de teste, que leva o número desejado de entradas a serem geradas. A sequência de operações pode ser facilmente rastreada pela função principal da classe GenerateTestData :

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

Portanto, a função executa as seguintes etapas:

  • Criando esquemas no banco de dados (criamos todos os principais esquemas e tabelas);
  • Criando uma pasta para armazenar um arquivo de teste;
  • Gerando um arquivo de teste com um determinado número de linhas;
  • Inserir dados em massa na tabela de destino transaction_db_raw.transaction_log;
  • Exclusão acidental de várias linhas nesta tabela;
  • Atualização aleatória de várias linhas nesta tabela.

A exclusão e a modificação são necessárias para que os objetos comparados tenham pelo menos alguma discrepância. É importante poder procurar essas discrepâncias!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

A geração de um conjunto de dados de teste e a gravação subsequente em um arquivo de texto no formato CSV é a seguinte:

  • Um UID de transação aleatória é criado;
  • Um número de conta UID aleatório é criado (por padrão, temos dez contas exclusivas, mas esse valor pode ser alterado usando o arquivo de configuração, alterando o parâmetro "random_accounts");
  • Data da transação - uma data aleatória a partir da data especificada no arquivo de configuração (data_inicial);
  • Tipo de transação (transação / comissão);
  • Valor da transação;
  • O principal trabalho na geração de dados é realizado pelo método generate_test_data_by_chunk da classe TestDataCreator :

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

Um recurso dessa função é o lançamento em vários processos assíncronos paralelizados, cada um dos quais gera sua própria parte de 50K registros. Esse "chip" permitirá criar rapidamente um arquivo em vários milhões de linhas

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

Após a conclusão do arquivo de texto, o comando bulk_insert é processado e todos os dados desse arquivo caem na tabela transaction_db_raw.transaction_log.

Além disso, as duas fontes conterão exatamente os mesmos dados e a reconciliação não encontrará nada de interessante, portanto, excluímos e alteramos várias linhas aleatórias no banco de dados.

Execute o script e gere um arquivo CSV de teste com transações em 10 mil linhas:

 ./generate_test_data.py 10000 


A captura de tela mostra que um arquivo de 10 mil linhas foi recebido, 10 mil foram carregados no banco de dados, mas 112 foram excluídas do banco de dados e outras 108 foram alteradas Resultado: o arquivo e a tabela no banco de dados diferem em 220 entradas.

“Bem, onde está o multiprocessamento?”, Você pergunta.
E seu trabalho pode ser visto quando você gera um arquivo maior, não por 10 mil registros, mas, por exemplo, por 1 milhão. Vamos tentar?

 ./generate_test_data.py 1000000 


Após carregar os dados, excluir e alterar registros aleatórios, vemos as diferenças entre o arquivo de texto e a tabela: 19.939 linhas (das quais 10.022 foram excluídas aleatoriamente e 9.917 alteradas).

A imagem mostra que a geração de registros foi assíncrona, inconsistente. Isso significa que o próximo processo pode começar sem levar em conta a ordem de início assim que o anterior for concluído. Não há garantia de que o resultado será na mesma ordem que a entrada.

Definitivamente é mais rápido?
Um milhão de linhas que não estão na máquina virtual mais rápida foi "inventado" em 15,5 segundos - e essa é uma opção válida. Tendo iniciado a mesma geração sequencialmente, sem usar o multiprocessamento, obtive o resultado: a geração de arquivos foi três vezes mais lenta (mais de 52 segundos em vez de 15,5):



Adaptador para CSV


Esse adaptador faz hash na linha, deixando apenas a primeira coluna, o identificador da transação, inalterada e salva os dados recebidos no arquivo data / transaction_hashed.csv . A etapa final de seu trabalho é carregar esse arquivo usando o comando COPY na tabela temporária do esquema reconciliation_db.

A leitura ideal de arquivos é realizada por vários processos paralelos. Lemos linha por linha, em partes de 5 megabytes cada. A figura "5 megabytes" foi obtida pelo método empírico. Foi com esse tamanho de um pedaço de texto que conseguimos o menor tempo possível para ler arquivos grandes em nossa máquina virtual. Você pode experimentar em seu ambiente esse parâmetro e ver como o tempo de operação mudará:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

Exemplo de leitura de um arquivo criado anteriormente em registros de 1 milhão:


A captura de tela mostra a criação de uma tabela temporária com um nome exclusivo para a execução de reconciliação atual. A seguir, é apresentada a leitura assíncrona do arquivo em partes e o hash de cada linha. A inserção de dados do adaptador na tabela de destino conclui o trabalho com este adaptador.
O uso de uma tabela temporária com um nome exclusivo para cada processo de reconciliação permite paralelizar adicionalmente o processo de reconciliação em um banco de dados.

Adaptador para PostgreSQL


O adaptador para processar os dados armazenados na tabela funciona aproximadamente da mesma lógica que o adaptador para o arquivo:

  • lendo partes da tabela (se for grande, com mais de 100 mil entradas) e utilizando um hash para todas as colunas, exceto o identificador da transação;
  • os dados processados ​​são inseridos na tabela reconciliation_db. armazenamento _ $ (int (time.time ()) .

Um recurso interessante desse adaptador é que ele usa um conjunto de conexões com o banco de dados, que procurará por índice os dados necessários na tabela e os processará.

Com base no tamanho da tabela, o número de processos necessários para o processamento é calculado e, em cada processo, há uma divisão em 10 tarefas.

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


Pesquisar discrepâncias


Prosseguimos com a verificação dos dados recebidos de dois adaptadores.

A reconciliação (ou o recebimento de um relatório de discrepância) ocorre no lado do servidor do banco de dados, usando todo o poder da linguagem SQL.

A consulta SQL é bastante simples - é apenas uma junção de tabela com dados dos adaptadores para si pelo ID da transação:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

A saída é um relatório:


Verifique se está tudo correto na figura acima. Lembramos que 9917 foram excluídos da tabela no banco de dados e 10.022 linhas foram alteradas. Total de 19939 linhas, o que é evidente no relatório.

Tabela Resumo


Resta apenas inserir transações "limpas" na tabela de armazenamento que correspondam em todos os aspectos (por hash) em diferentes adaptadores. Este processo é realizado pela seguinte consulta SQL:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

A tabela temporária que usamos como armazenamento intermediário de dados dos adaptadores pode ser excluída.


Conclusão


No decorrer do trabalho realizado, foi desenvolvido um sistema para reconciliar dados de várias fontes: um arquivo de texto e uma tabela no banco de dados. Utilizou um mínimo de ferramentas adicionais.

Talvez um leitor sofisticado possa perceber que o uso de estruturas como o Apache Spark, juntamente com a conversão dos dados de origem em um formato parquet, pode acelerar significativamente esse processo, especialmente para grandes volumes. Mas o principal objetivo deste trabalho é escrever um sistema em Python puro e estudar o processamento de dados de multiprocessamento. Com o que nós, na minha opinião, lidamos.

O código fonte de todo o projeto está no meu repositório no GitHub . Sugiro que você se familiarize com ele.

Será um prazer responder a todas as perguntas e me familiarizar com seus comentários.

Desejo-lhe sucesso!

Source: https://habr.com/ru/post/pt480076/


All Articles