实时服务示例中的Q和KDB +功能

Q编程语言KDB +是什么,它们的优缺点是什么,可以在我的上一篇文章和简介中找到。 在本文中,我们在Q上实现了一项服务,该服务将处理传入的数据流,并以“实时”模式每分钟计算各种聚合函数(也就是说,它将设法对所有数据进行计数,直到下一个数据为止)。 Q的主要特征是它是一种向量语言,它使您不仅可以处理单个对象,还可以处理它们的数组,数组数组和其他复杂对象。 Q及其相关的K,J,APL等语言以其简洁而闻名。 通常,可以用几行代码以熟悉的语言(例如Java)来跨越多个屏幕的程序。 这正是我要在本文中演示的内容。



引言


KDB +是一个列数据库,专注于以某种方式(主要是按时间)排序的大量数据。 首先,它用于金融组织-银行,投资基金,保险公司。 Q语言是KDB +的一种内部语言,可让您有效地使用此数据。 Q的意识形态是简洁和高效,而牺牲了清晰度。 事实证明,在任何情况下矢量语言都很难理解,并且录音的简洁性和丰富性使您可以在一个屏幕上看到程序的更大部分,从而最终有助于其理解。

在本文中,我们正在实现一个完善的Q程序,您可能想尝试一下。 为此,您需要Q本身,您可以在kx公司网站www.kx.com上下载免费的32位版本。 如果您有兴趣,可以在同一地方找到有关Q的参考信息,《 为真人而生的书》以及有关此主题的各种文章。

问题陈述


有一个源每25毫秒发送一次数据表。 由于KDB +主要用于财务,因此我们假设这是一个交易表,其中包含以下列:时间(以毫秒为单位的时间),sym(交易所上的公司名称-IBMAAPL ,...),价格(价格)购买股票的方式),大小(交易大小)。 可以任意选择25毫秒的间隔,该间隔不能太小也不能太大。 它的存在意味着到达服务的数据已经被缓冲。 根据当前负载,在服务端实现缓冲(包括动态缓冲)会很容易,但是为简单起见,我们将停留在固定的时间间隔上。

服务应针对符号列中的每个传入字符每分钟考虑一组汇总功能-最高价格,平均价格,总和等。 有用的信息。 为简单起见,我们假设所有函数都可以增量计算,即 要获得一个新值,就足以知道两个数字-旧值和传入值。 例如,函数max,average,sum具有此属性,但中位数函数没有此属性。

我们还假定传入数据流按时间排序。 这将使我们有机会仅在最后一分钟工作。 在实践中,只要有任何最新更新就可以使用当前和之前的分钟就足够了。 为简单起见,我们将不考虑这种情况。

汇总功能


下面列出了必需的聚合函数。 我尽可能多地使用它们来增加服务负载:

  • 高-最高价格-每分钟最高价格。
  • 低-最低价格-每分钟的最低价格。
  • firstPrice-第一价格-每分钟的第一价格。
  • lastPrice-最终价格-每分钟的最后价格。
  • firstSize-第一大小-一分钟内的第一笔交易大小。
  • lastSize-上一交易量-一分钟内的最后交易量。
  • numTrades-计数i-每分钟的交易数。
  • 数量-总和-每分钟的交易总和。
  • pvolume-总价-avgPrice必需的每分钟的总价。
  • 周转率-总价*大小-每分钟的总交易量。
  • avgPrice-pvolume%numTrades-每分钟的平均价格。
  • avgSize-交易量%numTrades-每分钟的平均交易量。
  • vwap-营业额%交易量-每分钟的平均价格,按交易金额加权。
  • cumVolume-总和-整个时间的累计交易量。

立即讨论一个不明显的问题-如何首次以及每隔一分钟初始化这些列。 每次都会将firstPrice类型的某些列初始化为null,但其值未定义。 其他类型的音量必须始终设置为0。仍然有一些列需要组合方法-例如,必须从前一分钟复制cumVolume,并且将第一个设置为0。我们将使用数据类型字典(记录类似)设置所有这些参数:

// list ! list –  , 0n – float null, 0N – long null, `sym –  , `sym1`sym2 –   initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0); aggCols:reverse key[initWith] except `sym`time; //    , reverse   

为了方便起见,我在字典中添加了符号和时间,现在initWith是最后一个汇总表中的最后一行,保留它来设置正确的符号和时间。 您可以使用它向表中添加新行。

创建聚合函数时需要的aggCols。 由于在Q中计算表达式的顺序的特殊性(从右到左),因此必须反转列表。 目标是提供从高到cumVolume的方向的计算,因为某些列依赖于先前的列。

要复制到上一分钟的新分钟的列,为方便起见添加了sym列:

 rollColumns:`sym`cumVolume; 

现在,我们根据应如何更新将列分为几组。 可以区分三种类型:

  1. 电池(体积,周转量,..)-我们必须将输入值添加到上一个。
  2. 对于特殊点(高,低,..)-一分钟内从输入数据中获取第一个值,其余使用该函数进行计数。
  3. 其余的。 始终使用功能进行计数。

为这些类定义变量:

 accumulatorCols:`numTrades`volume`pvolume`turnover; specialCols:`high`low`firstPrice`firstSize; 

计算顺序


我们将分两个阶段更新汇总表。 为了提高效率,我们将首先收缩传入表,以便每个字符和分钟剩余一行。 我们所有功能都是递增的和关联的,这一事实向我们保证,此附加步骤的结果不会改变。 您可以使用select挤压表格:

 select high:max price, low:min price … by sym,time.minute from table 

该方法的缺点是-计算列的集合是预定义的。 幸运的是,在Q中,选择还作为一个函数实现,您可以在其中替换动态创建的参数:

 ?[table;whereClause;byClause;selectClause] 

我不会详细描述参数的格式,在我们的情况下,仅by和select表达式是不平凡的,它们应该是表单列的字典! 因此,压缩函数可以定义如下:

 selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each   map  Q    preprocess:?[;();`sym`time!`sym`time.minute;selExpression]; 

为了清楚起见,我使用了parse函数,该函数将带有Q表达式的字符串转换为可以传递给eval函数的值,并且在select函数中是必需的。 还要注意,预处理被定义为选择函数的投影(即具有部分定义的参数的函数),缺少一个参数(表)。 如果将预处理应用于表,则会得到缩小的表。

第二阶段是更新汇总表。 首先,我们用伪代码编写算法:

 for each sym in inputTable idx: row index in agg table for sym+currentTime; aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high]; aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume]; … 

在Q中,习惯使用map / reduce函数而不是循环。 但是由于Q是向量语言,并且所有运算都可以一次安全地应用于所有符号,因此作为第一近似,我们可以完全不使用循环来进行一次运算,一次执行所有符号:

 idx:calcIdx inputTable; row:aggTable idx; aggTable[idx;`high]: row[`high] | inputTable`high; aggTable[idx;`volume]: row[`volume] + inputTable`volume; … 

但是我们可以走得更远,在Q中,有一个独特而功能强大的运算符-广义赋值运算符。 它允许您使用索引,函数和参数的列表来更改复杂数据结构中的值集。 在我们的例子中,它看起来像这样:

 idx:calcIdx inputTable; rows:aggTable idx; // .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument],     –   .[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)]; 

不幸的是,要分配给表,您需要一个行而不是列的列表,并且必须使用翻转功能将矩阵(列的列表转换为行的列表)进行转置。 对于大表,这是不必要的,因此,我们可以使用map函数(看起来像撇号)将广义分配分别应用于每列:

 .[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)]; 

我们再次使用投影功能。 还要注意,在Q中,创建列表也是一个函数,我们可以使用each(map)函数来调用它以获取列表列表。

