Reconciliação - verificando a integridade dos dados em sistemas distribuídos


Ao desenvolver e usar sistemas distribuídos, somos confrontados com a tarefa de monitorar a integridade e a identidade dos dados entre sistemas - a tarefa de reconciliação .


Os requisitos que o cliente define são o tempo mínimo para esta operação, pois, quanto mais cedo a discrepância for encontrada, mais fácil será eliminar suas conseqüências. A tarefa é significativamente complicada pelo fato de os sistemas estarem em movimento constante (~ 100.000 transações por hora) e 0% das discrepâncias não podem ser alcançadas.


Ideia principal


A idéia principal da solução pode ser descrita no diagrama a seguir.
Consideramos cada um dos elementos separadamente.



Adaptadores de dados


Cada um dos sistemas é projetado para sua própria área de assunto e, como conseqüência, as descrições de objetos podem variar significativamente. Precisamos comparar apenas um determinado conjunto de campos desses objetos.


Para simplificar o procedimento de comparação, levamos os objetos para um único formato, escrevendo um adaptador para cada fonte de dados. Trazer os objetos para um único formato pode reduzir significativamente a quantidade de memória usada, pois armazenaremos apenas os campos comparados.


Sob o capô, o adaptador pode ter qualquer fonte de dados: HttpClient , SqlClient , DynamoDbClient etc.



A seguir, é a interface do IAdapter que você deseja implementar:


public interface IAdapter<T> where T : IModel { int Id { get; } Task<IEnumerable<T>> GetItemsAsync(ISearchModel searchModel); } public interface IModel { Guid Id { get; } int GetHash(); } 

Armazenamento


A reconciliação de dados só pode começar depois que todos os dados foram lidos, pois os adaptadores podem retorná-los em ordem aleatória.


Nesse caso, a quantidade de RAM pode não ser suficiente, especialmente se você executar várias reconciliações ao mesmo tempo, indicando grandes intervalos de tempo.


Considere a interface IStorage


 public interface IStorage { int SourceAdapterId { get; } int TargetAdapterId { get; } int MaxWriteCapacity { get; } Task InitializeAsync(); Task<int> WriteItemsAsync(IEnumerable<IModel> items, int adapterId); Task<IEnumerable<IResultModel>> GetDifferenceAsync(ISearchDifferenceModel model); } public interface ISearchDifferenceModel { int Offset { get; } int Limit { get; } } 

Repositório. Implementação baseada em MS SQL


Implementamos o IStorage usando o MS SQL, o que nos permitiu realizar a comparação completamente no lado do servidor Db.


Para armazenar os valores solicitados, basta criar a seguinte tabela:


 CREATE TABLE [dbo].[Storage_1540747667] ( [id] UNIQUEIDENTIFIER NOT NULL, [adapterid] INT NOT NULL, [qty] INT NOT NULL, [price] INT NOT NULL, CONSTRAINT [PK_Storage_1540747667] PRIMARY KEY ([id], [adapterid]) ) 

Cada entrada contém campos do sistema ( [id] , [adapterId] ) e campos para comparação ( [qty] , [price] ). Algumas palavras sobre os campos do sistema:


[id] - identificador exclusivo da entrada, o mesmo nos dois sistemas
[adapterId] - identificador do adaptador através do qual o registro foi recebido


Como os processos de reconciliação podem ser iniciados em paralelo e com intervalos de interseção, criamos uma tabela com um nome exclusivo para cada um deles. Se a reconciliação for bem-sucedida, essa tabela será excluída; caso contrário, um relatório será enviado com uma lista de registros nos quais existem discrepâncias.


Repositório. Comparação de Valor



Imagine que temos 2 conjuntos cujos elementos têm um conjunto absolutamente idêntico de campos. Considere 4 casos possíveis de sua interseção:


A. Os elementos estão presentes apenas no conjunto esquerdo.
B. Os elementos estão presentes nos dois conjuntos, mas têm significados diferentes.
S Os elementos estão presentes apenas no conjunto certo.
D Os elementos estão presentes nos dois conjuntos e têm o mesmo significado.


Em um problema específico, precisamos encontrar os elementos descritos nos casos A, B, C. Você pode obter o resultado desejado em uma consulta para o MS SQL via FULL OUTER JOIN :


 select [s1].[id], [s1].[adapterid] from [dbo].[Storage_1540758006] as [s1] full outer join [dbo].[Storage_1540758006] as [s2] on [s2].[id] = [s1].[id] and [s2].[adapterid] != [s1].[adapterid] and [s2].[qty] = [s1].[qty] and [s2].[price] = [s1].[price] where [s2].[id] is nul 

A saída desta solicitação pode conter 4 tipos de registros que atendem aos requisitos iniciais


#idadapteridcomentário
1guid1adp1O registro está presente apenas no conjunto esquerdo. Caso A
2guid2adp2O registro está presente apenas no conjunto correto. Caso C
3guid3adp1Os registros estão presentes nos dois conjuntos, mas têm significados diferentes. Caso B
4guid3adp2Os registros estão presentes nos dois conjuntos, mas têm significados diferentes. Caso B

Repositório. Hashing


Usando hash em objetos comparados, você pode reduzir significativamente o custo das operações de gravação e comparação. Especialmente quando se trata de comparar dezenas de campos.


O método mais universal de hash de uma representação serializada de um objeto acabou por ser.


1. Para o hash, usamos o método GetHashCode () padrão, que retorna int32 e é substituído por todos os tipos primitivos.
2. Nesse caso, a probabilidade de colisões é improvável, pois apenas os registros com os mesmos identificadores são comparados.


Considere a estrutura da tabela usada nesta otimização:


 CREATE TABLE [dbo].[Storage_1540758006] ( [id] UNIQUEIDENTIFIER NOT NULL, [adapterid] INT NOT NULL, [hash] INT NOT NULL, CONSTRAINT [PK_Storage_1540758006] PRIMARY KEY ([id], [adapterid], [hash]) ) 

A vantagem dessa estrutura é o custo constante de armazenar um registro (24 bytes), o que não dependerá do número de campos comparados.


Naturalmente, o procedimento de comparação sofre alterações e se torna muito mais simples.


 select [s1].[id], [s1].[adapterid] from [dbo].[Storage_1540758006] as [s1] full outer join [dbo].[Storage_1540758006] as [s2] on [s2].[id] = [s1].[id] and [s2].[adapterid] != [s1].[adapterid] and [s2].[hash] = [s1].[hash] where [s2].[id] is null 

CPU


Nesta seção, falaremos sobre uma classe que contém toda a lógica comercial da reconciliação, a saber:


1. leitura paralela de dados de adaptadores
2. hash de dados
3. registro de valores no buffer
4. entrega de resultados


Uma descrição mais abrangente do processo de reconciliação pode ser obtida observando o diagrama de sequência e a interface do IProcessor.

 public interface IProcessor<T> where T : IModel { IAdapter<T> SourceAdapter { get; } IAdapter<T> TargetAdapter { get; } IStorage Storage { get; } Task<IProcessResult> ProcessAsync(); Task<IEnumerable<IResultModel>> GetDifferenceAsync(ISearchDifferenceModel model); } 


Agradecimentos


Muito obrigado aos meus colegas do MySale Group pelo feedback: AntonStrakhov , Nesstory , Barlog_5 , Kostya Krivtsun e VeterManve - o autor da idéia.

Source: https://habr.com/ru/post/pt428443/


All Articles