Análise do produto ClickHouse



Ao desenvolver qualquer produto, seja um serviço de vídeo ou uma fita, histórias ou artigos, desejo medir a "felicidade" condicional do usuário. Para entender se estamos fazendo nossas alterações melhores ou piores, para ajustar a direção do desenvolvimento do produto, confiando não na intuição e em nossos próprios sentimentos, mas nas métricas e números em que você pode acreditar.

Neste artigo, mostrarei como conseguimos lançar estatísticas e análises de produtos em um serviço com um público de 97 milhões de meses, enquanto recebíamos consultas analíticas de desempenho extremamente alto. Falaremos sobre o ClickHouse, os mecanismos usados ​​e os recursos das consultas. Descreverei uma abordagem para agregação de dados, que nos permite obter métricas complexas em uma fração de segundo e falar sobre conversão e teste de dados.

Agora, temos cerca de 6 bilhões de eventos alimentares por dia; em um futuro próximo, chegaremos a 20-25 bilhões. E então - não em um ritmo tão rápido, subiremos para 40-50 bilhões até o final do ano, quando descrevermos todos os eventos alimentares que nos interessam.

1 linhas em conjunto. Decorrido: 0,287 seg. Processado 59,85 bilhões de linhas, 59,85 GB (208,16 bilhões de linhas / s., 208,16 GB / s.)

Detalhes sob o corte.

Prefácio


As ferramentas analíticas foram VKontakte antes. Usuários únicos foram considerados, foi possível criar agendas de eventos por fatias e, assim, cair nas profundezas do serviço. No entanto, tratava-se de fatias fixas antecipadamente, de dados agregados, de HLL para os únicos, de alguma rigidez e incapacidade de responder rapidamente a perguntas um pouco mais complicadas do que "quanto?"

É claro que havia, existe e será hadoop, também foi escrito, escrito e será escrito muito, muitos logs de uso de serviços. Infelizmente, o hdfs foi usado apenas por algumas equipes para implementar suas próprias tarefas. Mais triste ainda, o hdfs não se refere a consultas analíticas rápidas: havia perguntas para muitos campos, cujas respostas precisavam ser encontradas no código e não na documentação acessível a todos.

Chegamos à conclusão de que não é mais possível viver assim. Cada equipe deve ter dados, as consultas sobre eles devem ser rápidas e os dados em si devem ser precisos e ricos em parâmetros úteis.

Portanto, formulamos requisitos claros para o novo sistema de estatística / análise:

  • as consultas analíticas devem ser rápidas;
  • os dados são bastante precisos; idealmente, esses são eventos brutos de interação do usuário com o serviço;
  • a estrutura dos eventos deve ser descrita, compreendida e acessível;
  • armazenamento de dados confiável, garantia de entrega única;
  • é possível contar os únicos, o público (diário, semanal, mensal), métricas de retenção, tempo gasto pelo usuário no serviço, ações quantificadas em métricas únicas e outras pelo conjunto de fatias;
  • testes, conversão e visualização de dados estão em andamento.

Na cozinha


A experiência sugeriu que precisávamos de dois bancos de dados: um lento, onde agregaríamos e enriqueceríamos os dados, e um rápido, onde poderíamos trabalhar com esses dados e criar gráficos sobre eles. Essa é uma das abordagens mais comuns, nas quais, em um banco de dados lento, por exemplo, em hdfs, são projetadas diferentes projeções - únicas e no número de eventos por fatias por um determinado período de tempo.

Em um dia quente de setembro, enquanto conversávamos sobre uma xícara de chá na cozinha com vista para a Catedral de Kazan, tivemos a ideia de experimentar o ClickHouse como uma base rápida - naquela época, já o usamos para armazenar registros técnicos. Havia muitas dúvidas relacionadas principalmente à velocidade e à confiabilidade: os testes de desempenho declarados pareciam irreais e as novas versões do banco de dados interrompiam periodicamente a funcionalidade existente. Portanto, a proposta era simples - para tentar.

Primeiras amostras


Implementamos um cluster de duas máquinas com esta configuração:
2xE5-2620 v4 (32 núcleos no total), RAM de 256G, locais de 28T (raid10 com ext4).

Inicialmente, estava perto do layout, mas depois mudamos para longe. O ClickHouse possui muitos mecanismos de tabela diferentes, mas os principais são da família MergeTree. Escolhemos ReplicatedReplacingMergeTree com aproximadamente as seguintes configurações:

PARTITION BY dt ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id) SAMPLE BY cityHash64(user_id) SETTINGS index_granularity = 8192; 

Replicado - significa que a tabela é replicada e isso resolve um dos nossos requisitos de confiabilidade.

Substituindo - a tabela suporta desduplicação pela chave primária: por padrão, a chave primária corresponde à chave de classificação; portanto, a seção ORDER BY apenas informa qual é a chave primária.

AMOSTRA POR - Eu também queria experimentar a amostragem: a amostra retorna uma amostra uniformemente pseudo-aleatória.

index_granularity = 8192 é o número mágico de linhas de dados entre serifs de índice (sim, é escasso), que é usado por padrão. Nós não mudamos isso.

O particionamento foi realizado por dia (embora por padrão - por mês). Muitos pedidos de dados deveriam ser intradiários - por exemplo, crie um gráfico minucioso de visualizações de vídeo para um determinado dia.

Em seguida, pegamos um pedaço de registros técnicos e enchemos a mesa com cerca de um bilhão de linhas. Excelente compactação, agrupando por tipo de coluna Int *, contando valores únicos - tudo funcionou incrivelmente rápido!

Falando em velocidade, quero dizer que nem uma única solicitação durou mais de 500 ms e a maioria se encaixava em 50 a 100 ms. E isso ocorre em duas máquinas - e, de fato, apenas uma estava envolvida nos cálculos.

Analisamos tudo isso e imaginamos que, em vez da coluna UInt8, haverá um ID do país e a coluna Int8 será substituída por dados, por exemplo, sobre a idade do usuário. E eles perceberam que o ClickHouse é completamente adequado para nós, se tudo for feito corretamente.

Digitação de dados forte


O benefício do ClickHouse começa exatamente quando o esquema de dados correto é formado. Exemplo: plataforma String - ruim, plataforma Int8 + dicionário - bom, LowCardinality (String) - conveniente e bom (falarei sobre LowCardinality um pouco mais tarde).

Criamos uma classe de gerador especial em php, que, mediante solicitação, cria classes de wrapper sobre eventos com base em tabelas no ClickHouse e um único ponto de entrada para o log. Vou explicar o exemplo do esquema que acabou:

  1. O analista / engenheiro de dados / desenvolvedor descreve a documentação: quais campos, possíveis valores e eventos precisam ser registrados.
  2. Uma tabela é criada no ClickHouse de acordo com a estrutura de dados do parágrafo anterior.
  3. As classes de quebra automática para eventos baseados em uma tabela são geradas.
  4. A equipe do produto implementa o preenchimento dos campos de um objeto desta classe, enviando.

Alterar o esquema no nível php e o tipo de dados registrados não funcionará sem primeiro alterar a tabela no ClickHouse. E isso, por sua vez, não pode ser feito sem coordenação com a equipe, alterações na documentação e descrição dos eventos.

Para cada evento, você pode definir duas configurações que controlam a porcentagem de eventos enviados ao ClickHouse e ao hadoop, respectivamente. As configurações são necessárias principalmente para rolagem gradual, com a capacidade de reduzir o registro, se algo der errado. Antes do hadoop, os dados são entregues de maneira padrão usando o Kafka. E no ClickHouse, eles passam por um esquema com o KittenHouse no modo persistente, o que garante pelo menos uma entrega de evento único.

O evento é entregue à tabela de buffer no shard desejado, com base no restante da divisão de algum hash de user_id pelo número de shards no cluster. Em seguida, a tabela de buffer libera os dados para o ReplicatedReplacingMergeTree local. E, além das tabelas locais, uma tabela distribuída é puxada com o mecanismo Distributed, que permite acessar dados de todos os shards.

Desnormalização


ClickHouse é um DBMS colunar. Não se trata de formulários normais, o que significa que é melhor ter todas as informações corretas no evento do que participar. Também há Join, mas se a tabela correta não couber na memória, a dor começa. Portanto, tomamos uma decisão obstinada: todas as informações em que estamos interessados ​​devem ser armazenadas no próprio evento. Por exemplo, sexo, idade do usuário, país, cidade, data de nascimento - todas essas informações públicas podem ser úteis para análise de público-alvo, bem como todas as informações úteis sobre o objeto de interação. Se, por exemplo, estamos falando de vídeo, é video_id, video_owner_id, data de upload do vídeo, duração, qualidade no momento do evento, qualidade máxima e assim por diante.

