Análise de portfólio de investimentos e serviços em nuvem da Amazon

Recentemente, houve alta volatilidade nos mercados de ações, quando, por exemplo, um artigo estável de uma empresa conhecida pode perder vários por cento ao mesmo tempo com notícias de sanções contra sua administração ou vice-versa, subir aos céus em um relatório positivo e expectativas dos investidores sobre dividendos super lucrativos.

Como determinar se a propriedade de um determinado título trouxe renda ou apenas perdas e decepções?

(Fonte)

Neste artigo, mostrarei como identificar e visualizar o resultado financeiro ajustado para títulos.

Usando o exemplo de abertura de relatórios do cliente, o Opening Broker, consideraremos a análise e consolidação dos relatórios de corretagem para o mercado de ações, construindo a arquitetura de um sistema de relatórios em nuvem com posterior análise simples e conveniente no AWS Quicksight.

Descrição da tarefa


Muitos treinamentos e lições educacionais nos dizem sobre a necessidade de um diário do trader, onde todos os parâmetros de transação são registrados para análise posterior e resumo da estratégia de negociação. Concordo que essa abordagem para trabalhar no Exchange permite disciplinar um profissional, aumentar sua consciência, mas também pode cansá-lo de um processo tedioso.

Admito que, a princípio, tentei seguir cuidadosamente o conselho do registro em diário, escrevi meticulosamente cada transação com seus parâmetros em uma tabela do Excel, construí alguns relatórios, gráficos de resumo, planejei transações futuras, mas ... Eu estava rapidamente cansado de tudo.

Por que manter o diário de um comerciante manualmente é inconveniente?
  • o preenchimento manual do diário (mesmo usando automação parcial, na forma de descarregar transações diárias do terminal de negociação) se cansa rapidamente;
  • existe um alto risco de erro ou erro de digitação na entrada manual;
  • pode acontecer que um operador ativo se torne um investidor passivo e volte cada vez menos a esta revista e depois se esqueça completamente dela (meu caso); bem e finalmente
  • podemos programar, por que não tirar proveito disso e automatizar todo o processo? Então vamos lá!

Freqüentemente, as corretoras são organizações de alta tecnologia que fornecem aos clientes análises de alta qualidade em quase todas as questões de interesse. É justo dizer que esses relatórios estão ficando cada vez melhores a cada atualização, mas mesmo os mais avançados podem não ter a personalização e a consolidação que os clientes exigentes e inquisidores desejam ver.

Por exemplo, o Opening Broker permite que você receba relatórios de corretagem no formato XML em sua conta pessoal, mas se você tiver um IIA e uma conta corretagem regular na Bolsa de Valores de Moscou (MOEX), esses serão dois relatórios diferentes e se você tiver outra conta no St. Petersburg Stock Exchange (SPB), os dois primeiros adicionarão mais um.

No total, para obter um diário consolidado do investidor, será necessário processar três arquivos no formato XML.

Os relatórios acima mencionados sobre MOEX e SPB diferem ligeiramente em seus formatos, que precisam ser levados em consideração no processo de implementação do mapeamento de dados.

Arquitetura do sistema em desenvolvimento


O diagrama abaixo mostra o modelo de arquitetura do sistema em desenvolvimento:


Implementação do analisador


Receberemos relatórios nas três contas da Conta Pessoal pelo período máximo possível (podem ser divididos em vários relatórios para cada ano), salvá-los no formato XML e colocá-los em uma pasta. Como dados de teste para o estudo, usaremos uma carteira de clientes fictícia, mas com parâmetros o mais próximo possível das realidades do mercado.


Suponha que o investidor Sr. X em consideração tenha uma pequena carteira de cinco títulos:

  1. O relatório sobre a troca do SPB terá dois documentos: Apple e Microsoft;
  2. O relatório sobre a bolsa MOEX (corretagem) contém um artigo: FGC UES;
  3. O relatório no MOEX Exchange (IIS) contém dois títulos: MMK e OFZ 24019;

