Recursos do Q e KDB + no exemplo de um serviço em tempo real

O que é o KDB +, a linguagem de programação Q, quais são seus pontos fortes e fracos, podem ser encontrados no meu artigo anterior e brevemente na introdução. No artigo, implementamos um serviço em Q que processará o fluxo de dados recebidos e calculará por minuto várias funções agregadas no modo "tempo real" (ou seja, haverá tempo para calcular tudo até a próxima parte dos dados). A principal característica do Q é que é uma linguagem vetorial que permite operar não com objetos únicos, mas com suas matrizes, matrizes de matrizes e outros objetos complexos. Idiomas como Q e seus relacionados K, J, APL são famosos por sua brevidade. Geralmente, um programa que abrange várias telas de código em uma linguagem familiar como Java pode ser escrito nelas em várias linhas. É exatamente isso que quero demonstrar neste artigo.



1. Introdução


O KDB + é um banco de dados de coluna focado em volumes muito grandes de dados, classificados de uma certa maneira (principalmente por tempo). É usado, antes de tudo, em organizações financeiras - bancos, fundos de investimento, companhias de seguros. O idioma Q é um idioma interno do KDB + que permite trabalhar efetivamente com esses dados. A ideologia de Q é concisão e eficiência, enquanto a clareza é sacrificada. Isso é justificado pelo fato de que, em qualquer caso, a linguagem do vetor será difícil de perceber, e a brevidade e a riqueza da gravação permitem que você veja uma parte muito maior do programa em uma tela, o que facilita a compreensão.

Neste artigo, estamos implementando um programa Q completo e você pode experimentá-lo. Para fazer isso, você precisará do Q. atual. É possível fazer o download da versão gratuita de 32 bits no site da empresa kx - www.kx.com . Lá, se você estiver interessado, encontrará informações de referência sobre Q, o livro Q For Mortals e vários artigos sobre esse tópico.

Declaração do problema


Há uma fonte que envia uma tabela de dados a cada 25 milissegundos. Como o KDB + é usado principalmente em finanças, assumimos que esta é uma tabela de negociações na qual existem as seguintes colunas: tempo (tempo em milissegundos), sym (nome da empresa na bolsa - IBM , AAPL , ...), preço (preço pelo qual as ações foram compradas), tamanho (tamanho da transação). Um intervalo de 25 milissegundos é escolhido arbitrariamente, não é muito pequeno nem muito grande. Sua presença significa que os dados que chegam ao serviço já estão armazenados em buffer. Seria fácil implementar o buffer no lado do serviço, incluindo buffer dinâmico, dependendo da carga atual, mas, por simplicidade, permanecemos em um intervalo fixo.

O serviço deve considerar por minuto, para cada caractere recebido da coluna sym, um conjunto de funções agregadas - preço máximo, preço médio, tamanho da soma etc. informação útil. Para simplificar, assumimos que todas as funções podem ser calculadas incrementalmente, ou seja, para obter um novo valor, basta conhecer dois números - o valor antigo e o valor recebido. Por exemplo, as funções max, average, sum têm essa propriedade, mas a função mediana não.

Também assumimos que o fluxo de dados recebidos é ordenado por tempo. Isso nos dará a oportunidade de trabalhar apenas com o último minuto. Na prática, basta trabalhar com os minutos atuais e anteriores, caso haja atraso nas atualizações. Por simplicidade, não consideraremos esse caso.

Funções agregadas


Listadas abaixo estão as funções agregadas necessárias. Levei o máximo possível para aumentar a carga no serviço:

  • alto - preço máximo - preço máximo por minuto.
  • baixo - preço mínimo - o preço mínimo por minuto.
  • firstPrice - primeiro preço - o primeiro preço por minuto.
  • lastPrice - last price - o último preço por minuto.
  • firstSize - primeiro tamanho - o primeiro tamanho da transação em um minuto.
  • lastSize - last size - o último tamanho da oferta em um minuto.
  • numTrades - count i - o número de transações por minuto.
  • volume - tamanho da soma - a soma dos tamanhos de transação por minuto.
  • pvolume - sum price - a soma dos preços por minuto, necessária para avgPrice.
  • volume de negócios - preço soma * tamanho - volume total de transações por minuto.
  • avgPrice - pvolume% numTrades - preço médio por minuto.
  • avgSize - volume% numTrades - tamanho médio da negociação por minuto.
  • vwap - volume% de rotatividade - o preço médio por minuto ponderado pelo tamanho da transação.
  • cumVolume - volume da soma - tamanho da transação acumulada durante todo o tempo.

