Características de Q y KDB + en el ejemplo de un servicio en tiempo real

Qué es KDB +, el lenguaje de programación Q, cuáles son sus fortalezas y debilidades, se pueden encontrar en mi artículo anterior y brevemente en la introducción. En el artículo, implementamos un servicio en Q que procesará el flujo de datos entrantes y calculará por minuto varias funciones de agregación en el modo "en tiempo real" (es decir, logrará contar todo hasta el siguiente dato). La característica principal de Q es que es un lenguaje vectorial que le permite operar no con objetos individuales, sino con sus matrices, matrices de matrices y otros objetos complejos. Idiomas como Q y sus K, J, APL relacionados son famosos por su brevedad. A menudo, un programa que abarca varias pantallas de código en un lenguaje familiar como Java puede escribirse en ellas en varias líneas. Esto es exactamente lo que quiero demostrar en este artículo.



Introduccion


KDB + es una base de datos de columnas enfocada en grandes volúmenes de datos, ordenados de cierta manera (principalmente por tiempo). Se utiliza, en primer lugar, en organizaciones financieras: bancos, fondos de inversión, compañías de seguros. El lenguaje Q es un lenguaje interno de KDB + que le permite trabajar eficazmente con estos datos. La ideología de Q es brevedad y eficiencia, mientras que se sacrifica la claridad. Esto se justifica por el hecho de que, en cualquier caso, el lenguaje vectorial será difícil de percibir, y la brevedad y riqueza de la grabación le permite ver una parte mucho más grande del programa en una pantalla, lo que en última instancia facilita su comprensión.

En este artículo, estamos implementando un programa Q completo y es posible que desee probarlo. Para hacer esto, necesitará la Q real. Puede descargar la versión gratuita de 32 bits en el sitio web de la compañía kx: www.kx.com . En el mismo lugar, si está interesado, encontrará información de referencia sobre Q, el libro Q For Mortals y varios artículos sobre este tema.

Declaración del problema.


Hay una fuente que envía una tabla de datos cada 25 milisegundos. Dado que KDB + se usa principalmente en finanzas, suponemos que se trata de una tabla de operaciones en la que existen las siguientes columnas: tiempo (tiempo en milisegundos), sym (nombre de la empresa en el intercambio - IBM , AAPL , ...), precio (precio por el cual se compraron las acciones), tamaño (tamaño de la transacción). Se elige arbitrariamente un intervalo de 25 milisegundos, no es demasiado pequeño ni demasiado grande. Su presencia significa que los datos que llegan al servicio ya están almacenados en el búfer. Sería fácil implementar el almacenamiento en búfer en el lado del servicio, incluido el almacenamiento en búfer dinámico, dependiendo de la carga actual, pero por simplicidad nos detenemos en un intervalo fijo.

El servicio debe considerar por minuto para cada carácter entrante de la columna sym un conjunto de funciones de agregación: precio máximo, precio promedio, tamaño de suma, etc. información útil Para simplificar, asumimos que todas las funciones se pueden calcular de forma incremental, es decir Para obtener un nuevo valor, es suficiente conocer dos números: el antiguo y el valor entrante. Por ejemplo, las funciones max, average, sum tienen esta propiedad, pero la función mediana no.

También asumimos que el flujo de datos entrantes está ordenado por tiempo. Esto nos dará la oportunidad de trabajar solo con el último minuto. En la práctica, es suficiente para poder trabajar con los minutos actuales y anteriores en caso de que las actualizaciones lleguen tarde. Por simplicidad, no consideraremos este caso.

Funciones agregadas


A continuación se enumeran las funciones agregadas requeridas. Los tomé lo más posible para aumentar la carga en el servicio:

  • alto - precio máximo - precio máximo por minuto.
  • bajo - precio mínimo - el precio mínimo por minuto.
  • firstPrice - primer precio - el primer precio por minuto.
  • lastPrice - last price - el último precio por minuto.
  • firstSize - primer tamaño - el primer tamaño de oferta en un minuto.
  • lastSize - last size - el último tamaño de oferta en un minuto.
  • numTrades - cuenta i - el número de transacciones por minuto.
  • volumen - tamaño de suma - la suma de los tamaños de transacción por minuto.
  • pvolume - precio de suma - la suma de precios por minuto, necesaria para avgPrice.
  • volumen de negocios - precio total * tamaño - volumen total de transacciones por minuto.
  • avgPrice - pvolume% numTrades - precio promedio por minuto.
  • avgSize - volumen% numTrades - tamaño promedio de oferta por minuto.
  • vwap - volumen de volumen de negocios% - el precio promedio por minuto ponderado por el tamaño de la transacción.
  • cumVolume - volumen de suma - tamaño de transacción acumulado para todo el tiempo.