De acordo com nossos cinco títulos, pode haver transações de compra / venda, pagamento de dividendos e cupom, o preço pode mudar etc. Queremos ver a situação no momento atual, a saber: o resultado financeiro, levando em consideração todos os pagamentos, transações e valor de mercado atual.

E aqui o Python entra em cena, lemos as informações de todos os relatórios em uma matriz:

my_files_list = [join('Data/', f) for f in listdir('Data/') if isfile(join('Data/', f))] my_xml_data = [] #     for f in my_files_list: tree = ET.parse(f) root = tree.getroot() my_xml_data.append(root) 

Para análises, a partir dos relatórios, precisamos de várias entidades, a saber:

  • Posições de valores mobiliários em uma carteira;
  • Negócios concluídos;
  • Operações de não negociação e outros movimentos de conta;
  • Preços médios das posições em aberto
Para preparar a amostra, usaremos quatro dicionários para descrever os conjuntos acima.

 dict_stocks = {'stock_name': [], 'account': [], 'currency': [], 'current_cost': [], 'current_cost_rub': [], 'saldo' : []} dict_deals = {'stock_name': [], 'account': [], 'date_oper': [], 'type_oper': [], 'quantity': [], 'price': [], 'currency': [], 'brokerage': [], 'result': []} dict_flows = {'stock_name': [], 'account': [], 'date_oper': [], 'type_oper': [], 'result': [], 'currency': []} dict_avg_price = {'stock_name': [], 'account': [], 'avg_open_price' : []} 

Algumas palavras sobre o que esses dicionários são.

Dicionário dict_stocks
O dicionário dict_stocks é necessário para armazenar informações gerais no portfólio:

  • Nome do papel (stock_name);
  • Nome da conta (SPB, MOEX BROK, MOEX IIS) (conta);
  • Moeda usada para liquidações neste documento (moeda);
  • Valor atual (no momento da geração do relatório no Personal Account Opening Broker) (current_cost). Aqui, quero observar que, para clientes com excesso de demanda, é possível fazer melhorias adicionais no futuro e usar o recebimento dinâmico de uma cotação de segurança de um terminal comercial ou do site da bolsa correspondente;
  • O valor atual da posição de segurança no momento em que o relatório foi gerado (current_cost_rub)
    Da mesma forma que o item acima, aqui você também pode obter a taxa do Banco Central no momento atual ou a taxa de câmbio, como desejar.
  • Saldo atual de títulos (saldo)

Dicionário dict_deals
O dicionário dict_deals é necessário para armazenar as seguintes informações sobre transações concluídas:

  • Nome do papel (stock_name);
  • Nome da conta (SPB, MOEX BROK, MOEX IIS) (conta);
  • Data da transação, ou seja, T0 (data_operador);
  • Tipo de operação (type_oper);
  • O volume de títulos que participam da transação (quantidade);
  • O preço pelo qual a transação foi executada (preço);
  • Moeda na qual a transação foi realizada (moeda);
  • Comissão de corretagem para uma transação (corretagem);
  • O resultado financeiro da transação (resultado)

Dicionário Dict_flows
O dicionário dict_flows reflete a movimentação de fundos na conta do cliente e é usado para armazenar as seguintes informações:

  • Nome do papel (stock_name);
  • Nome da conta (SPB, MOEX BROK, MOEX IIS) (conta);
  • Data da transação, ou seja, T0 (data_operador);
  • Tipo de operação (type_oper). Pode levar vários valores: div, NKD, tax;
  • Moeda na qual a transação foi realizada (moeda);
  • O resultado financeiro da operação (resultado)

Dicionário dict_avg_price
O dicionário dict_avg_price é necessário para informações contábeis ao preço médio de compra de cada artigo:

  • Nome do papel (stock_name);
  • Nome da conta (SPB, MOEX BROK, MOEX IIS) (conta);
  • Preço médio de uma posição em aberto (avg_open_price)

