Monitorando processos ETL em um pequeno armazém de dados

Muitos usam ferramentas especializadas para criar procedimentos para extrair, transformar e carregar dados em bancos de dados relacionais. O processo das ferramentas de trabalho é registrado, os erros são registrados.

Em caso de erro, o log contém informações de que a ferramenta falhou ao concluir a tarefa e quais módulos (geralmente java) onde eles pararam. Nas últimas linhas, você pode encontrar um erro no banco de dados, por exemplo, uma violação de uma chave de tabela exclusiva.

Para responder à pergunta, qual é o papel das informações de erro de ETL, classifiquei todos os problemas que ocorreram nos últimos dois anos em um repositório bastante grande.

imagem

Características do armazenamento em que a classificação foi realizada:

  • 20 fontes de dados conectadas
  • 10,5 bilhões de linhas são processadas diariamente
  • que são agregados até 50 milhões de linhas,
  • dados processa 140 pacotes em 700 etapas (uma etapa é uma solicitação sql)
  • servidor - banco de dados X5 de 4 nós

Os erros de banco de dados incluem falta de espaço, uma conexão desconectada, uma sessão interrompida etc.

Erros lógicos incluem violações de chaves de tabela, objetos inválidos, falta de acesso a objetos etc.
O agendador pode não iniciar no momento certo, pode travar etc.

Erros simples não requerem muito tempo para serem corrigidos. Com a maioria deles, um bom ETL pode lidar sozinho.

Erros complicados tornam necessário abrir e verificar procedimentos para trabalhar com dados e pesquisar fontes de dados. Muitas vezes, levam à necessidade de testar alterações e implantação.

Portanto, metade de todos os problemas estão relacionados ao banco de dados. 48% de todos os erros são simples.
A terceira parte de todos os problemas está associada a uma alteração na lógica ou no modelo do repositório; mais da metade desses erros são complexos.

E menos de um quarto de todos os problemas estão relacionados ao agendador de tarefas, 18% dos quais são erros simples.

Em geral, 22% de todos os erros que ocorreram são complexos; corrigi-los requer mais atenção e tempo. Eles ocorrem aproximadamente uma vez por semana. Enquanto erros simples acontecem quase todos os dias.

Obviamente, o monitoramento dos processos ETL será efetivo quando o local do erro for indicado com a maior precisão possível no log e for necessário um tempo mínimo para encontrar a origem do problema.

Monitoramento eficaz


O que eu queria ver no processo de monitoramento de ETL?

imagem
Iniciar às - quando iniciado
Fonte - uma fonte de dados
Camada - qual nível de armazenamento está carregando,
O nome do trabalho ETL é um procedimento de carregamento que consiste em várias etapas pequenas,
Número da etapa - o número da etapa a ser executada,
Linhas afetadas - quantos dados já foram processados,
Duração s - quanto tempo leva para executar,
Status - se está tudo bem ou não: OK, ERRO, EXECUÇÃO, PENDURAS
Mensagem - A última mensagem bem-sucedida ou descrição do erro.

Com base no status das entradas, você pode enviar um email. carta aos outros participantes. Se não houver erros, a carta não será necessária.

Assim, no caso de um erro, a localização do incidente é claramente indicada.

Às vezes acontece que a própria ferramenta de monitoramento não funciona. Nesse caso, é possível chamar diretamente uma visão (visão) no banco de dados com base na qual o relatório é construído.

Tabela de Monitoramento ETL


Para implementar o monitoramento de processos ETL, uma tabela e uma visualização são suficientes.

Para fazer isso, você pode retornar ao seu pequeno repositório e criar um protótipo no banco de dados sqlite.