Discuta imediatamente um ponto não óbvio - como inicializar essas colunas pela primeira vez e a cada minuto seguinte. Algumas colunas do tipo firstPrice precisam ser inicializadas com nulo sempre que seu valor não é definido. Outros tipos de volume sempre devem ser definidos como 0. Ainda existem colunas que exigem uma abordagem combinada - por exemplo, cumVolume deve ser copiado do minuto anterior e, para o primeiro conjunto, como 0. Definiremos todos esses parâmetros usando o dicionário de tipos de dados (análogo do registro):

// list ! list –  , 0n – float null, 0N – long null, `sym –  , `sym1`sym2 –   initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0); aggCols:reverse key[initWith] except `sym`time; //    , reverse   

Adicionei sym e time ao dicionário por conveniência, agora initWith é uma linha final da tabela agregada final, onde resta definir o sym e time corretos. Você pode usá-lo para adicionar novas linhas à tabela.

aggCols que precisamos ao criar uma função agregada. A lista precisa ser invertida devido às peculiaridades da ordem em que as expressões são calculadas em Q (da direita para a esquerda). O objetivo é fornecer computação na direção de alto para cumVolume, pois algumas colunas dependem das anteriores.

Colunas a serem copiadas para um novo minuto do anterior, coluna sym adicionada por conveniência:

 rollColumns:`sym`cumVolume; 

Agora, dividimos as colunas em grupos de acordo com a forma como elas devem ser atualizadas. Três tipos podem ser distinguidos:

  1. Baterias (volume, rotatividade, etc.) - devemos adicionar o valor de entrada ao anterior.
  2. Com um ponto especial (alto, baixo, ..) - o primeiro valor em um minuto é obtido dos dados de entrada, o restante é contado usando a função.
  3. O resto. Sempre contado usando uma função.

Defina variáveis ​​para estas classes:

 accumulatorCols:`numTrades`volume`pvolume`turnover; specialCols:`high`low`firstPrice`firstSize; 

Ordem de cálculo


Atualizaremos a tabela agregada em dois estágios. Por questões de eficiência, primeiro reduziremos a tabela de entrada para que haja uma linha restante para cada caractere e minuto. O fato de todas as nossas funções serem incrementais e associativas nos garante que o resultado dessa etapa adicional não será alterado. Você pode espremer a tabela usando o select:

 select high:max price, low:min price … by sym,time.minute from table 

Este método possui um sinal de menos - o conjunto de colunas calculadas é predefinido. Felizmente, em Q, a seleção também é implementada como uma função em que você pode substituir argumentos criados dinamicamente:

 ?[table;whereClause;byClause;selectClause] 

Não descreverei detalhadamente o formato dos argumentos; no nosso caso, apenas as expressões e select não são triviais e devem ser dicionários das colunas do formulário! Expressions. Assim, a função restritiva pode ser definida da seguinte maneira:

 selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each   map  Q    preprocess:?[;();`sym`time!`sym`time.minute;selExpression]; 

Para maior clareza, usei a função de análise, que transforma uma string com uma expressão Q em um valor que pode ser passado para a função eval e que é necessário na seleção da função. Observe também que o pré-processo é definido como uma projeção (ou seja, uma função com argumentos parcialmente definidos) da função de seleção, um argumento (tabela) está ausente. Se aplicarmos o pré-processo a uma tabela, obteremos uma tabela reduzida.

O segundo estágio é atualizar a tabela agregada. Primeiro, escrevemos o algoritmo no pseudocódigo:

 for each sym in inputTable idx: row index in agg table for sym+currentTime; aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high]; aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume]; … 

Em Q, em vez de loops, é habitual usar funções de mapa / redução. Mas como Q é uma linguagem vetorial e todas as operações, podemos aplicar com segurança a todos os símbolos de uma só vez; então, como primeira aproximação, podemos fazer sem um ciclo, realizando operações com todos os símbolos de uma só vez:

 idx:calcIdx inputTable; row:aggTable idx; aggTable[idx;`high]: row[`high] | inputTable`high; aggTable[idx;`volume]: row[`volume] + inputTable`volume; … 

Mas podemos ir além: em Q, existe um operador único e extremamente poderoso - o operador de atribuição generalizada. Permite alterar o conjunto de valores em uma estrutura de dados complexa usando uma lista de índices, funções e argumentos. No nosso caso, fica assim:

 idx:calcIdx inputTable; rows:aggTable idx; // .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument],     –   .[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)]; 

Infelizmente, para atribuir a uma tabela, você precisa de uma lista de linhas, não de colunas, e precisa transpor a matriz (lista de colunas para uma lista de linhas) usando a função flip. Para uma tabela grande, isso é desnecessário; portanto, em vez disso, aplicamos a atribuição generalizada a cada coluna separadamente, usando a função map (que se parece com um apóstrofo):

 .[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)]; 