为了使计算列的集合不固定,请动态创建上面的表达式。 首先,我们定义用于计算每一列的函数,使用row和inp变量引用汇总和输入数据:

 aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume! ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume"); 

一些列是特殊的;它们的第一个值不应由函数计算。 我们可以确定它是列行[`numTrades]中的第一个-如果它为0,则该值为第一个。 Q具有选择功能-?[布尔列表;列表1;列表2]-根据第一个参数的条件从列表1或2中选择一个值:

 // high -> ?[isFirst;inp`high;row[`high]|inp`high] // @ -         @[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols]; 

在这里,我用函数(用花括号表示)调用了通用分配。 我在第4个参数中传递的当前值(第一个参数)和一个附加参数将传递给它。

另外,我们添加了电池扬声器,因为它们的功能相同:

 // volume -> row[`volume]+inp`volume aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols; 

这是按Q标准进行的通常分配,我只一次分配一个值列表。 最后,创建主要功能:

 // ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high"    ,          // string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high"   . ,'   map[concat] // ";" sv exprs – String from Vector (sv),     “;”  updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; 

使用此表达式,我可以从包含上面引用的表达式的字符串中动态创建一个函数。 结果将如下所示:

 {[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]} 

列的计算顺序是相反的,因为在Q中,计算顺序是从右到左。

现在,我们有两个计算所需的主要功能,剩下的就是添加一些基础架构并且服务已经准备就绪。

最后步骤


我们具有preprocess和updateAgg函数,可以完成所有工作。 但是仍然有必要确保在数分钟内正确过渡并计算聚合索引。 首先,我们定义init函数:

 init:{ tradeAgg:: 0#enlist[initWith]; //    , enlist    ,  0#   0    currTime::00:00; //   0, :: ,      currSyms::`u#`symbol$(); // `u# -    ,     offset::0; //   tradeAgg,     rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; //     roll ,    sym } 

我们还定义了滚动功能,它将更改当前分钟:

 roll:{[tm] if[currTime>tm; :init[]]; //    ,    init rollCache,::offset _ rollColumns#tradeAgg; //   –  roll   aggTable, ,   rollCache offset::count tradeAgg; currSyms::`u#`$(); } 

我们需要一个函数来添加新字符:

 addSyms:{[syms] currSyms,::syms; //     //    sym, time  rollColumns   . //  ^      roll ,     . value flip table     . `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)]; } 

最后,由客户端调用的upd函数(此函数在Q服务中的传统名称)添加数据:

 upd:{[tblName;data] // tblName   ,       tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time updMinute[data] each tm; //      }; updMinute:{[data;tm] if[tm<>currTime; roll tm; currTime::tm]; //  ,   data:select from data where time=tm; //  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; //   updateAgg[`tradeAgg;offset+currSyms?syms;data]; //   .  ?        . }; 

仅此而已。 这是我们所承诺的服务的完整代码,仅几行:

 initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0); aggCols:reverse key[initWith] except `sym`time; rollColumns:`sym`cumVolume; accumulatorCols:`numTrades`volume`pvolume`turnover; specialCols:`high`low`firstPrice`firstSize; selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); preprocess:?[;();`sym`time!`sym`time.minute;selExpression]; aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume"); @[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols]; aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols; updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / ' init:{ tradeAgg::0#enlist[initWith]; currTime::00:00; currSyms::`u#`symbol$(); offset::0; rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; }; roll:{[tm] if[currTime>tm; :init[]]; rollCache,::offset _ rollColumns#tradeAgg; offset::count tradeAgg; currSyms::`u#`$(); }; addSyms:{[syms] currSyms,::syms; `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)]; }; upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data}; updMinute:{[data;tm] if[tm<>currTime; roll tm; currTime::tm]; data:select from data where time=tm; if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; updateAgg[`tradeAgg;offset+currSyms?syms;data]; }; 

测试中


检查服务性能。 为此,请在一个单独的进程中运行它(将代码放入service.q文件中)并调用init函数:

 q service.q –p 5566 q)init[] 

在另一个控制台中,启动第二个Q进程并连接到第一个:

 h:hopen `:host:5566 h:hopen 5566 //      

首先,创建一个字符列表-10,000个,并添加一个函数以创建随机表。 在第二个控制台中:

 syms:`IBM`AAPL`GOOG,-9997?`8 rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)} 

我在字符列表中添加了三个实字符,以使在表中查找它们更加方便。 rnd函数创建一个具有n行的随机表,其中时间从t到t + 25毫秒不等。

现在,您可以尝试将数据发送到服务(添加前十个小时):

 {h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10 

您可以在服务中检查表是否已更新:

 \c 25 200 select from tradeAgg where sym=`AAPL -20#select from tradeAgg where sym=`AAPL 

结果:

 sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume --|--|--|--|--|-------------------------------- AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888 AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895 AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909 AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915 AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919 

现在,我们将进行负载测试,以找出服务每分钟可以处理多少数据。 让我提醒您,我们将更新间隔设置为25毫秒。 因此,每次更新服务(平均)应至少适合20毫秒,以使用户有时间请求数据。 在第二个过程中输入以下内容:

 tm:10:00:00.000 stressTest:{[n] 1 string[tm]," "; times,::h ({st:.zT; upd[`trade;x]; .zT-st};rnd[n;tm]); tm+:25} start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)} 

4800是两分钟。 您可以尝试每25毫秒首先开始1000行:

 start 1000 

就我而言,每次更新的结果大约是几毫秒。 因此,我将立即将行数增加到10.000:

 start 10000 

结果:

 min| 00:00:00.004 avg| 9.191458 med| 9f max| 00:00:00.030 

同样,没什么特别的,但这是每分钟2400万行,每秒40万行。 在25分钟以上的时间内,更新速度仅降低了5倍,这显然是在更改分钟数时进行的。 增加到100,000:

 start 100000 

结果:

 min| 00:00:00.013 avg| 25.11083 med| 24f max| 00:00:00.108 q)sum times 00:02:00.532 

如您所见,该服务勉强可以应付,但仍然可以维持运转。 这种数据量(每分钟2.4亿行)非常大,在这种情况下,通常会运行服务的多个克隆(甚至数十个克隆),每个克隆仅处理部分字符。 但是,对于解释语言来说,其结果令人印象深刻,该语言主要集中在数据存储上。

可能会出现一个问题,为什么时间会随着每次更新的大小非线性增长。 原因是压缩函数实际上是C函数,其工作效率比updateAgg高得多。 从某个更新大小(大约10.000)开始,updateAgg达到上限,然后其执行时间不取决于更新的大小。 由于预备步骤Q,该服务能够消化此类数据量。 这强调了在处理大数据时选择正确算法的重要性。 另一点是在内存中正确存储数据。 如果数据未存储在列中或未按时间排序,那么我们将熟悉诸如TLB高速缓存未命中-处理器地址高速缓存中不存在内存页面地址的情况。 在出现故障的情况下,查找该地址大约需要30倍的时间,在数据分散的情况下,查找速度会降低数倍。

结论


在本文中,我展示了KDB +和Q数据库不仅适合存储大数据并可以通过选择轻松访问它们,而且还适用于创建即使在一个Q过程中也可以消化数亿行/千兆字节数据的数据处理服务。 。 Q语言本身具有矢量性质,SQL方言的内置解释器以及非常成功的库函数集,因此它可以非常简短有效地实现与数据处理有关的算法。

我将注意到,以上只是Q功能的一部分,它还具有其他独特功能。 例如,极其简单的IPC协议消除了各个Q进程之间的边界,使您可以将数百个这些进程组合到一个网络中,该网络可以位于世界各地的数十台服务器上。

Source: https://habr.com/ru/post/zh-CN470596/


All Articles