Tabelas DDL
CREATE TABLE UTL_JOB_STATUS ( /* Table for logging of job execution log. Important that the job has the steps ETL_START and ETL_END or ETL_ERROR */ UTL_JOB_STATUS_ID INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, SID INTEGER NOT NULL DEFAULT -1, /* Session Identificator. Unique for every Run of job */ LOG_DT INTEGER NOT NULL DEFAULT 0, /* Date time */ LOG_D INTEGER NOT NULL DEFAULT 0, /* Date */ JOB_NAME TEXT NOT NULL DEFAULT 'N/A', /* Job name like JOB_STG2DM_GEO */ STEP_NAME TEXT NOT NULL DEFAULT 'N/A', /* ETL_START, ... , ETL_END/ETL_ERROR */ STEP_DESCR TEXT, /* Description of task or error message */ UNIQUE (SID, JOB_NAME, STEP_NAME) ); INSERT INTO UTL_JOB_STATUS (UTL_JOB_STATUS_ID) VALUES (-1); 

Envio / relatório de DDL
 CREATE VIEW IF NOT EXISTS UTL_JOB_STATUS_V AS /* Content: Package Execution Log for last 3 Months. */ WITH SRC AS ( SELECT LOG_D, LOG_DT, UTL_JOB_STATUS_ID, SID, CASE WHEN INSTR(JOB_NAME, 'FTP') THEN 'TRANSFER' /* file transfer */ WHEN INSTR(JOB_NAME, 'STG') THEN 'STAGE' /* stage */ WHEN INSTR(JOB_NAME, 'CLS') THEN 'CLEANSING' /* cleansing */ WHEN INSTR(JOB_NAME, 'DIM') THEN 'DIMENSION' /* dimension */ WHEN INSTR(JOB_NAME, 'FCT') THEN 'FACT' /* fact */ WHEN INSTR(JOB_NAME, 'ETL') THEN 'STAGE-MART' /* data mart */ WHEN INSTR(JOB_NAME, 'RPT') THEN 'REPORT' /* report */ ELSE 'N/A' END AS LAYER, CASE WHEN INSTR(JOB_NAME, 'ACCESS') THEN 'ACCESS LOG' /* source */ WHEN INSTR(JOB_NAME, 'MASTER') THEN 'MASTER DATA' /* source */ WHEN INSTR(JOB_NAME, 'AD-HOC') THEN 'AD-HOC' /* source */ ELSE 'N/A' END AS SOURCE, JOB_NAME, STEP_NAME, CASE WHEN STEP_NAME='ETL_START' THEN 1 ELSE 0 END AS START_FLAG, CASE WHEN STEP_NAME='ETL_END' THEN 1 ELSE 0 END AS END_FLAG, CASE WHEN STEP_NAME='ETL_ERROR' THEN 1 ELSE 0 END AS ERROR_FLAG, STEP_NAME || ' : ' || STEP_DESCR AS STEP_LOG, SUBSTR( SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), 1, INSTR(SUBSTR(STEP_DESCR, INSTR(STEP_DESCR, '***')+4), '***')-2 ) AS AFFECTED_ROWS FROM UTL_JOB_STATUS WHERE datetime(LOG_D, 'unixepoch') >= date('now', 'start of month', '-3 month') ) SELECT JB.SID, JB.MIN_LOG_DT AS START_DT, strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS LOG_DT, JB.SOURCE, JB.LAYER, JB.JOB_NAME, CASE WHEN JB.ERROR_FLAG = 1 THEN 'ERROR' WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 AND strftime('%s','now') - JB.MIN_LOG_DT > 0.5*60*60 THEN 'HANGS' /* half an hour */ WHEN JB.ERROR_FLAG = 0 AND JB.END_FLAG = 0 THEN 'RUNNING' ELSE 'OK' END AS STATUS, ERR.STEP_LOG AS STEP_LOG, JB.CNT AS STEP_CNT, JB.AFFECTED_ROWS AS AFFECTED_ROWS, strftime('%d.%m.%Y %H:%M', datetime(JB.MIN_LOG_DT, 'unixepoch')) AS JOB_START_DT, strftime('%d.%m.%Y %H:%M', datetime(JB.MAX_LOG_DT, 'unixepoch')) AS JOB_END_DT, JB.MAX_LOG_DT - JB.MIN_LOG_DT AS JOB_DURATION_SEC FROM ( SELECT SID, SOURCE, LAYER, JOB_NAME, MAX(UTL_JOB_STATUS_ID) AS UTL_JOB_STATUS_ID, MAX(START_FLAG) AS START_FLAG, MAX(END_FLAG) AS END_FLAG, MAX(ERROR_FLAG) AS ERROR_FLAG, MIN(LOG_DT) AS MIN_LOG_DT, MAX(LOG_DT) AS MAX_LOG_DT, SUM(1) AS CNT, SUM(IFNULL(AFFECTED_ROWS, 0)) AS AFFECTED_ROWS FROM SRC GROUP BY SID, SOURCE, LAYER, JOB_NAME ) JB, ( SELECT UTL_JOB_STATUS_ID, SID, JOB_NAME, STEP_LOG FROM SRC WHERE 1 = 1 ) ERR WHERE 1 = 1 AND JB.SID = ERR.SID AND JB.JOB_NAME = ERR.JOB_NAME AND JB.UTL_JOB_STATUS_ID = ERR.UTL_JOB_STATUS_ID ORDER BY JB.MIN_LOG_DT DESC, JB.SID DESC, JB.SOURCE; 

