对来自各种来源的数据进行多处理和核对

哈Ha!

考虑到分布式系统的多样性,目标存储中已验证信息的可用性是数据一致性的重要标准。

有许多方法和方法可以达到这种效果,我们将重点放在和解上,本文在本文中讨论了其理论方面 我建议考虑该系统的实际实现,该系统可扩展并适用于大量数据。

如何在良好的旧Python上实现这种情况-请切入阅读! 走吧


(图片来源)

引言


假设一个金融机构有几个分布式系统,我们面临着验证这些系统中的交易并将对帐后的数据上传到目标存储的任务。

作为数据源,在PostgreSQL数据库中获取一个大文本文件和一个表。 假设这些源中的数据具有相同的事务,但是它们可能具有差异,因此需要对其进行验证并将其写入最终存储中的已验证数据以进行分析。

此外,有必要在同一数据库上并行启动多个对帐,并使用多处理功能使系统适应大量需求。

多处理模块非常适合在Python中并行化操作,从某种意义上说,它规避了某些GIL缺陷。 我们将在下面使用此库的功能。

正在开发的系统架构



使用的组件:

  • 随机数据生成器 -生成CSV文件并在其基础上填充数据库表的Python脚本;
  • 数据源 -PostgreSQL数据库中的CSV文件和表;
  • 适配器 -在这种情况下,我们使用两个适配器,将从它们的源(CSV或数据库)中提取数据并将信息输入中间数据库;
  • 数据库 -共有三部分:原始数据,存储适配器获取的信息的中间数据库以及包含来自这两个源的已协调事务的“干净”数据库。

初步训练


作为数据存储工具,我们将在Docker容器中使用PostgreSQL数据库,并通过在容器中运行的pgAdmin与我们的数据库进行交互:

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

运行pgAdmin:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

一切开始之后,请不要忘记在配置文件(conf / db.ini)中指定数据库的连接字符串(对于培训示例,可以!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

原则上,容器的使用是可选的,您可以使用数据库服务器。

输入产生


Python脚本generate_test_data负责生成测试数据,该数据需要生成所需数量的条目。 可以通过GenerateTestData类的主要功能轻松跟踪操作顺序:

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

因此,该函数执行以下步骤:

  • 在数据库中创建架构(我们创建所有主要架构和表);
  • 创建一个用于存储测试文件的文件夹;
  • 生成具有给定行数的测试文件;
  • 将数据批量插入目标表transaction_db_raw.transaction_log;
  • 意外删除该表中的多行;
  • 此表中几行的随机更新。

删除和修改是必需的,以便比较的对象至少有一些差异。 能够找到这些差异很重要!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

生成测试数据集并随后将其记录为CSV格式的文本文件如下:

  • 创建一个随机事务UID;
  • 创建一个随机的UID帐号(默认情况下,我们使用十个唯一的帐号,但是可以使用配置文件通过更改“ random_accounts”参数来更改此值);
  • 交易日期-从配置文件中指定的日期(initial_date)开始的随机日期;
  • 交易类型(交易/佣金);
  • 交易金额;
  • 数据生成的主要工作由TestDataCreator类的generate_test_data_by_chunk方法执行:

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

此功能的一个特点是在多个并行异步进程中启动,每个进程都会生成自己的50K记录部分。 这个“芯片”将使您足够快地在几百万行上创建文件

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

文本文件完成后,处理bulk_insert命令,并且该文件中的所有数据均落入transaction_db_raw.transaction_log表中

此外,这两个源将包含完全相同的数据,对帐不会发现任何有趣的内容,因此我们删除并更改了数据库中的几个随机行。

运行脚本并生成包含10,000行交易的测试CSV文件:

 ./generate_test_data.py 10000 


屏幕截图显示已接收到10K行的文件,将10K行加载到数据库中,然后从数据库中删除了112行,并更改了108行,结果:数据库中的文件和表相差220个条目。

您问:“嗯,多处理在哪里?”
当您生成更大的文件时,可以看到它的工作,而不是10K条记录,而是1M条记录。 我们会尝试吗?

 ./generate_test_data.py 1000000 


加载数据,删除和更改随机记录后,我们从表中看到了文本文件的区别:19,939行(其中10022条是随机删除的,而9917条是更改的)。

图片显示记录的生成是异步的,不一致的。 这意味着下一个过程可以在不考虑开始顺序的情况下立即开始,而前一个过程就完成了。 不能保证结果与输入的顺序相同。

肯定更快吗?
在15.5秒内“发明”了不在最快的虚拟机上的100万行-这是一个值得选择的选择。 在不使用多处理的情况下按顺序启动了同一代,我得到的结果是:文件生成的速度慢了三倍多(超过了52秒而不是15.5秒):



CSV适配器


该适配器对行进行哈希处理,仅保留第一列(事务标识符)不变,并将接收到的数据保存到data / transaction_hashed.csv文件中。 他的工作的最后一步是使用COPY命令将此文件加载到reconciliation_db模式的临时表中

最佳的文件读取是通过几个并行过程执行的。 我们逐行读取,每个读取5兆字节。 通过经验方法获得数字“ 5兆字节”。 正是由于只有一小段文字,我们才能够在最短的时间内读取虚拟机上的大文件。 您可以使用此参数在您的环境中进行实验,并查看运行时间将如何变化:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

读取1M记录上先前创建的文件的示例:


屏幕快照显示了如何为当前对帐运行创建一个具有唯一名称的临时表。 接下来是部分异步读取文件并获取每一行的哈希值。 将数据从适配器插入目标表即可完成此适配器的工作。
为每个对帐过程使用具有唯一名称的临时表,可以使您在一个数据库中额外并行化对帐过程。

PostgreSQL适配器


用于处理表中存储的数据的适配器与文件适配器的逻辑大致相同:

  • 读取表的某些部分(如果很大,则超过10万个条目),并对除事务标识符之外的所有列进行哈希处理;
  • 然后将已处理的数据插入到reconciliation_db表中 存储_ $(int(time.time())

该适配器的一个有趣特性是它使用一个到数据库的连接池,该池将通过索引在表中搜索必要的数据并进行处理。

根据表的大小,计算处理所需的进程数,并且在每个进程中将其划分为10个任务。

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


搜索差异


我们继续验证从两个适配器接收到的数据。

使用SQL语言的所有功能,对帐(或接收差异报告)发生在数据库的服务器端。

SQL查询非常简单-它只是一个表联接,通过事务ID将数据从适配器传递到自身:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

输出为报告:


检查上面图片中的所有内容是否正确。 我们记得从数据库表中删除了9917,并更改了10022行。 报告中共显示19939行。

汇总表


仅保留将在各个方面(通过哈希)匹配的不同适配器中的“干净”事务插入存储表中。 此过程由以下SQL查询执行:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

我们用作适配器的中间数据存储的临时表可以删除。


结论


在完成工作的过程中,开发了一种用于协调来自各种来源的数据的系统:文本文件和数据库中的表格。 最少使用其他工具。

也许老练的读者可能会注意到,使用诸如Apache Spark之类的框架,再加上将源数据转换为镶木地板格式,可以大大加快此过程,特别是对于大批量交易而言。 但是这项工作的主要目标是用裸Python编写系统并研究多处理数据处理。 我认为我们已经处理了什么。

整个项目的源代码位于我在GitHub上的存储库中 ,建议您熟悉一下它。

我很乐意回答所有问题并结识您的意见。

祝你成功!

Source: https://habr.com/ru/post/zh-CN480076/


All Articles