Novamente usamos a função de projeção. Observe também que em Q, criar uma lista também é uma função e podemos chamá-la usando a função each (map) para obter uma lista de listas.

Para que o conjunto de colunas calculadas não seja fixo, crie a expressão acima dinamicamente. Primeiro, definimos as funções para calcular cada coluna, usando as variáveis ​​row e inp para referenciar dados agregados e de entrada:

 aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume! ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume"); 

Algumas colunas são especiais; seu primeiro valor não deve ser calculado por uma função. Podemos determinar que é o primeiro na linha da coluna [`numTrades] - se tiver 0, o valor será o primeiro. Q possui uma função de seleção -? [Lista booleana; lista1; lista2] - que seleciona um valor da lista 1 ou 2, dependendo da condição no primeiro argumento:

 // high -> ?[isFirst;inp`high;row[`high]|inp`high] // @ -         @[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols]; 

Aqui chamei uma atribuição genérica com a minha função (expressão entre chaves). O valor atual (o primeiro argumento) e um argumento adicional, que passo no 4º parâmetro, são passados ​​para ele.

Separadamente, adicionamos alto-falantes da bateria, porque para eles a função é a mesma:

 // volume -> row[`volume]+inp`volume aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols; 

Essa é uma tarefa usual pelos padrões de Q, só atribuo uma lista de valores de uma só vez. Por fim, crie a função principal:

 // ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high"    ,          // string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high"   . ,'   map[concat] // ";" sv exprs – String from Vector (sv),     “;”  updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; 

Com essa expressão, crio dinamicamente uma função a partir de uma sequência que contém a expressão que citei acima. O resultado será semelhante a este:

 {[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]} 

A ordem de cálculo das colunas é invertida, pois em Q a ordem de cálculo é da direita para a esquerda.

Agora, temos duas funções principais necessárias para os cálculos, resta adicionar um pouco de infraestrutura e o serviço está pronto.

Etapas finais


Temos funções de pré-processo e updateAgg que fazem todo o trabalho. Mas ainda é necessário garantir a transição correta em minutos e calcular os índices de agregação. Primeiro, definimos a função init:

 init:{ tradeAgg:: 0#enlist[initWith]; //    , enlist    ,  0#   0    currTime::00:00; //   0, :: ,      currSyms::`u#`symbol$(); // `u# -    ,     offset::0; //   tradeAgg,     rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; //     roll ,    sym } 

Também definimos a função roll, que mudará o minuto atual:

 roll:{[tm] if[currTime>tm; :init[]]; //    ,    init rollCache,::offset _ rollColumns#tradeAgg; //   –  roll   aggTable, ,   rollCache offset::count tradeAgg; currSyms::`u#`$(); } 

Precisamos de uma função para adicionar novos caracteres:

 addSyms:{[syms] currSyms,::syms; //     //    sym, time  rollColumns   . //  ^      roll ,     . value flip table     . `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)]; } 

E, finalmente, a função upd (o nome tradicional dessa função para os serviços Q), chamada pelo cliente, para adicionar dados:

 upd:{[tblName;data] // tblName   ,       tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time updMinute[data] each tm; //      }; updMinute:{[data;tm] if[tm<>currTime; roll tm; currTime::tm]; //  ,   data:select from data where time=tm; //  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; //   updateAgg[`tradeAgg;offset+currSyms?syms;data]; //   .  ?        . }; 

Isso é tudo. Aqui está o código completo do nosso serviço, conforme prometido, apenas algumas linhas:

 initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0); aggCols:reverse key[initWith] except `sym`time; rollColumns:`sym`cumVolume; accumulatorCols:`numTrades`volume`pvolume`turnover; specialCols:`high`low`firstPrice`firstSize; selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); preprocess:?[;();`sym`time!`sym`time.minute;selExpression]; aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume"); @[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols]; aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols; updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / ' init:{ tradeAgg::0#enlist[initWith]; currTime::00:00; currSyms::`u#`symbol$(); offset::0; rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; }; roll:{[tm] if[currTime>tm; :init[]]; rollCache,::offset _ rollColumns#tradeAgg; offset::count tradeAgg; currSyms::`u#`$(); }; addSyms:{[syms] currSyms,::syms; `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)]; }; upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data}; updMinute:{[data;tm] if[tm<>currTime; roll tm; currTime::tm]; data:select from data where time=tm; if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; updateAgg[`tradeAgg;offset+currSyms?syms;data]; }; 

Teste


Verifique o desempenho do serviço. Para fazer isso, execute-o em um processo separado (coloque o código no arquivo service.q) e chame a função init:

 q service.q –p 5566 q)init[] 

Em outro console, inicie o segundo processo Q e conecte-se ao primeiro:

 h:hopen `:host:5566 h:hopen 5566 //      

Primeiro, crie uma lista de caracteres - 10.000 peças e adicione uma função para criar uma tabela aleatória. No segundo console:

 syms:`IBM`AAPL`GOOG,-9997?`8 rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)} 

Adicionei três caracteres reais à lista de caracteres para torná-lo mais conveniente para procurá-los na tabela. A função rnd cria uma tabela aleatória com n linhas, onde o tempo varia de t a t + 25 milissegundos.

Agora você pode tentar enviar dados para o serviço (adicione as primeiras dez horas):

 {h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10 

Você pode verificar no serviço se a tabela foi atualizada:

 \c 25 200 select from tradeAgg where sym=`AAPL -20#select from tradeAgg where sym=`AAPL 

Resultado:

 sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume --|--|--|--|--|-------------------------------- AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888 AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895 AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909 AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915 AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919 

Agora, realizaremos testes de carga para descobrir quantos dados o serviço pode processar por minuto. Deixe-me lembrá-lo de que definimos o intervalo de atualizações para 25 milissegundos. Assim, um serviço deve (em média) caber em pelo menos 20 milissegundos por atualização para dar aos usuários tempo para solicitar dados. Digite o seguinte no segundo processo:

 tm:10:00:00.000 stressTest:{[n] 1 string[tm]," "; times,::h ({st:.zT; upd[`trade;x]; .zT-st};rnd[n;tm]); tm+:25} start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)} 

4800 é de dois minutos. Você pode tentar começar primeiro por 1000 linhas a cada 25 milissegundos:

 start 1000 

No meu caso, o resultado é de alguns milissegundos por atualização. Então, aumentarei imediatamente o número de linhas para 10.000:

 start 10000 

Resultado:

 min| 00:00:00.004 avg| 9.191458 med| 9f max| 00:00:00.030 

Novamente, nada de especial, mas são 24 milhões de linhas por minuto, 400 mil por segundo. Por mais de 25 milissegundos, a atualização diminuiu apenas 5 vezes, aparentemente ao alterar o minuto. Aumente para 100.000:

 start 100000 

Resultado:

 min| 00:00:00.013 avg| 25.11083 med| 24f max| 00:00:00.108 q)sum times 00:02:00.532 

Como você pode ver, o serviço mal lida, mas mesmo assim consegue se manter à tona. Essa quantidade de dados (240 milhões de linhas por minuto) é extremamente grande; nesses casos, é habitual executar vários clones (ou mesmo dezenas de clones) do serviço, cada um dos quais processa apenas parte dos caracteres. No entanto, o resultado é impressionante para a linguagem interpretada, focada principalmente no armazenamento de dados.

A questão pode surgir: por que o tempo cresce de maneira não linear com o tamanho de cada atualização? O motivo é que a função de compressão é realmente uma função C que funciona muito mais eficientemente que o updateAgg. Começando com algum tamanho de atualização (cerca de 10.000), o updateAgg atinge seu limite máximo e, em seguida, seu tempo de execução não depende do tamanho da atualização. É devido à etapa preliminar Q que o serviço é capaz de digerir esses volumes de dados. Isso enfatiza a importância de trabalhar com big data para escolher o algoritmo certo. Outro ponto é o armazenamento correto de dados na memória. Se os dados não estivessem armazenados em colunas ou não fossem ordenados por tempo, nos familiarizaríamos com falta de cache TLB - a ausência de um endereço de página de memória no cache de endereço do processador. A localização do endereço leva cerca de 30 vezes mais em caso de falha e, no caso de dados dispersos, pode retardar o serviço várias vezes.

Conclusão


Neste artigo, mostrei que os bancos de dados KDB + e Q são adequados não apenas para armazenar big data e fácil acesso a eles via select, mas também para criar serviços de processamento de dados que podem digerir centenas de milhões de linhas / gigabytes de dados, mesmo em um único processo Q . A própria linguagem Q permite a implementação rápida e eficiente de algoritmos relacionados ao processamento de dados devido à sua natureza vetorial, o intérprete interno do dialeto SQL e um conjunto muito bem-sucedido de funções da biblioteca.

Vou notar que o exposto acima é apenas parte dos recursos do Q, possui outros recursos exclusivos. Por exemplo, um protocolo IPC extremamente simples que apaga a fronteira entre processos Q individuais e permite combinar centenas desses processos em uma única rede, que pode estar localizada em dezenas de servidores em diferentes partes do mundo.

Source: https://habr.com/ru/post/pt470596/


All Articles