Processamos uma matriz de documentos XML e preenchemos esses dicionários com os dados apropriados:

 #       for XMLdata in my_xml_data: #      exchange_name = 'SPB' if XMLdata.get('board_list') == ' ' else 'MOEX' client_code = XMLdata.get('client_code') account_name = get_account_name(exchange_name, client_code) #   current_position, deals, flows, stock_name, \ saldo, ticketdate, price, brokerage, \ operationdate, currency, \ current_cost, current_cost_rub, \ stock_name_deal, payment_currency, currency_flows = get_allias(exchange_name) #      get_briefcase(XMLdata) df_stocks = pd.DataFrame(dict_stocks) df_stocks.set_index("stock_name", drop = False, inplace = True) #    get_deals(XMLdata) df_deals = pd.DataFrame(dict_deals) df_avg = pd.DataFrame(dict_avg_price) #       get_nontrade_operation(XMLdata) df_flows = pd.DataFrame(dict_flows) 

Todo o processamento passa pelo loop sobre todos os dados XML dos relatórios. As informações sobre a plataforma de negociação, o código do cliente, são iguais em todos os relatórios, para que você possa extraí-lo com segurança das mesmas tags sem usar o mapeamento.

Mas temos que usar um design especial que forneça o alias necessário para a tag com base no relatório (SPB ou MOEX), porque dados de natureza idêntica nesses relatórios são chamados de maneira diferente.

Discrepâncias de tag
  • A comissão do corretor de transações no relatório SBP está na etiqueta de corretagem e no relatório MOEX - broker_commission ;
  • A data da transação da conta que não é de negociação no relatório SPB é data da operação e, no MOEX, data da operação etc.

Exemplo de mapeamento de tags
 tags_mapping = { 'SPB': { 'current_position': 'briefcase_position', 'deals': 'closed_deal', 'flows': 'nontrade_money_operation', ... 'stock_name_deal': 'issuername', 'paymentcurrency': 'paymentcurrency', 'currency_flows': 'currencycode' }, 'MOEX': { 'current_position': 'spot_assets', 'deals': 'spot_main_deals_conclusion', 'flows': 'spot_non_trade_money_operations', ... 'stock_name_deal': 'security_name', 'paymentcurrency': 'price_currency_code', 'currency_flows': 'currency_code' } } 

A função get_allias retorna o nome da tag necessária para o processamento, levando o nome da plataforma de negociação como uma entrada:

Função Get_allias
 def get_allias(exchange_name): return( tags_mapping[exchange_name]['current_position'], tags_mapping[exchange_name]['deals'], tags_mapping[exchange_name]['flows'], ... tags_mapping[exchange_name]['stock_name_deal'], tags_mapping[exchange_name]['paymentcurrency'], tags_mapping[exchange_name]['currency_flows'] ) 

A função get_briefcase é responsável pelo processamento de informações sobre o status do portfólio de clientes:

Função Get_briefcase
 def get_briefcase(XMLdata): #         briefcase_position briefcase_position = XMLdata.find(current_position) if not briefcase_position: return try: for child in briefcase_position: stock_name_reduce = child.get(stock_name).upper() stock_name_reduce = re.sub('[,\.]|(\s?INC)|(\s+$)|([-\s]?)', '', stock_name_reduce) dict_stocks['stock_name'].append(stock_name_reduce) dict_stocks['account'].append(account_name) dict_stocks['currency'].append(child.get(currency)) dict_stocks['current_cost'].append(float(child.get(current_cost))) dict_stocks['current_cost_rub'].append(float(child.get(current_cost_rub))) dict_stocks['saldo'].append(float(child.get(saldo))) except Exception as e: print('get_briefcase --> Oops! It seems we have a BUG!', e) 

Em seguida, a função get_deals recupera informações sobre transações:

Função Get_deals
 def get_deals(XMLdata): stock_name_proc = '' closed_deal = XMLdata.find(deals) if not closed_deal: return #   SPB    -    , #    MOEX:  ,      #    : if exchange_name == 'SPB': sortchildrenby(closed_deal, stock_name_deal) for child in closed_deal: sortchildrenby(child, stock_name_deal) try: for child in closed_deal: stock_name_reduce = child.get(stock_name_deal).upper() stock_name_reduce = re.sub('[,\.]|(\s?INC)|(\s+$)|([-\s]?)', '', stock_name_reduce) dict_deals['stock_name'].append(stock_name_reduce) dict_deals['account'].append(account_name) dict_deals['date_oper'].append(to_dt(child.get(ticketdate)).strftime('%Y-%m-%d')) current_cost = get_current_cost(stock_name_reduce) #    SPB     - quantity, #   MOEX  : buy_qnty  sell_qnty if exchange_name == 'MOEX': if child.get('buy_qnty'): quantity = float(child.get('buy_qnty')) else: quantity = - float(child.get('sell_qnty')) else: quantity = float(child.get('quantity')) dict_deals['quantity'].append(quantity) dict_deals['price'].append(float(child.get('price'))) dict_deals['type_oper'].append('deal') dict_deals['currency'].append(child.get(payment_currency)) brok_comm = child.get(brokerage) if brok_comm is None: brok_comm = 0 else: brok_comm = float(brok_comm) dict_deals['brokerage'].append(float(brok_comm)) #         if stock_name_proc != stock_name_reduce: if stock_name_proc != '': put_avr_price_in_df(account_name, stock_name_proc, \ pnl.m_net_position, pnl.m_avg_open_price) current_cost = get_current_cost(stock_name_proc) pnl.update_by_marketdata(current_cost) if len(dict_deals['result']) > 0: if exchange_name != 'SPB': dict_deals['result'][-1] = pnl.m_unrealized_pnl * 0.87 -dict_deals['brokerage'][-2] else: dict_deals['result'][-1] = pnl.m_unrealized_pnl - dict_deals['brokerage'][-2] stock_name_proc = stock_name_reduce pnl = PnlSnapshot(stock_name_proc, float(child.get('price')), quantity) dict_deals['result'].append(-1 * brok_comm) else: pnl.update_by_tradefeed(float(child.get('price')), quantity) #  ,   if quantity < 0: if pnl.m_realized_pnl > 0 and exchange_name != 'SPB': pnl_sum = pnl.m_realized_pnl * 0.87 - brok_comm else: pnl_sum = pnl.m_realized_pnl - brok_comm dict_deals['result'].append(float(pnl_sum)) else: pnl.update_by_marketdata(current_cost) dict_deals['result'].append(-1 * brok_comm) put_avr_price_in_df(account_name, stock_name_proc, \ pnl.m_net_position, pnl.m_avg_open_price) current_cost = get_current_cost(stock_name_proc) pnl.update_by_marketdata(current_cost) if len(dict_deals['result']) > 0: if exchange_name != 'SPB': dict_deals['result'][-1] = pnl.m_unrealized_pnl * 0.87 -dict_deals['brokerage'][-2] else: dict_deals['result'][-1] = pnl.m_unrealized_pnl - dict_deals['brokerage'][-2] except Exception as e: print('get_deals --> Oops! It seems we have a BUG!', e) 

Além de processar uma matriz com informações sobre os parâmetros da transação, o preço médio de uma posição em aberto e realizado pelo PNL usando o método FIFO também é calculado aqui. A classe PnlSnapshot é responsável por esse cálculo, cuja criação com as pequenas modificações em que o código apresentado aqui foi tomado como base: Cálculo do resultado

E, finalmente, a mais difícil de implementar é a função de obter informações sobre operações que não são comerciais - get_nontrade_operation . Sua complexidade está no fato de que, no bloco de relatório usado para operações que não são de negociação, não há informações claras sobre o tipo de transação e a segurança à qual essa operação está vinculada.

