Como fazer um gatilho DAG no fluxo de ar usando a API Experimental

Ao preparar nossos programas educacionais, periodicamente encontramos dificuldades em termos de trabalho com algumas ferramentas. E nesse momento em que os encontramos, nem sempre há documentação e artigos suficientes que ajudariam a lidar com esse problema.


Foi o caso, por exemplo, em 2015 e usamos o cluster Hadoop com Spark para 35 usuários simultâneos no programa Big Data Specialist. Como cozinhá-lo nesse caso de usuário usando o YARN não estava claro. Como resultado, tendo descoberto e percorrido o caminho por conta própria, eles fizeram um post sobre Habré e também se apresentaram no Moscow Spark Meetup .


Antecedentes


Desta vez, falaremos sobre outro programa - Data Engineer . Nossos participantes constroem dois tipos de arquitetura: lambda e kappa. E na arquitetura lamdba, como parte do processamento em lote, o Airflow é usado para transferir logs do HDFS para o ClickHouse.


Tudo é bom em geral. Deixe-os construir seus gasodutos. No entanto, existe um "mas": todos os nossos programas são tecnológicos em termos do próprio processo de aprendizagem. Para verificar o laboratório, usamos verificadores automáticos: o participante precisa acessar sua conta pessoal, clicar no botão "Verificar" e, depois de algum tempo, vê algum tipo de feedback sobre o que fez. E é neste momento que começamos a abordar o nosso problema.


A verificação deste laboratório está organizada da seguinte forma: enviamos um pacote de dados de controle para Kafka do participante, Gobblin transfere o pacote de dados para o HDFS, depois o Airflow pega esse pacote de dados e o coloca no ClickHouse. O truque é que o Airflow não deve fazer isso em tempo real, mas sim de acordo com o cronograma: a cada 15 minutos, ele pega um monte de arquivos e o lança.


Acontece que precisamos de alguma forma acionar o DAG deles por conta própria, a pedido do verificador aqui e agora. Pesquisando no Google, descobrimos que para as versões posteriores do Airflow existe a chamada API Experimental . A palavra experimental , é claro, parece assustadora, mas o que fazer ... De repente, ela voa.


A seguir, descrevemos o caminho todo: da instalação do Airflow à geração de uma solicitação POST que aciona um DAG usando a API Experimental. Vamos trabalhar com o Ubuntu 16.04.


1. Instalando o fluxo de ar


Vamos verificar se temos Python 3 e virtualenv.


 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0 

Se algo estiver faltando, instale.


Agora crie um diretório no qual continuaremos trabalhando com o Airflow.


 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $ 

Instale o fluxo de ar:


 (venv) $ pip install airflow 

A versão em que trabalhamos: 1.10.


Agora precisamos criar o diretório airflow_home onde os arquivos DAG e os plug-ins do Airflow estarão localizados. Após criar o diretório, configure a variável de ambiente AIRFLOW_HOME .


 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home> 

A próxima etapa é executar o comando que criará e inicializará o banco de dados de fluxo de dados no SQLite:


 (venv) $ airflow initdb 

O banco de dados será criado no airflow.db por padrão.


Verifique se o Airflow está instalado:


 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0 

Se o comando funcionou, o Airflow criou seu arquivo de configuração AIRFLOW_HOME no AIRFLOW_HOME :


 $ tree . ├── airflow.cfg └── unittests.cfg 

O fluxo de ar tem uma interface da web. Pode ser iniciado executando o comando:


 (venv) $ airflow webserver --port 8081 

Agora você pode acessar a interface da web em um navegador na porta 8081 no host em que o Airflow foi iniciado, por exemplo: <hostname:8081> .


2. Trabalhando com a API Experimental


Nisso, o Airflow está configurado e pronto para funcionar. No entanto, precisamos executar a API Experimental também. Nossos verificadores são escritos em Python; portanto, todos os pedidos estarão nele usando a biblioteca de requests .


De fato, a API já está trabalhando para consultas simples. Por exemplo, essa solicitação permite testar sua operação:


 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK' 

Se você recebeu essa mensagem em resposta, isso significa que tudo funciona.


No entanto, quando queremos ativar o DAG, enfrentaremos o fato de que esse tipo de solicitação não pode ser feita sem autenticação.


Para fazer isso, você precisará executar várias ações.


Primeiramente, você precisa adicionar isso à configuração:


 [api] auth_backend = airflow.contrib.auth.backends.password_auth 

Em seguida, você precisa criar seu usuário com direitos de administrador:


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Em seguida, você precisa criar um usuário com direitos normais, que poderá fazer um gatilho do DAG.


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Agora está tudo pronto.


3. Iniciando uma Solicitação POST


A solicitação POST em si será assim:


 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n' 

Pedido processado com sucesso.


Dessa forma, damos algum tempo ao DAG para processamento e fazemos uma solicitação para a tabela ClickHouse, tentando capturar um pacote de controle de dados.


Verificação concluída.

Source: https://habr.com/ru/post/pt445852/


All Articles