Multiprocesamiento y conciliación de datos de varias fuentes.

Hola Habr!

Dada la variedad de sistemas distribuidos, la disponibilidad de información verificada en el almacenamiento de destino es un criterio importante para la consistencia de los datos.

Hay muchos enfoques y métodos a este efecto, y nos centraremos en la reconciliación, cuyos aspectos teóricos se discutieron aquí en este artículo. Propongo considerar la implementación práctica de este sistema, escalable y adaptada a una gran cantidad de datos.

Cómo implementar este caso en el viejo Python, ¡léelo debajo del corte! Vamos!


(Fuente de la imagen)

Introduccion


Imaginemos que una institución financiera tiene varios sistemas distribuidos y nos enfrentamos a la tarea de verificar las transacciones en estos sistemas y cargar los datos conciliados en el almacenamiento de destino.

Como fuente de datos, tome un archivo de texto grande y una tabla en una base de datos PostgreSQL. Suponga que los datos en estas fuentes tienen las mismas transacciones, pero pueden tener diferencias y, por lo tanto, deben verificarse y escribirse en los datos verificados en el almacenamiento final para su análisis.

Además, es necesario prever el lanzamiento paralelo de varias conciliaciones en la misma base de datos y adaptar el sistema a un gran volumen mediante multiprocesamiento.

El módulo de multiprocesamiento es ideal para paralelizar operaciones en Python y, en cierto sentido, evita ciertos defectos de GIL. Utilizaremos las capacidades de esta biblioteca a continuación.

Arquitectura del sistema en desarrollo.



Componentes utilizados:

  • Generador de datos aleatorio : un script de Python que genera un archivo CSV y, sobre la base, llena una tabla en una base de datos;
  • Fuentes de datos : archivo CSV y tabla en la base de datos PostgreSQL;
  • Adaptadores : en este caso, utilizamos dos adaptadores que extraerán datos de sus fuentes (CSV o base de datos) e ingresarán información en la base de datos intermedia;
  • Bases de datos : en la cantidad de tres piezas: datos sin procesar, una base de datos intermedia que almacena información capturada por los adaptadores y una base de datos "limpia" que contiene transacciones conciliadas de ambas fuentes.

Entrenamiento inicial


Como herramienta de almacenamiento de datos, utilizaremos la base de datos PostgreSQL en el contenedor Docker e interactuaremos con nuestra base de datos a través de pgAdmin que se ejecuta en el contenedor :

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

Ejecutando pgAdmin:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