Exemplo de destinos de pagamento para operações não comerciais
O pagamento de dividendos ou a receita acumulada de cupons podem ser indicados da seguinte forma:

  1. Pagamento de dividendos do cliente de renda <777777> < APPLE INC-ao> -> pagamento de dividendos do relatório do SPB;
  2. Pagamento de dividendos do cliente de renda <777777> < MICROSOFT COM-®>
  3. Pagamento de renda retida na fonte 777777i (NKD 2 OFZ 24019 ) imposto retido na fonte 0,00 rublos -> pagamento de cupom do relatório MOEX;
  4. Pagamento de renda ao cliente 777777 dividendos do FGC UES - imposto retido na fonte XX.XX rublos -> pagamento de dividendos a partir do relatório MOEX. etc.

Portanto, será difícil ficar sem expressões regulares, portanto, usá-las-emos ao máximo. O outro lado da questão é que o nome da empresa nem sempre coincide com o nome no portfólio ou nas transações para fins de pagamento. Portanto, o nome recebido do emissor da finalidade do pagamento deve ser adicionalmente correlacionado com o dicionário. Como dicionário, usaremos uma variedade de negócios, porque existe a lista mais completa de empresas.

A função get_company_from_str recupera o nome do emissor do comentário:

Função Get_company_from_str
 def get_company_from_str(comment): company_name = '' #    / flows_pattern = [ '^.+\s<(\w+)?.+->$', '^.+\s(.+)-.+$', '^.+\(\s\d?\s(.+)\).+$', '^.+\s(.+)-.+$' ] for pattern in flows_pattern: match = re.search(pattern, comment) if match: return match.group(1).upper() return company_name 

A função get_company_from_briefcase leva o nome da empresa ao dicionário se encontrar uma correspondência entre as empresas que participaram das transações:

Função Get_company_from_briefcase
 def get_company_from_briefcase(company_name): company_name_full = None value_from_dic = df_deals[df_deals['stock_name'].str.contains(company_name)] company_arr = value_from_dic['stock_name'].unique() if len(company_arr) == 1: company_name_full = company_arr[0] return company_name_full 


E, finalmente, a função final de coletar dados sobre operações que não são de negociação é get_nontrade_operation :

Função Get_nontrade_operation
 def get_nontrade_operation(XMLdata): nontrade_money_operation = XMLdata.find(flows) if not nontrade_money_operation: return try: for child in nontrade_money_operation: comment = child.get('comment') type_oper_match = re.search('||^.+.+.+$', comment) if type_oper_match: company_name = get_company_from_str(comment) type_oper = get_type_oper(comment) dict_flows['stock_name'].append(company_name) dict_flows['account'].append(account_name) dict_flows['date_oper'].append(to_dt(child.get(operationdate)).strftime('%Y-%m-%d')) dict_flows['type_oper'].append(type_oper) dict_flows['result'].append(float(child.get('amount'))) dict_flows['currency'].append(child.get(currency_flows)) except Exception as e: print('get_nontrade_operation --> Oops! It seems we have a BUG!', e) 

O resultado da coleta de dados dos relatórios serão três DataFrames, aproximadamente os seguintes:

  1. DataFrame com informações sobre preços médios de posições em aberto:
  2. DataFrame da oferta especial:
  3. DataFrame com informações sobre operações não comerciais:


Então, tudo o que resta fazer é realizar uma união externa da tabela de transações com a tabela de informações do portfólio:

 df_result = pd.merge(df_deals, df_stocks_avg, how='outer', on=['stock_name', 'account', 'currency']).fillna(0) df_result.sample(10) 


E, finalmente, a parte final do processamento da matriz de dados é a mesclagem da matriz de dados obtida na etapa anterior com o DataFrame para transações não comerciais.
O resultado do trabalho realizado é uma grande mesa plana com todas as informações necessárias para análise:

 df_result_full = df_result.append(df_flows, ignore_index=True).fillna(0) df_result_full.sample(10).head() 