Discuta inmediatamente un punto no obvio: cómo inicializar estas columnas por primera vez y para cada minuto siguiente. Algunas columnas del tipo firstPrice deben inicializarse con nulo cada vez, su valor no está definido. Otros tipos de volumen siempre deben establecerse en 0. Todavía hay columnas que requieren un enfoque combinado; por ejemplo, cumVolume debe copiarse del minuto anterior y para el primer conjunto en 0. Configuraremos todos estos parámetros usando el diccionario de tipos de datos (análogo del registro):

// list ! list –  , 0n – float null, 0N – long null, `sym –  , `sym1`sym2 –   initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0); aggCols:reverse key[initWith] except `sym`time; //    , reverse   

Agregué sym y time al diccionario por conveniencia, ahora initWith es una línea terminada de la tabla agregada final, donde queda establecer el sym y el tiempo correctos. Puede usarlo para agregar nuevas filas a la tabla.

aggCols que necesitamos al crear una función agregada. La lista debe invertirse debido a las peculiaridades del orden en que se calculan las expresiones en Q (de derecha a izquierda). El objetivo es proporcionar un cálculo en la dirección de alto a cumVolume, ya que algunas columnas dependen de las anteriores.

Columnas que se copiarán a un nuevo minuto del anterior, columna de sym agregada para mayor comodidad:

 rollColumns:`sym`cumVolume; 

Ahora dividimos las columnas en grupos según cómo deben actualizarse. Se pueden distinguir tres tipos:

  1. Baterías (volumen, rotación, ...): debemos agregar el valor de entrada al anterior.
  2. Con un punto especial (alto, bajo, ..): el primer valor en un minuto se toma de los datos de entrada, el resto se cuenta utilizando la función.
  3. El resto Siempre se cuenta con una función.

Defina variables para estas clases:

 accumulatorCols:`numTrades`volume`pvolume`turnover; specialCols:`high`low`firstPrice`firstSize; 

Orden de cálculo


Actualizaremos la tabla agregada en dos etapas. Para mayor eficiencia, primero reduciremos la tabla entrante para que quede una fila para cada carácter y minuto. El hecho de que todas nuestras funciones sean incrementales y asociativas nos garantiza que el resultado de este paso adicional no cambiará. Puede exprimir la tabla usando select:

 select high:max price, low:min price … by sym,time.minute from table 

Este método tiene un signo negativo: el conjunto de columnas calculadas está predefinido. Afortunadamente, en Q, la selección también se implementa como una función donde puede sustituir argumentos creados dinámicamente:

 ?[table;whereClause;byClause;selectClause] 

¡No describiré en detalle el formato de los argumentos, en nuestro caso solo las expresiones by y select no son triviales y deberían ser diccionarios de las columnas de formulario! Expresiones. Por lo tanto, la función de constricción se puede definir como sigue:

 selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each   map  Q    preprocess:?[;();`sym`time!`sym`time.minute;selExpression]; 

Para mayor claridad, utilicé la función de análisis, que convierte una cadena con una expresión Q en un valor que se puede pasar a la función eval y que se requiere en la selección de funciones. También tenga en cuenta que el preproceso se define como una proyección (es decir, una función con argumentos parcialmente definidos) de la función de selección, falta un argumento (tabla). Si aplicamos el preproceso a una tabla, obtenemos una tabla reducida.

La segunda etapa es actualizar la tabla agregada. Primero, escribimos el algoritmo en pseudocódigo:

 for each sym in inputTable idx: row index in agg table for sym+currentTime; aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high]; aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume]; … 

En Q, en lugar de bucles, se acostumbra usar las funciones map / reduce. Pero dado que Q es un lenguaje vectorial y todas las operaciones que podemos aplicar de manera segura a todos los símbolos a la vez, entonces, como primera aproximación, podemos prescindir de un ciclo, haciendo operaciones con todos los símbolos a la vez:

 idx:calcIdx inputTable; row:aggTable idx; aggTable[idx;`high]: row[`high] | inputTable`high; aggTable[idx;`volume]: row[`volume] + inputTable`volume; … 

Pero podemos ir más allá, en Q hay un operador único y extremadamente poderoso: el operador de asignación generalizada. Le permite cambiar el conjunto de valores en una estructura de datos compleja utilizando una lista de índices, funciones y argumentos. En nuestro caso, se ve así:

 idx:calcIdx inputTable; rows:aggTable idx; // .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument],     –   .[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)]; 

Desafortunadamente, para asignar a una tabla necesita una lista de filas, no columnas, y debe transponer la matriz (lista de columnas en una lista de filas) utilizando la función de volteo. Para una tabla grande, esto es innecesario, por lo que aplicamos la asignación generalizada a cada columna por separado, utilizando la función de mapa (que parece un apóstrofe):

 .[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)]; 