No total, em cada tabela, temos de 50 a 200 colunas, enquanto em todas as tabelas existem campos de serviço. Por exemplo, o log de erros é error_log - na verdade, chamamos um erro fora do intervalo do tipo. Caso valores estranhos ultrapassem o tamanho do tipo no campo com a idade.

Tipo Baixa Cardinalidade (T)


ClickHouse tem a capacidade de usar dicionários externos. Eles são armazenados na memória, atualizados periodicamente, podem ser efetivamente usados ​​em vários cenários, inclusive como livros de referência clássicos. Por exemplo, você deseja registrar o sistema operacional e tem duas alternativas: uma sequência ou um número + um diretório. Obviamente, em grandes quantidades de dados e para consultas analíticas de alto desempenho, é lógico escrever um número e obter uma representação de string do dicionário quando você precisar:

 dictGetString('os', 'os_name', toUInt64(os_id)) 

Mas há uma maneira muito mais conveniente - usar o tipo LowCardinality (String), que cria automaticamente um dicionário. O desempenho com LowCardinality sob a condição de baixa cardinalidade do conjunto de valores é radicalmente mais alto que com String.

Por exemplo, usamos LowCardinality (String) para os tipos de evento 'play', 'pause', 'rewind'. Ou para a plataforma: 'web', 'android', 'iphone':

 SELECT vk_platform, count() FROM t WHERE dt = yesterday() GROUP BY vk_platform Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.) 

O recurso ainda é experimental, portanto, para usá-lo, você deve executar:

 SET allow_experimental_low_cardinality_type = 1; 

Mas há um sentimento de que, depois de algum tempo, ela não estará mais no cenário.

Agregação de dados VKontakte


Como existem muitas colunas e muitos eventos, o desejo natural é cortar as partições “antigas”, mas primeiro - montar as unidades. Ocasionalmente, é necessário analisar eventos brutos (um mês ou um ano atrás), para não cortar os dados em hdfs - qualquer analista pode entrar em contato com o parquet desejado para qualquer data.

Como regra, ao agregar em um intervalo de tempo, sempre nos baseamos no fato de que o número de linhas por unidade de tempo é igual ao produto da potência de corte. Isso impõe restrições: os países começam a se reunir em grupos como 'Rússia', 'Ásia', 'Europa', 'O resto do mundo' e idades - em intervalos, para reduzir a dimensão a um milhão de linhas condicionais por data.

Agregação por dt, user_id


Mas temos um ClickHouse reativo! Podemos acelerar para 50 a 100 milhões de linhas em uma data?
Testes rápidos mostraram que podemos e, nesse momento, surgiu uma idéia simples - deixar o usuário na máquina. Nomeadamente, agregar não por "data, fatias" usando ferramentas spark, mas por "data, usuário" significa ClickHouse, enquanto realiza alguma "transposição" de dados.

Com essa abordagem, armazenamos os usuários em dados agregados, o que significa que ainda podemos considerar indicadores de público, métricas de retenção e frequência. Podemos conectar unidades, contando o público comum de vários serviços até todo o público VKontakte. Tudo isso pode ser feito por qualquer fatia presente na tabela pelo mesmo tempo, condicionalmente.

Ilustrarei com um exemplo:



Após a agregação (muitas mais colunas à direita):



Nesse caso, a agregação ocorre precisamente por (dt, user_id). Para campos com informações do usuário, com essa agregação, você pode usar as funções any, anyHeavy (seleciona um valor que ocorre com frequência). Você pode, por exemplo, coletar qualquer Heavy (plataforma) em um agregado para saber qual plataforma o usuário está usando na maior parte dos eventos de vídeo. Se desejar, você pode usar groupUniqArray (plataforma) e armazenar uma matriz de todas as plataformas das quais o usuário gerou o evento. Se isso não for suficiente, você poderá criar colunas separadas para a plataforma e armazenar, por exemplo, o número de vídeos exclusivos exibidos pela metade a partir de uma plataforma específica:

 uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android 

Com essa abordagem, é obtido um agregado bastante amplo, no qual cada linha é um usuário único e cada coluna contém informações sobre o usuário ou sobre sua interação com o serviço.