SQL Verificando a capacidade de obter um novo número de sessão
 SELECT SUM ( CASE WHEN start_job.JOB_NAME IS NOT NULL AND end_job.JOB_NAME IS NULL /* existed job finished */ AND NOT ( 'y' = 'n' ) /* force restart PARAMETER */ THEN 1 ELSE 0 END ) AS IS_RUNNING FROM ( SELECT 1 AS dummy FROM UTL_JOB_STATUS WHERE sid = -1) d_job LEFT OUTER JOIN ( SELECT JOB_NAME, SID, 1 AS dummy FROM UTL_JOB_STATUS WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */ AND STEP_NAME = 'ETL_START' GROUP BY JOB_NAME, SID ) start_job /* starts */ ON d_job.dummy = start_job.dummy LEFT OUTER JOIN ( SELECT JOB_NAME, SID FROM UTL_JOB_STATUS WHERE JOB_NAME = 'RPT_ACCESS_LOG' /* job name PARAMETER */ AND STEP_NAME in ('ETL_END', 'ETL_ERROR') /* stop status */ GROUP BY JOB_NAME, SID ) end_job /* ends */ ON start_job.JOB_NAME = end_job.JOB_NAME AND start_job.SID = end_job.SID 

Características da tabela:

  • o início e o fim do procedimento de processamento de dados devem ser seguidos pelas etapas ETL_START e ETL_END
  • em caso de erro, a etapa ETL_ERROR deve ser criada com sua descrição
  • a quantidade de dados processados ​​deve ser destacada, por exemplo, com asteriscos
  • ao mesmo tempo, o mesmo procedimento pode ser iniciado com o parâmetro force_restart = y; sem ele, o número da sessão é emitido apenas para o procedimento concluído
  • no modo normal, você não pode executar o mesmo procedimento de processamento de dados em paralelo

As operações necessárias para trabalhar com a tabela são as seguintes:

  • obtendo o número da sessão do procedimento ETL para iniciar
  • insira uma entrada de log em uma tabela
  • obtendo o último registro de procedimento ETL bem-sucedido

Em bancos de dados como Oracle ou Postgres, essas operações podem ser implementadas com funções internas. O Sqlite precisa de um mecanismo externo e, neste caso, é um protótipo no PHP .

Conclusão


Assim, as mensagens de erro nas ferramentas de processamento de dados desempenham um papel extremamente importante. Mas é difícil chamá-los de ótimos para uma pesquisa rápida pelas causas do problema. Quando o número de procedimentos se aproxima de cem, o monitoramento de processos se transforma em um projeto complexo.

O artigo fornece um exemplo de uma possível solução para o problema na forma de um protótipo. Todo o protótipo do pequeno repositório está disponível no SQLite PHP ETL Utilities do gitlab.

Source: https://habr.com/ru/post/pt465637/


All Articles