Nuevamente usamos la función de proyección. También tenga en cuenta que en Q, crear una lista también es una función y podemos llamarla usando la función each (map) para obtener una lista de listas.

Para que el conjunto de columnas calculadas no sea fijo, cree la expresión anterior dinámicamente. Primero, definimos las funciones para calcular cada columna, usando las variables fila e inp para hacer referencia a datos agregados y de entrada:

 aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume! ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume"); 

Algunas columnas son especiales; su primer valor no debe ser calculado por una función. Podemos determinar que es el primero en la fila de la columna [`numTrades]; si tiene 0, entonces el valor es el primero. Q tiene una función de selección -? [Lista booleana; lista1; lista2] - que selecciona un valor de la lista 1 o 2 dependiendo de la condición en el primer argumento:

 // high -> ?[isFirst;inp`high;row[`high]|inp`high] // @ -         @[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols]; 

Aquí llamé una asignación genérica con mi función (expresión entre llaves). Se le pasa el valor actual (el primer argumento) y un argumento adicional, que paso en el cuarto parámetro.

Por separado, agregamos altavoces de batería, porque para ellos la función es la misma:

 // volume -> row[`volume]+inp`volume aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols; 

Esta es una asignación habitual según los estándares de Q, solo asigno una lista de valores a la vez. Finalmente, cree la función principal:

 // ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high"    ,          // string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high"   . ,'   map[concat] // ";" sv exprs – String from Vector (sv),     “;”  updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; 

Con esta expresión, creo dinámicamente una función a partir de una cadena que contiene la expresión que cité anteriormente. El resultado se verá así:

 {[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]} 

El orden de cálculo de las columnas se invierte, ya que en Q el orden de cálculo es de derecha a izquierda.

Ahora tenemos dos funciones principales necesarias para los cálculos, queda agregar una pequeña infraestructura y el servicio está listo.

Pasos finales


Tenemos funciones de preproceso y actualizaciónAgg que hacen todo el trabajo. Pero aún es necesario garantizar la transición correcta en minutos y calcular los índices de agregación. Primero definimos la función init:

 init:{ tradeAgg:: 0#enlist[initWith]; //    , enlist    ,  0#   0    currTime::00:00; //   0, :: ,      currSyms::`u#`symbol$(); // `u# -    ,     offset::0; //   tradeAgg,     rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; //     roll ,    sym } 

También definimos la función de desplazamiento, que cambiará el minuto actual:

 roll:{[tm] if[currTime>tm; :init[]]; //    ,    init rollCache,::offset _ rollColumns#tradeAgg; //   –  roll   aggTable, ,   rollCache offset::count tradeAgg; currSyms::`u#`$(); } 

Necesitamos una función para agregar nuevos personajes:

 addSyms:{[syms] currSyms,::syms; //     //    sym, time  rollColumns   . //  ^      roll ,     . value flip table     . `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)]; } 

Y finalmente, la función upd (el nombre tradicional de esta función para los servicios Q), que el cliente llama, para agregar datos:

 upd:{[tblName;data] // tblName   ,       tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time updMinute[data] each tm; //      }; updMinute:{[data;tm] if[tm<>currTime; roll tm; currTime::tm]; //  ,   data:select from data where time=tm; //  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; //   updateAgg[`tradeAgg;offset+currSyms?syms;data]; //   .  ?        . }; 

Eso es todo Aquí está el código completo de nuestro servicio, según lo prometido, solo unas pocas líneas:

 initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0); aggCols:reverse key[initWith] except `sym`time; rollColumns:`sym`cumVolume; accumulatorCols:`numTrades`volume`pvolume`turnover; specialCols:`high`low`firstPrice`firstSize; selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); preprocess:?[;();`sym`time!`sym`time.minute;selExpression]; aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume"); @[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols]; aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols; updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / ' init:{ tradeAgg::0#enlist[initWith]; currTime::00:00; currSyms::`u#`symbol$(); offset::0; rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; }; roll:{[tm] if[currTime>tm; :init[]]; rollCache,::offset _ rollColumns#tradeAgg; offset::count tradeAgg; currSyms::`u#`$(); }; addSyms:{[syms] currSyms,::syms; `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)]; }; upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data}; updMinute:{[data;tm] if[tm<>currTime; roll tm; currTime::tm]; data:select from data where time=tm; if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; updateAgg[`tradeAgg;offset+currSyms?syms;data]; }; 

Prueba


Verifique el desempeño del servicio. Para hacer esto, ejecútelo en un proceso separado (coloque el código en el archivo service.q) y llame a la función init:

 q service.q –p 5566 q)init[] 

En otra consola, inicie el segundo proceso Q y conéctese al primero:

 h:hopen `:host:5566 h:hopen 5566 //      

Primero, cree una lista de caracteres: 10,000 piezas y agregue una función para crear una tabla aleatoria. En la segunda consola:

 syms:`IBM`AAPL`GOOG,-9997?`8 rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)} 

Agregué tres caracteres reales a la lista de caracteres para que sea más conveniente buscarlos en la tabla. La función rnd crea una tabla aleatoria con n filas, donde el tiempo varía de t a t + 25 milisegundos.

Ahora puede intentar enviar datos al servicio (agregue las primeras diez horas):

 {h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10 

Puede verificar en el servicio que la tabla ha sido actualizada:

 \c 25 200 select from tradeAgg where sym=`AAPL -20#select from tradeAgg where sym=`AAPL 

Resultado:

 sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume --|--|--|--|--|-------------------------------- AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888 AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895 AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909 AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915 AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919 

Ahora realizaremos pruebas de carga para averiguar cuántos datos puede procesar el servicio por minuto. Permítame recordarle que establecemos el intervalo para las actualizaciones en 25 milisegundos. En consecuencia, un servicio debería (en promedio) caber en al menos 20 milisegundos por actualización para dar a los usuarios tiempo para solicitar datos. Ingrese lo siguiente en el segundo proceso:

 tm:10:00:00.000 stressTest:{[n] 1 string[tm]," "; times,::h ({st:.zT; upd[`trade;x]; .zT-st};rnd[n;tm]); tm+:25} start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)} 

4800 son dos minutos. Puede intentar comenzar primero por 1000 líneas cada 25 milisegundos:

 start 1000 

En mi caso, el resultado es alrededor de un par de milisegundos por actualización. Así que aumentaré inmediatamente el número de filas a 10.000:

 start 10000 

Resultado:

 min| 00:00:00.004 avg| 9.191458 med| 9f max| 00:00:00.030 

De nuevo, nada especial, pero esto es 24 millones de líneas por minuto, 400 mil por segundo. Durante más de 25 milisegundos, la actualización se ralentizó solo 5 veces, aparentemente al cambiar el minuto. Incremento a 100,000:

 start 100000 

Resultado:

 min| 00:00:00.013 avg| 25.11083 med| 24f max| 00:00:00.108 q)sum times 00:02:00.532 

Como puede ver, el servicio apenas se las arregla, pero sin embargo logra mantenerse a flote. Esta cantidad de datos (240 millones de líneas por minuto) es extremadamente grande, en tales casos se acostumbra ejecutar varios clones (o incluso docenas de clones) del servicio, cada uno de los cuales procesa solo una parte de los caracteres. Sin embargo, el resultado es impresionante para el lenguaje interpretado, que se centra principalmente en el almacenamiento de datos.

Puede surgir la pregunta de por qué el tiempo crece de forma no lineal con el tamaño de cada actualización. La razón es que la función de compresión es en realidad una función C que funciona de manera mucho más eficiente que updateAgg. Comenzando con algún tamaño de actualización (alrededor de 10.000), updateAgg alcanza su límite y luego su tiempo de ejecución no depende del tamaño de la actualización. Debido al paso preliminar Q, el servicio puede digerir dichos volúmenes de datos. Esto enfatiza cuán importante es cuando se trabaja con big data para elegir el algoritmo correcto. Otro punto es el almacenamiento correcto de datos en la memoria. Si los datos no se almacenaron en columnas o no se ordenaron por tiempo, entonces nos familiarizaríamos con la falta de caché TLB: la ausencia de una dirección de página de memoria en el caché de direcciones del procesador. Encontrar la dirección tarda aproximadamente 30 veces más en caso de falla y en el caso de datos dispersos puede ralentizar el servicio varias veces.

Conclusión


En este artículo, mostré que las bases de datos KDB + y Q son adecuadas no solo para almacenar grandes datos y un fácil acceso a ellas mediante select, sino también para crear servicios de procesamiento de datos que pueden digerir cientos de millones de filas / gigabytes de datos incluso en un solo proceso Q . El lenguaje Q en sí mismo permite implementar algoritmos extremadamente breves y eficientes relacionados con el procesamiento de datos debido a su naturaleza vectorial, un intérprete incorporado del dialecto SQL y un conjunto muy exitoso de funciones de biblioteca.

Notaré que lo anterior es solo una parte de las capacidades de Q, tiene otras características únicas. Por ejemplo, un protocolo IPC extremadamente simple que borra la frontera entre procesos Q separados y le permite combinar cientos de estos procesos en una sola red, que puede ubicarse en docenas de servidores en diferentes partes del mundo.

Source: https://habr.com/ru/post/470596/


All Articles