如何在numpy中快速,快速地从PostgreSQL和Python中的ClickHouse

打了很多圈,寻找解决方案,以快速获取大量Python中资产的长期价格历史。 我也有勇气想以numpy数组处理价格,但立即在熊猫中处理更好。

使用标准方法处理前额,结果令人失望,这导致对数据库的查询执行了30秒或更长时间。 不想忍受,我找到了一些完全令我满意的解决方案。

腿从Python的对象本质中成长而来。 毕竟,即使整数也是对象,这对工作速度也有极大的负面影响。 我绝对不想更改语言。

第一个解决方案是对PostgreSQL的价格历史进行分组,这导致了数据库方面的性能显着下降,但是它使任务加速了大约3倍。 该方法在另一篇文章中有更详细的描述。

结果是了解到,在Python中,您需要以某种方式将整个数据集(至少是一个字符串)集成在一起。 并通过numpy-array或立即以熊猫进行解析。

最终结果:

图片

PostgreSQL的额头解决方案


我们在sql查询中进行数据分组。 一个例子:

SELECT string_agg(symbol::text, ',') AS symbol_list , string_agg(dt::text, ',') AS dt_list , string_agg(open::text, ',') AS open_list , string_agg(high::text, ',') AS high_list , string_agg(low::text, ',') AS low_list , string_agg("close"::text, ',') AS close_list , string_agg(volume::text, ',') AS volume_list , string_agg(adj::text, ',') AS adj_list FROM v_prices_fast WHERE symbol IN ('{symbols}') 

解析数据很容易:

 { 'symbol': np.array(r[0].split(',')), # str 'dt': np.array(r[1].split(','), dtype='datetime64'), # str w/type 'open': np.fromstring(r[2], sep=','), # numbers # ... } 

约170万行的生产力:

 %timeit get_prices_fast(is_adj=False) # 11.9s 

现成的Python包


Python对其面临类似问题的社区有益。 以下内容适合我们的目的:

  • odo-用于优化从一个源到另一个源的数据传输速度。 完全使用Python。 它通过SQLAlchemy与PostgreSQL交互。
  • warp_prism -Quantopian项目用于从PostgreSQL检索数据的C扩展名。 基础是odo的功能。

这两个软件包都使用PostgreSQL将数据复制到CSV的功能:

 COPY {query} TO :path WITH ( FORMAT CSV, HEADER :header, DELIMITER :delimiter, QUOTE :quotechar, NULL :na_value, ESCAPE :escapechar, ENCODING :encoding ) 

输出将解析为pandas.DataFrame()或numpy.ndarray()。

由于warp_prism用C编写,因此在解析数据方面具有显着优势。 但与此同时,它也有一个很大的缺点-对数据类型的支持有限。 也就是说,它解析int,float,date和str,但不解析数字。 Odo没有这样的限制。

为了使用,有必要使用sqlalchemy包描述表结构和查询:

 tbl_prices = sa.Table( 'prices', metadata, sa.Column('symbol', sa.String(16)), sa.Column('dt', sa.Date), sa.Column('open', sa.FLOAT), sa.Column('high', sa.FLOAT), sa.Column('low', sa.FLOAT), sa.Column('close', sa.FLOAT), sa.Column('volume', sa.BIGINT), sa.Column('adj', sa.FLOAT), ) query = sa.select(tbl_prices.c).where( tbl_prices.c.symbol.in_(SYMBOLS) ).order_by('symbol', 'dt') 

速度测试:

 %timeit odo(query, pd.DataFrame, bind=engine) # 13.8s %timeit warp_prism.to_dataframe(query, bind=engine) # 8.4s %timeit warp_prism.to_arrays(query, bind=engine) # 8.0s 

warp_prism.to_arrays()-准备一个带有numpy数组的python字典。

ClickHouse可以做什么?


PostgreSQL对每个人都有好处,除了对存储大小的需求以及对大型表配置分片的需求外。 ClickHouse本身可进行分片,紧凑地存储所有内容,并以闪电般的速度工作。 例如,在ClickHouse中大小为〜5Gb的PostgreSQL表可容纳〜1Gb。 在另一篇文章中介绍了使用ClickHouse存储价格。

令我烦恼的是,尽管sqlalchemy具有clickhouse扩展名,但odo并没有帮助。 控制台中对clickhouse速度的记忆使我想到了通过创建单独的进程来访问数据库的想法。 我知道这很长而且很耗资源,但是结果令人称赞。

 sql = 'SELECT days.symbol, days.date, days.open/10000, days.high/10000, days.low/10000, days.close/10000, days.volume FROM days ' \ 'WHERE days.symbol IN (\'{0}\') ORDER BY days.symbol, days.date;'.format("','".join(SYMBOLS)) cmd = 'clickhouse-client --query="{0}"'.format(sql) def ch_pandas(cmd): p = subprocess.Popen([cmd], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) return pd.io.parsers.read_csv(p.stdout, sep="\t", names=['symbol', 'date', 'open', 'high', 'low', 'close', 'volume']) 

结果:

 %timeit ch_pandas(cmd) # 1.6s 

ClickHouse HTTP端口请求


直接访问数据库响应的端口8123时,结果恶化了一点:

 import urllib %timeit pd.io.parsers.read_csv('http://localhost:8123/?{0}'.format(urllib.parse.urlencode({'query': sql})), sep="\t", names=['symbol', 'date', 'open', 'high', 'low', 'close', 'volume']) # 1.9s 

但不是没有美中不足的地方。

与ClickHouse一起飞行


该数据库在大型样本上给人留下了深刻的印象,但对于较小的结果却令人失望。 比odo还差20倍。 但是,这是启动过程或通过HTTP访问时使用额外工具包的成本。

结果:

图片

结论


通过本文,加速Python与数据库之间的交互的追求已经结束。 对于具有标准字段且需要通用访问价格的PostgreSQL,最好的方法是使用Quantopian的warp_prism软件包。 如果您需要存储大量的历史记录以及对大量行的大量请求,那么ClickHouse是理想的选择。

Source: https://habr.com/ru/post/zh-CN416681/


All Articles