O conjunto de dados resultante (Relatório Final) do DataFrame é facilmente carregado no CSV e pode ser usado para análises detalhadas em qualquer sistema de BI.

 if not exists('OUTPUT'): makedirs('OUTPUT') report_name = 'OUTPUT\my_trader_diary.csv' df_result_full.to_csv(report_name, index = False, encoding='utf-8-sig') 


Carregar e processar dados na AWS


O progresso não pára e agora os serviços em nuvem e os modelos de computação sem servidor estão ganhando grande popularidade no processamento e armazenamento de dados. Isso se deve em grande parte à simplicidade e baixo custo dessa abordagem, quando você não precisa comprar equipamentos caros para construir uma arquitetura de sistema para computação complexa ou processar big data, mas apenas aluga a energia na nuvem pelo tempo necessário e implanta os recursos necessários com rapidez suficiente por uma taxa relativamente pequena .

Um dos maiores e mais conhecidos fornecedores de nuvem do mercado é a Amazon. Vejamos o exemplo do ambiente Amazon Web Services (AWS) para criar um sistema analítico para o processamento de dados em nosso portfólio de investimentos.

A AWS possui uma ampla seleção de ferramentas, mas usaremos o seguinte:

  • Amazon S3 - armazenamento de objetos, que permite armazenar quantidades quase ilimitadas de informações;
  • AWS Glue - o serviço ETL em nuvem mais poderoso que pode determinar a estrutura e gerar o código ETL a partir dos dados de origem fornecidos;
  • O Amazon Athena , um serviço de consulta SQL on-line sem servidor, permite analisar rapidamente os dados do S3 sem muita preparação. Ele também tem acesso aos metadados que o AWS Glue prepara, o que permite acessar os dados imediatamente após a transmissão do ETL;
  • Amazon QuickSight - serviço de BI sem servidor, você pode criar qualquer visualização, relatórios analíticos "on the fly" etc.

A documentação da Amazon está correta, em particular, há um bom artigo Práticas recomendadas ao usar o Athena com AWS Glue , que descreve como criar e usar tabelas e dados usando o AWS Glue. Vamos tirar proveito das principais idéias deste artigo e aplicá-las para criar nossa própria arquitetura de um sistema de relatório analítico.

Os arquivos CSV preparados pelo nosso analisador de relatórios serão adicionados ao bloco S3. Está previsto que a pasta correspondente no S3 seja reabastecida todos os sábados - no final da semana de negociação, para que você não possa ficar sem o particionamento de dados até a data de formação e processamento do relatório.
Além de otimizar a operação de consultas SQL para esses dados, essa abordagem nos permitirá realizar análises adicionais, por exemplo, para obter a dinâmica das mudanças nos resultados financeiros de cada artigo, etc.

Trabalhar com o Amazon S3
  • Crie um bucket no S3, chame-o de "analisador de relatórios";
  • Nesse bloco "analisador de relatórios", crie uma pasta chamada "my_trader_diary";
  • No diretório "my_trader_diary", crie um diretório com a data do relatório atual, por exemplo, "date_report = 2018-10-01" e coloque o arquivo CSV nele;
  • Somente para o experimento e uma melhor compreensão do particionamento, criaremos mais dois diretórios: "date_report = 2018-09-27" e "date_report = 2018-10-08". Colocamos o mesmo arquivo CSV neles;
  • O intervalo final do S3 "analisador de relatórios" deve ser semelhante ao mostrado nas figuras abaixo:


Trabalhar com o AWS Glue
Em geral, você pode fazer apenas o Amazon Athena para criar uma tabela externa a partir dos dados do S3, mas o AWS Glue é uma ferramenta mais flexível e conveniente para isso.

  • Entramos no AWS Glue e criamos um novo rastreador, que coletará uma tabela de arquivos CSV separados por datas de relatório:
    • Defina o nome do novo rastreador;
    • Indicamos o repositório de onde obter os dados (s3: // report-parser / my_trader_diary /)
    • Selecionamos ou criamos uma nova função do IAM que terá acesso para iniciar o Crawler e acessar o recurso especificado no S3;
    • Em seguida, você precisa definir a frequência de início. Definimos isso sob demanda por enquanto, mas, no futuro, acho que isso mudará e o lançamento se tornará semanal;
    • Salve e aguarde a criação do rastreador.
  • Quando o rastreador entrar no estado Pronto, inicie-o!

  • Quando funcionar, uma nova tabela my_trader_diary aparecerá na guia AWS Glue: Database -> Tables:


Considere a tabela gerada com mais detalhes.
Se você clicar no nome da tabela criada, iremos para a página com a descrição dos metadados. Na parte inferior, há um layout de tabela e a mais recente é uma coluna que não estava no arquivo CSV de origem - date_report. Essa coluna que o AWS Glue cria automaticamente com base na definição de seções dos dados de origem (no Bucket S3, nomeamos especialmente as pastas date_report = AAAA-MM-DD, o que nos permitiu usá-las como seções separadas por data).

Particionamento de tabela

Na mesma página, no canto superior direito, há um botão Visualizar partições, clicando no qual podemos ver em quais seções nossa tabela gerada consiste:

Análise de dados


Tendo à nossa disposição os dados processados ​​carregados, podemos facilmente começar a analisá-los. Para começar, considere os recursos do Amazon Athena como a maneira mais fácil e rápida de executar consultas analíticas. Para fazer isso, acesse o serviço Amazon Athena, selecione o banco de dados de que precisamos (financeiro) e escreva o seguinte código SQL:

 select d.date_report, d.account, d.stock_name, d.currency, sum(d.quantity) as quantity, round(sum(d.result), 2) as result from my_trader_diary d group by d.date_report, d.account, d.stock_name, d.currency order by d.account, d.stock_name, d.date_report; 

Essa solicitação nos exibirá um resultado financeiro líquido para cada título para todas as datas dos relatórios. Porque baixamos o mesmo relatório três vezes para datas diferentes, o resultado não será alterado, o que, é claro, em um mercado real será diferente:


Mas e se quisermos visualizar os dados recebidos na forma de tabelas ou diagramas flexíveis? Aqui, o Amazon QuickSight vem em socorro, com a ajuda da qual você pode configurar análises flexíveis quase tão rapidamente quanto escrever uma consulta SQL. Iremos ao serviço Amazon QuickSight (se você ainda não se cadastrou, o registro será necessário).

Clique no botão Novas análises -> Novo conjunto de dados e, na janela exibida, selecione as fontes para o conjunto de dados, clique em Athena:



Vamos criar um nome para nossa fonte de dados, por exemplo, "PNL_analysis" e clicar no botão "Criar fonte de dados".

Em seguida, a janela Escolha sua tabela é aberta, onde você precisa selecionar o banco de dados e a tabela de origem de dados. Selecionamos o banco de dados - financeiro e a tabela: my_traider_diary. Por padrão, a tabela inteira é usada, mas se você selecionar "Usar SQL personalizado", poderá personalizar e ajustar a amostra de dados necessária. Por exemplo, usamos a tabela inteira e clicamos no botão Editar / Visualizar dados.

Uma nova página será aberta, onde você poderá fazer configurações adicionais e processar os dados existentes.

Agora precisamos adicionar campos calculados adicionais ao nosso conjunto de dados: trimestre e ano de operação. Um leitor atento pode perceber que essas manipulações eram mais fáceis de executar no lado do analisador antes de salvar o Relatório Final em CSV. Sem dúvida, meu objetivo agora é demonstrar os recursos e a flexibilidade das configurações do sistema de BI em tempo real. Continuamos a criar campos calculados clicando no botão "Novo campo".

Crie um novo campo

Para destacar o ano da operação e o trimestre, são utilizadas fórmulas simples:


Preenchendo Fórmulas para um Novo Campo

Quando os campos calculados tiverem sido criados e adicionados à seleção com sucesso, dê um nome ao nosso conjunto de dados, por exemplo, "my_pnl_analyze" e clique no botão "Salvar e visualizar".

Depois disso, transferimos para a placa principal do Amazon QuickSight e a primeira coisa que precisamos fazer é configurar um filtro para a data do relatório (levando em consideração que os mesmos dados foram coletados em três seções). Selecione a data do relatório 2018-10-01 e clique no botão Aplicar e vá para a guia Visualizar.

Configuração de filtro

Agora podemos visualizar o resultado do portfólio em qualquer plano, por exemplo, para cada título dentro da conta de negociação e dividido por moeda (porque o resultado não é comparável em moedas diferentes) e tipos de operações. Vamos começar com a ferramenta mais poderosa de qualquer tabela dinâmica de BI. Para economizar espaço e exibir flexibilidade, coloquei a moeda em um controle separado (análogo da fatia no MS Excel)

A tabela acima mostra que, se um investidor decidir vender todas as ações do FGC UES agora, ele registrará uma perda, pois Dividendos pagos no valor de 1.509,91 p. eles não cobrem seus custos (1.763,36 rublos - uma diferença cambial negativa e 174 rublos - imposto de renda pessoal sobre dividendos). Faz sentido esperar e esperar por melhores tempos no Exchange.

O gráfico a seguir é um gráfico de barras:


E agora formaremos uma tabela que nos mostrará quanto investimos em cada artigo, quantos dias há em nosso portfólio e qual é a lucratividade para todo o período de propriedade. Para fazer isso, adicione dois novos campos calculados: sum_investment e count_days.

Campo sum_investment
sum_investment ( ) :

ifelse({stock_name} = ' 24019',{avg_open_price} * quantity * 10,{avg_open_price} * quantity)

, – ( – 1000).

Campo count_days
count_day ( ) :

dateDiff(parseDate({date_oper}),parseDate({date_report}))

A mesa final é apresentada na captura de tela abaixo:



Conclusões e Resumo


Examinamos com você a implementação do analisador de relatórios e os métodos para analisar os dados preparados por ele "on the fly" usando os serviços da Amazon. Também abordamos alguns aspectos comerciais e fundamentais da análise do portfólio de investimentos, como esse tópico é quase imenso e é bastante difícil encaixá-lo em um artigo, acho que faz sentido colocá-lo em uma publicação separada ou mesmo em uma série de publicações.

Quanto ao uso da ferramenta de processamento de relatórios do broker e as abordagens e algoritmos envolvidos, eles podem ser usados ​​(com modificações apropriadas) para processar os relatórios de outros brokers. De qualquer forma, se você planeja adaptar o código às suas necessidades, estou pronto para dar algumas dicas, por isso não hesite em fazer perguntas - tentarei definitivamente respondê-las.

Estou certo de que este sistema encontrará sua aplicação e terá mais desenvolvimento. Por exemplo, está planejado adicionar o cálculo do depositário e outras taxas (por exemplo, para retirada de fundos), bem como o resgate de títulos, etc. ao cálculo do PNL completo para o portfólio ... Os campos calculados no lado Quicksight foram usados ​​para fins de demonstração, na próxima versão do analisador, todas essas colunas adicionais serão portadas para Python e serão calculadas no lado do analisador.

Como arquiteto e o principal cliente comercial desta solução, vejo uma nova atualização da seguinte forma: bem, não quero solicitar manualmente esses relatórios XML sempre! Obviamente, não há outra possibilidade até agora, mas a API do Broker com a transferência do token e o intervalo de amostragem seria ideal para receber relatórios brutos semanais. Processamento automático completo subseqüente no lado da Amazon: desde o acionamento do trabalho ETL no AWS Glue até a obtenção de resultados prontos na forma de gráficos e tabelas no Amazon QuickSight, você poderá automatizar completamente o processo.

O código fonte completo pode ser encontrado no meu repositório GitHub

Source: https://habr.com/ru/post/pt426027/


All Articles