Después de que todo haya comenzado, no olvide especificar en el archivo de configuración (conf / db.ini) la cadena de conexión a la base de datos (para un ejemplo de capacitación, ¡puede!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

En principio, el uso de un contenedor es opcional y puede usar su servidor de base de datos.

Generación de entrada


El script Python generate_test_data es responsable de la generación de datos de prueba, que toma la cantidad deseada de entradas para generar. La secuencia de operaciones se puede rastrear fácilmente por la función principal de la clase GenerateTestData :

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

Entonces, la función realiza los siguientes pasos:

  • Crear esquemas en la base de datos (creamos todos los esquemas y tablas principales);
  • Crear una carpeta para almacenar un archivo de prueba;
  • Generando un archivo de prueba con un número dado de líneas;
  • Insertar datos de forma masiva en la tabla de destino transaction_db_raw.transaction_log;
  • Eliminación accidental de múltiples filas en esta tabla;
  • Actualización aleatoria de varias filas en esta tabla.

La eliminación y modificación es necesaria para que los objetos comparados tengan al menos alguna discrepancia. ¡Es importante poder buscar estas discrepancias!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

La generación de un conjunto de datos de prueba y la posterior grabación en un archivo de texto en formato CSV es la siguiente:

  • Se crea un UID de transacción aleatoria;
  • Se crea un número de cuenta UID aleatorio (de forma predeterminada, tomamos diez cuentas únicas, pero este valor se puede cambiar usando el archivo de configuración cambiando el parámetro "cuentas_al azar");
  • Fecha de transacción: una fecha aleatoria a partir de la fecha especificada en el archivo de configuración (initial_date);
  • Tipo de transacción (transacción / comisión);
  • Monto de la transacción;
  • El trabajo principal en la generación de datos se realiza mediante el método generate_test_data_by_chunk de la clase TestDataCreator :

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

Una característica de esta función es el lanzamiento en varios procesos asincrónicos paralelos, cada uno de los cuales genera su propia porción de 50K registros. Este "chip" le permitirá crear un archivo en varios millones de líneas lo suficientemente rápido

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

Después de completar el archivo de texto, se procesa el comando bulk_insert y todos los datos de este archivo caen en la tabla transaction_db_raw.transaction_log.

Además, las dos fuentes contendrán exactamente los mismos datos y la reconciliación no encontrará nada interesante, por lo que eliminamos y cambiamos varias filas aleatorias en la base de datos.

Ejecute el script y genere un archivo CSV de prueba con transacciones en líneas de 10K:

 ./generate_test_data.py 10000 


La captura de pantalla muestra que el archivo se recibió en 10K líneas, 10K se cargó en la base de datos, pero luego se eliminaron 112 líneas de la base de datos y se modificaron otras 108. Resultado: el archivo y la tabla en la base de datos difieren en 220 entradas.

“Bueno, ¿dónde está el multiprocesamiento?”, Preguntas.
Y su trabajo se puede ver cuando genera un archivo más grande, no por 10K registros, sino, por ejemplo, por 1M. ¿Lo intentaremos?

 ./generate_test_data.py 1000000 


Después de cargar los datos, eliminar y cambiar registros aleatorios, vemos las diferencias del archivo de texto de la tabla: 19,939 filas (de las cuales 10,022 se eliminaron aleatoriamente y 9,917 cambiaron).

La imagen muestra que la generación de registros fue asincrónica, inconsistente. Esto significa que el siguiente proceso puede comenzar sin tener en cuenta el orden de inicio tan pronto como se complete el anterior. No hay garantía de que el resultado esté en el mismo orden que la entrada.

¿Es definitivamente más rápido?
Se "inventó" un millón de líneas que no estaban en la máquina virtual más rápida en 15.5 segundos, y esta es una opción valiosa. Después de comenzar la misma generación secuencialmente, sin usar multiprocesamiento, obtuve el resultado: la generación de archivos fue más de tres veces más lenta (más de 52 segundos en lugar de 15.5):



Adaptador para CSV


Este adaptador divide la fila, dejando solo la primera columna, la ID de la transacción, sin cambios y guarda los datos recibidos en el archivo data / transaction_hashed.csv . El paso final de su trabajo es cargar este archivo usando el comando COPY en la tabla temporal del esquema reconciliation_db.

La lectura óptima de archivos se realiza mediante varios procesos paralelos. Leemos línea por línea, en piezas de 5 megabytes cada una. La cifra "5 megabytes" se obtuvo por el método empírico. Fue con este tamaño de una sola pieza de texto que pudimos obtener el menor tiempo para leer archivos grandes en nuestra máquina virtual. Puede experimentar en su entorno con este parámetro y ver cómo cambiará el tiempo de funcionamiento:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

Ejemplo de lectura de un archivo creado previamente en registros 1M:


La captura de pantalla muestra la creación de una tabla temporal con un nombre único para la ejecución de reconciliación actual. Lo siguiente es la lectura asíncrona del archivo en partes y tomar el hash de cada línea. Insertar datos del adaptador en la tabla de destino completa el trabajo con este adaptador.
El uso de una tabla temporal con un nombre único para cada proceso de reconciliación le permite paralelizar adicionalmente el proceso de reconciliación en una base de datos.

Adaptador para PostgreSQL


El adaptador para procesar los datos almacenados en la tabla funciona aproximadamente con la misma lógica que el adaptador para el archivo:

  • leer en partes de la tabla (si es grande, más de 100K entradas) y tomar un hash para todas las columnas excepto el identificador de transacción;
  • entonces los datos procesados ​​se insertan en la tabla reconciliation_db. almacenamiento _ $ (int (time.time ()) .

Una característica interesante de este adaptador es que utiliza un conjunto de conexiones a la base de datos, que buscará por índice los datos necesarios en la tabla y los procesará.

Según el tamaño de la tabla, se calcula el número de procesos necesarios para el procesamiento y dentro de cada proceso hay una división en 10 tareas.

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


Buscar discrepancias


Procedemos a la verificación de los datos recibidos de dos adaptadores.

La reconciliación (o la recepción de un informe de discrepancia) se produce en el lado del servidor de la base de datos, utilizando todo el poder del lenguaje SQL.

La consulta SQL es bastante sencilla: es solo una unión de tabla con datos de los adaptadores a sí misma por ID de transacción:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

El resultado es un informe:


Compruebe si todo está correcto en la imagen de arriba. Recordamos que 9917 se eliminaron de la tabla en la base de datos y se modificaron 10,022 filas. Total de 19939 líneas, lo cual es evidente en el informe.

Tabla resumen


Solo queda insertar transacciones "limpias" en la tabla de almacenamiento que coincidan en todos los aspectos (por hash) en diferentes adaptadores. Este proceso se realiza mediante la siguiente consulta SQL:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

La tabla temporal que utilizamos como almacenamiento intermedio de datos de los adaptadores se puede eliminar.


Conclusión


En el curso del trabajo realizado, se desarrolló un sistema para conciliar datos de varias fuentes: un archivo de texto y una tabla en la base de datos. Usó un mínimo de herramientas adicionales.

Quizás un lector sofisticado pueda notar que el uso de marcos como Apache Spark, junto con la conversión de los datos de origen a un formato de parquet, puede acelerar significativamente este proceso, especialmente para grandes volúmenes. Pero el objetivo principal de este trabajo es escribir un sistema en Python desnudo y estudiar el procesamiento de datos de multiprocesamiento. Con lo que, en mi opinión, hemos tratado.

El código fuente de todo el proyecto se encuentra en mi repositorio en GitHub , le sugiero que se familiarice con él.

Estaré encantado de responder todas las preguntas y conocer sus comentarios.

¡Te deseo éxito!

Source: https://habr.com/ru/post/480076/


All Articles