Acontece que, para calcular a DAU de um serviço, basta executar essa solicitação além de seu agregado:

 SELECT dt, count() as DAU FROM agg GROUP BY dt Elapsed: 0.078 sec. 

Ou calcule quantos dias os usuários estavam no serviço da semana:

 SELECT days_in_service, count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 2.922 sec. 

Podemos acelerar por amostragem, enquanto quase sem perder a precisão:

 SELECT days_in_service, 10 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 10 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 0.454 sec. 

Deve-se notar imediatamente que a amostragem não é pela porcentagem de eventos, mas pela porcentagem de usuários - e, como resultado, ela se torna uma ferramenta incrivelmente poderosa.

Ou o mesmo por 4 semanas com amostragem de 1/100 - são obtidos resultados cerca de 1% menos precisos.

 SELECT days_in_service, 100 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 100 WHERE dt > (yesterday() - 28) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 28 rows in set. Elapsed: 0.287 sec. 

Agregação, por outro lado


Ao agregar por (dt, user_id), não perdemos o usuário, não perdemos informações sobre a interação dele com o serviço, mas, é claro, perdemos as métricas sobre um objeto de interação específico. Mas você também não pode perder isso - vamos construir a unidade
(dt, video_owner_id, video_id), aderindo às mesmas ideias. Mantemos as informações sobre o vídeo o máximo possível, não perdemos os dados sobre a interação do vídeo com o usuário e perdemos completamente as informações sobre o usuário específico.

 SELECT starts FROM agg3 WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...) 1 rows in set. Elapsed: 0.030 sec 

Ou as 10 principais visualizações de vídeo ontem:

 SELECT video_id, video_owner_id, watches FROM video_agg_video_d1 WHERE dt = yesterday() ORDER BY watches DESC LIMIT 10 10 rows in set. Elapsed: 0.035 sec. 

Como resultado, temos um esquema de agregados no formato:

  • agregação por "data, usuário" no produto;
  • agregação por "data, objeto de interação" no produto;
  • outras vezes surgem outras projeções.

Azkaban e TeamCity


Finalmente, algumas palavras sobre a infraestrutura. Nossa coleção agregada começa à noite, começando com OPTIMIZE em cada uma das tabelas com dados brutos para acionar uma mesclagem extraordinária de dados no ReplicatedReplacingMergeTree. A operação pode durar o suficiente, no entanto, é necessário remover as tomadas, se elas ocorrerem. Vale a pena notar que até agora nunca encontrei duplicatas, mas não há garantias de que elas não aparecerão no futuro.

O próximo passo é a criação de agregados. Estes são scripts bash nos quais ocorre o seguinte:

  • primeiro obtemos o número de shards e alguns hosts do shard:

     SELECT shard_num, any(host_name) AS host FROM system.clusters GROUP BY shard_num 
  • o script executa sequencialmente para cada shard (clickhouse-client -h $ host) uma solicitação do formulário (para agregados de usuários):

     INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num 

Isso não é totalmente ideal e pode gerar muita interação de rede entre hosts. No entanto, ao adicionar novos shards, tudo continua funcionando, a localidade dos dados para as unidades é mantida, por isso decidimos não nos preocupar muito com isso.

Temos Azkaban como agendador de tarefas. Eu não diria que essa é uma ferramenta super conveniente, mas lida com sua tarefa perfeitamente, inclusive quando se trata de construir pipelines um pouco mais complexos e quando um script precisa esperar que vários outros sejam concluídos.

O tempo total gasto na conversão dos eventos agora existentes em agregados é de 15 minutos.

Teste


Todas as manhãs realizamos testes automatizados que respondem a perguntas sobre dados brutos, bem como à prontidão e qualidade dos agregados: “Verifique se ontem houve menos de meio por cento menos dados ou dados exclusivos sobre dados brutos ou agregados comparado ao mesmo dia da semana passada. "

Tecnologicamente, esses são testes de unidade comuns usando JUnit e implementando o driver jdbc para ClickHouse. A execução de todos os testes é iniciada no TeamCity e leva cerca de 30 segundos em um thread. Em caso de falhas, recebemos notificações do VKontakte do nosso maravilhoso bot do TeamCity.

Conclusão


Use apenas versões estáveis ​​do ClickHouse e seu cabelo ficará macio e sedoso. Vale acrescentar que o ClickHouse não fica mais lento .

Source: https://habr.com/ru/post/pt445284/


All Articles