Uma tradução do artigo foi preparada especificamente para os alunos do curso de Engenharia de Dados .
ClickHouse é um banco de dados de coluna de código aberto. Este é um ótimo ambiente em que centenas de analistas podem solicitar rapidamente dados detalhados, mesmo quando dezenas de bilhões de novas entradas estão sendo introduzidas por dia. O custo da infraestrutura para suportar esse sistema pode chegar a 100 mil dólares por ano e, potencialmente, a metade, dependendo do uso. Em algum momento, a instalação do Yandex.Metrica ClickHouse continha 10 trilhões de entradas. Além da Yandex, a ClickHouse também obteve sucesso com a Bloomberg e a Cloudflare.
Dois anos atrás, fiz uma
análise comparativa de bancos de dados usando uma única máquina, e ele se tornou
o software de banco de dados gratuito
mais rápido que eu já vi. Desde então, os desenvolvedores não pararam de adicionar recursos, incluindo suporte para compactação Kafka, HDFS e ZStandard. No ano passado, eles adicionaram suporte para métodos de compactação em cascata e
a codificação
delta-delta tornou
- se possível. Ao compactar dados de séries temporais, os valores dos medidores podem ser bem compactados usando a codificação delta, mas será melhor usar a codificação delta-delta para contadores. Uma boa compactação se tornou a chave para o desempenho do ClickHouse.
O ClickHouse consiste em 170 mil linhas de código C ++, com exceção de bibliotecas de terceiros, e é um dos menores bancos de dados de código para bancos de dados distribuídos. Para comparação, o SQLite não suporta distribuição e consiste em 235 mil linhas de código na linguagem C. No momento em que este artigo foi escrito, 207 engenheiros haviam contribuído para o ClickHouse, e a intensidade das confirmações aumentou recentemente.
Em março de 2017, a ClickHouse iniciou um
log de alterações como uma maneira fácil de acompanhar o desenvolvimento. Eles também dividem o arquivo de documentação monolítica em uma hierarquia de arquivos baseada em Markdown. Problemas e recursos são rastreados pelo GitHub e, em geral, esse software se tornou muito mais acessível nos últimos anos.
Neste artigo, examinarei o desempenho do cluster ClickHouse no AWS EC2 usando processadores de 36 núcleos e uma unidade NVMe.
ATUALIZAÇÃO: Uma semana após a publicação inicial deste post, refiz o teste com uma configuração aprimorada e obtive resultados muito melhores. Esta postagem foi atualizada para refletir essas alterações.
Iniciando um cluster do AWS EC2
Usarei três instâncias do c5d.9xlarge EC2 para esta postagem. Cada um deles contém 36 CPUs virtuais, 72 GB de RAM, 900 GB de NVMe SSD e suporta uma rede de 10 gigabit. Eles custam US $ 1.962 / hora cada na região eu-oeste-1 quando lançados sob demanda. Vou usar o Ubuntu Server 16.04 LTS como sistema operacional.
O firewall está configurado para que cada máquina possa se comunicar sem restrições e apenas meu endereço IPv4 está na lista de permissões no cluster SSH.
NVMe pronto para usar
Para que o ClickHouse funcione, criarei um sistema de arquivos EXT4 em cada servidor na unidade NVMe.
$ sudo mkfs -t ext4 /dev/nvme1n1 $ sudo mkdir /ch $ sudo mount /dev/nvme1n1 /ch
Depois que tudo estiver configurado, você poderá ver o ponto de montagem e 783 GB de espaço disponível em cada um dos sistemas.
$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT loop0 7:0 0 87.9M 1 loop /snap/core/5742 loop1 7:1 0 16.5M 1 loop /snap/amazon-ssm-agent/784 nvme0n1 259:1 0 8G 0 disk └─nvme0n1p1 259:2 0 8G 0 part / nvme1n1 259:0 0 838.2G 0 disk /ch
$ df -h
Filesystem Size Used Avail Use% Mounted on udev 35G 0 35G 0% /dev tmpfs 6.9G 8.8M 6.9G 1% /run /dev/nvme0n1p1 7.7G 967M 6.8G 13% / tmpfs 35G 0 35G 0% /dev/shm tmpfs 5.0M 0 5.0M 0% /run/lock tmpfs 35G 0 35G 0% /sys/fs/cgroup /dev/loop0 88M 88M 0 100% /snap/core/5742 /dev/loop1 17M 17M 0 100% /snap/amazon-ssm-agent/784 tmpfs 6.9G 0 6.9G 0% /run/user/1000 /dev/nvme1n1 825G 73M 783G 1% /ch
O conjunto de dados que usarei neste teste é um despejo de dados que eu gerei de 1,1 bilhão de viagens de táxi feitas em Nova York em seis anos. O blog
Billion Taxi no Redshift detalha como eu reuni esse conjunto de dados. Eles são armazenados no AWS S3, portanto, configurarei a interface da linha de comando da AWS usando minhas chaves de acesso e privadas.
$ sudo apt update $ sudo apt install awscli $ aws configure
Definirei o limite do número de solicitações simultâneas de cliente como 100, para que os arquivos sejam carregados mais rapidamente do que com as configurações padrão.
$ aws configure set \ default.s3.max_concurrent_requests \ 100
Vou baixar o conjunto de dados de corrida de táxi do AWS S3 e salvá-lo na unidade NVMe no primeiro servidor. Esse conjunto de dados é de ~ 104 GB no formato CSV compactado com GZIP.
$ sudo mkdir -p /ch/csv $ sudo chown -R ubuntu /ch/csv $ aws s3 sync s3://<bucket>/csv /ch/csv
Instale o ClickHouse
Vou instalar a distribuição do OpenJDK para Java 8, pois é necessário executar o Apache ZooKeeper, necessário para a instalação distribuída do ClickHouse nas três máquinas.
$ sudo apt update $ sudo apt install \ openjdk-8-jre \ openjdk-8-jdk-headless
Em seguida, defino a variável de ambiente
JAVA_HOME
.
$ sudo vi /etc/profile export JAVA_HOME=/usr $ source /etc/profile
Depois, usarei o sistema de gerenciamento de pacotes no Ubuntu para instalar o ClickHouse 18.16.1, glances e o ZooKeeper nas três máquinas.
$ sudo apt-key adv \
$ sudo apt install \ clickhouse-client \ clickhouse-server \ glances \ zookeeperd
Vou criar um diretório para ClickHouse e também fazer algumas substituições de configuração nos três servidores.
$ sudo mkdir /ch/clickhouse $ sudo chown -R clickhouse /ch/clickhouse $ sudo mkdir -p /etc/clickhouse-server/conf.d $ sudo vi /etc/clickhouse-server/conf.d/taxis.conf
Estas são as substituições de configuração que eu usarei.
<?xml version="1.0"?> <yandex> <listen_host>0.0.0.0</listen_host> <path>/ch/clickhouse/</path>
<remote_servers> <perftest_3shards> <shard> <replica> <host>172.30.2.192</host> <port>9000</port> </replica> </shard> <shard> <replica> <host>172.30.2.162</host> <port>9000</port> </replica> </shard> <shard> <replica> <host>172.30.2.36</host> <port>9000</port> </replica> </shard> </perftest_3shards> </remote_servers>
<zookeeper-servers> <node> <host>172.30.2.192</host> <port>2181</port> </node> <node> <host>172.30.2.162</host> <port>2181</port> </node> <node> <host>172.30.2.36</host> <port>2181</port> </node> </zookeeper-servers>
<macros> <shard>03</shard> <replica>01</replica> </macros> </yandex>
Depois, executarei o ZooKeeper e o servidor ClickHouse nas três máquinas.
$ sudo /etc/init.d/zookeeper start $ sudo service clickhouse-server start
Carregando dados no ClickHouse
No primeiro servidor, vou criar uma tabela de viagens, que armazenará um conjunto de dados de viagens de táxi usando o mecanismo de log.
$ clickhouse-client
Depois, descompacto e carrego cada um dos arquivos CSV em uma tabela de viagens. O seguinte é feito em 55 minutos e 10 segundos. Após esta operação, o tamanho do diretório de dados era 134 GB.
$ time (for FILENAME in /ch/csv/trips_x*.csv.gz; do echo $FILENAME gunzip -c $FILENAME | \ clickhouse-client \
A velocidade de importação foi de 155 MB de conteúdo CSV descompactado por segundo. Eu suspeito que isso ocorreu devido a um gargalo na descompressão do GZIP. Pode ter sido mais rápido descompactar todos os arquivos gzip em paralelo usando xargs e carregar os dados descompactados. A seguir, é apresentada uma descrição do que foi relatado durante o processo de importação de CSV.
$ sudo glances
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 0:11:42 CPU 8.2% nice: 0.0% LOAD 36-core MEM 9.8% active: 5.20G SWAP 0.0% user: 6.0% irq: 0.0% 1 min: 2.24 total: 68.7G inactive: 61.0G total: 0 system: 0.9% iowait: 1.3% 5 min: 1.83 used: 6.71G buffers: 66.4M used: 0 idle: 91.8% steal: 0.0% 15 min: 1.01 free: 62.0G cached: 61.6G free: 0 NETWORK Rx/s Tx/s TASKS 370 (507 thr), 2 run, 368 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 136b 2Kb lo 343Mb 343Mb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 100.4 1.5 1.65G 1.06G 9909 ubuntu 0 S 1:01.33 0 0 clickhouse-client
Vou liberar espaço na unidade NVMe excluindo os arquivos CSV de origem antes de continuar.
$ sudo rm -fr /ch/csv
Converter em formulário de coluna
O mecanismo Log ClickHouse armazenará dados em um formato orientado a linhas. Para solicitar dados mais rapidamente, eu os converto para o formato de coluna usando o mecanismo MergeTree.
$ clickhouse-client
O seguinte é feito em 34 minutos e 50 segundos. Após esta operação, o tamanho do diretório de dados era 237 GB.
CREATE TABLE trips_mergetree ENGINE = MergeTree(pickup_date, pickup_datetime, 8192) AS SELECT trip_id, CAST(vendor_id AS Enum8('1' = 1, '2' = 2, 'CMT' = 3, 'VTS' = 4, 'DDS' = 5, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14)) AS vendor_id, toDate(pickup_datetime) AS pickup_date, ifNull(pickup_datetime, toDateTime(0)) AS pickup_datetime, toDate(dropoff_datetime) AS dropoff_date, ifNull(dropoff_datetime, toDateTime(0)) AS dropoff_datetime, assumeNotNull(store_and_fwd_flag) AS store_and_fwd_flag, assumeNotNull(rate_code_id) AS rate_code_id, assumeNotNull(pickup_longitude) AS pickup_longitude, assumeNotNull(pickup_latitude) AS pickup_latitude, assumeNotNull(dropoff_longitude) AS dropoff_longitude, assumeNotNull(dropoff_latitude) AS dropoff_latitude, assumeNotNull(passenger_count) AS passenger_count, assumeNotNull(trip_distance) AS trip_distance, assumeNotNull(fare_amount) AS fare_amount, assumeNotNull(extra) AS extra, assumeNotNull(mta_tax) AS mta_tax, assumeNotNull(tip_amount) AS tip_amount, assumeNotNull(tolls_amount) AS tolls_amount, assumeNotNull(ehail_fee) AS ehail_fee, assumeNotNull(improvement_surcharge) AS improvement_surcharge, assumeNotNull(total_amount) AS total_amount, assumeNotNull(payment_type) AS payment_type_, assumeNotNull(trip_type) AS trip_type, pickup AS pickup, pickup AS dropoff, CAST(assumeNotNull(cab_type) AS Enum8('yellow' = 1, 'green' = 2)) AS cab_type, precipitation AS precipitation, snow_depth AS snow_depth, snowfall AS snowfall, max_temperature AS max_temperature, min_temperature AS min_temperature, average_wind_speed AS average_wind_speed, pickup_nyct2010_gid AS pickup_nyct2010_gid, pickup_ctlabel AS pickup_ctlabel, pickup_borocode AS pickup_borocode, pickup_boroname AS pickup_boroname, pickup_ct2010 AS pickup_ct2010, pickup_boroct2010 AS pickup_boroct2010, pickup_cdeligibil AS pickup_cdeligibil, pickup_ntacode AS pickup_ntacode, pickup_ntaname AS pickup_ntaname, pickup_puma AS pickup_puma, dropoff_nyct2010_gid AS dropoff_nyct2010_gid, dropoff_ctlabel AS dropoff_ctlabel, dropoff_borocode AS dropoff_borocode, dropoff_boroname AS dropoff_boroname, dropoff_ct2010 AS dropoff_ct2010, dropoff_boroct2010 AS dropoff_boroct2010, dropoff_cdeligibil AS dropoff_cdeligibil, dropoff_ntacode AS dropoff_ntacode, dropoff_ntaname AS dropoff_ntaname, dropoff_puma AS dropoff_puma FROM trips;
É assim que a saída do olhar era durante a operação:
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 1:06:09 CPU 10.3% nice: 0.0% LOAD 36-core MEM 16.1% active: 13.3G SWAP 0.0% user: 7.9% irq: 0.0% 1 min: 1.87 total: 68.7G inactive: 52.8G total: 0 system: 1.6% iowait: 0.8% 5 min: 1.76 used: 11.1G buffers: 71.8M used: 0 idle: 89.7% steal: 0.0% 15 min: 1.95 free: 57.6G cached: 57.2G free: 0 NETWORK Rx/s Tx/s TASKS 367 (523 thr), 1 run, 366 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 1Kb 8Kb lo 2Kb 2Kb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 241.9 12.8 20.7G 8.78G 8091 clickhous 0 S 30:36.73 34M 125M /usr/bin/clickhouse-server
No último teste, várias colunas foram convertidas e recontadas. Descobri que algumas dessas funções não funcionam mais corretamente neste conjunto de dados. Para resolver esse problema, removi as funções inadequadas e baixei os dados sem conversão para tipos mais detalhados.
Distribuição de dados de cluster
Vou distribuir dados pelos três nós do cluster. Para começar, vou criar uma tabela nas três máquinas.
$ clickhouse-client
CREATE TABLE trips_mergetree_third ( trip_id UInt32, vendor_id String, pickup_date Date, pickup_datetime DateTime, dropoff_date Date, dropoff_datetime Nullable(DateTime), store_and_fwd_flag Nullable(FixedString(1)), rate_code_id Nullable(UInt8), pickup_longitude Nullable(Float64), pickup_latitude Nullable(Float64), dropoff_longitude Nullable(Float64), dropoff_latitude Nullable(Float64), passenger_count Nullable(UInt8), trip_distance Nullable(Float64), fare_amount Nullable(Float32), extra Nullable(Float32), mta_tax Nullable(Float32), tip_amount Nullable(Float32), tolls_amount Nullable(Float32), ehail_fee Nullable(Float32), improvement_surcharge Nullable(Float32), total_amount Nullable(Float32), payment_type Nullable(String), trip_type Nullable(UInt8), pickup Nullable(String), dropoff Nullable(String), cab_type Nullable(String), precipitation Nullable(Int8), snow_depth Nullable(Int8), snowfall Nullable(Int8), max_temperature Nullable(Int8), min_temperature Nullable(Int8), average_wind_speed Nullable(Int8), pickup_nyct2010_gid Nullable(Int8), pickup_ctlabel Nullable(String), pickup_borocode Nullable(Int8), pickup_boroname Nullable(String), pickup_ct2010 Nullable(String), pickup_boroct2010 Nullable(String), pickup_cdeligibil Nullable(FixedString(1)), pickup_ntacode Nullable(String), pickup_ntaname Nullable(String), pickup_puma Nullable(String), dropoff_nyct2010_gid Nullable(UInt8), dropoff_ctlabel Nullable(String), dropoff_borocode Nullable(UInt8), dropoff_boroname Nullable(String), dropoff_ct2010 Nullable(String), dropoff_boroct2010 Nullable(String), dropoff_cdeligibil Nullable(String), dropoff_ntacode Nullable(String), dropoff_ntaname Nullable(String), dropoff_puma Nullable(String) ) ENGINE = MergeTree(pickup_date, pickup_datetime, 8192);
Depois, assegurarei que o primeiro servidor possa ver todos os três nós no cluster.
SELECT * FROM system.clusters WHERE cluster = 'perftest_3shards' FORMAT Vertical;
Row 1: ────── cluster: perftest_3shards shard_num: 1 shard_weight: 1 replica_num: 1 host_name: 172.30.2.192 host_address: 172.30.2.192 port: 9000 is_local: 1 user: default default_database:
Row 2: ────── cluster: perftest_3shards shard_num: 2 shard_weight: 1 replica_num: 1 host_name: 172.30.2.162 host_address: 172.30.2.162 port: 9000 is_local: 0 user: default default_database:
Row 3: ────── cluster: perftest_3shards shard_num: 3 shard_weight: 1 replica_num: 1 host_name: 172.30.2.36 host_address: 172.30.2.36 port: 9000 is_local: 0 user: default default_database:
Depois, definirei uma nova tabela no primeiro servidor, que é baseada no
trips_mergetree_third
e usa o mecanismo Distributed.
CREATE TABLE trips_mergetree_x3 AS trips_mergetree_third ENGINE = Distributed(perftest_3shards, default, trips_mergetree_third, rand());
Então, copiarei os dados da tabela com base no MergeTree para todos os três servidores. O seguinte é feito em 34 minutos e 44 segundos.
INSERT INTO trips_mergetree_x3 SELECT * FROM trips_mergetree;
Após a operação acima, dei 15 minutos ao ClickHouse para me afastar da marca do nível máximo de armazenamento. Os diretórios de dados acabaram sendo 264 GB, 34 GB e 33 GB, respectivamente, em cada um dos três servidores.
Avaliação de desempenho do cluster ClickHouse
O que vi a seguir foi o tempo mais rápido que vi ao executar cada consulta várias vezes na tabela
trips_mergetree_x3
.
$ clickhouse-client
O seguinte foi concluído em 2.449 segundos.
SELECT cab_type, count(*) FROM trips_mergetree_x3 GROUP BY cab_type;
O seguinte foi concluído em 0,691 segundos.
SELECT passenger_count, avg(total_amount) FROM trips_mergetree_x3 GROUP BY passenger_count;
O seguinte foi concluído em 0. 582 segundos.
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree_x3 GROUP BY passenger_count, year;
O seguinte é concluído em 0,983 segundos.
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree_x3 GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC;
Para comparação, realizei as mesmas consultas em uma tabela baseada no MergeTree, localizada exclusivamente no primeiro servidor.
Avaliação de desempenho de um nó ClickHouse
O que vi a seguir foi o tempo mais rápido que vi ao executar cada consulta várias vezes na tabela
trips_mergetree_x3
.
O seguinte é concluído em 0,241 segundos.
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type;
O seguinte é concluído em 0,826 segundos.
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count;
O seguinte é concluído em 1.209 segundos.
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year;
O seguinte foi concluído em 1.781 segundos.
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC;
Reflexões sobre os resultados
É a primeira vez que um banco de dados baseado em processador gratuito consegue superar o banco de dados da GPU em meus testes. Esse banco de dados baseado em GPU passou por duas revisões desde então, mas, no entanto, o desempenho que o ClickHouse mostrou em um nó é muito impressionante.
Ao mesmo tempo, ao executar a Consulta 1 em um mecanismo distribuído, a sobrecarga é uma ordem de magnitude maior. Espero ter perdido alguma coisa em minha pesquisa para este post, porque seria bom ver como o tempo de consulta diminui quando adiciono mais nós ao cluster. No entanto, é notável que, ao executar outras consultas, a produtividade aumentou cerca de 2 vezes.
Seria bom se o ClickHouse evoluísse para que fosse possível separar armazenamento e computação para que eles pudessem escalar de forma independente. O suporte ao HDFS, adicionado no ano passado, pode ser um passo nesse sentido. Quanto à computação, se uma única solicitação puder ser acelerada adicionando mais nós ao cluster, o futuro deste software será muito positivo.
Obrigado por ler esse post. Ofereço serviços de consultoria, arquitetura e desenvolvimento prático para clientes na América do Norte e Europa. Se você quiser discutir como minhas sugestões podem ajudar sua empresa, entre em contato comigo através do
LinkedIn .