
Ketika mengembangkan dan menggunakan sistem terdistribusi, kita dihadapkan dengan tugas untuk memantau integritas dan identitas data antar sistem - tugas rekonsiliasi .
Persyaratan yang ditetapkan pelanggan adalah waktu minimum untuk operasi ini, karena semakin cepat perbedaan ditemukan, semakin mudah untuk menghilangkan konsekuensinya. Tugas ini terasa rumit oleh fakta bahwa sistem berada dalam gerakan konstan (~ 100.000 transaksi per jam) dan 0% dari perbedaan tidak dapat dicapai.
Ide utama
Gagasan utama dari solusi dapat dijelaskan dalam diagram berikut.
Kami mempertimbangkan masing-masing elemen secara terpisah.

Adaptor data
Setiap sistem dirancang untuk bidang subjeknya masing-masing dan, sebagai konsekuensinya, deskripsi objek dapat sangat bervariasi. Kita hanya perlu membandingkan satu set bidang tertentu dari objek ini.
Untuk menyederhanakan prosedur perbandingan, kami membawa objek ke format tunggal dengan menulis adaptor untuk setiap sumber data. Membawa objek ke format tunggal dapat secara signifikan mengurangi jumlah memori yang digunakan, karena kami hanya akan menyimpan bidang yang dibandingkan.
Di bawah tenda, adaptor dapat memiliki sumber data apa pun: HttpClient , SqlClient , DynamoDbClient , dll.

Berikut ini adalah antarmuka IAdapter yang ingin Anda terapkan:
public interface IAdapter<T> where T : IModel { int Id { get; } Task<IEnumerable<T>> GetItemsAsync(ISearchModel searchModel); } public interface IModel { Guid Id { get; } int GetHash(); }
Penyimpanan
Rekonsiliasi data hanya dapat dimulai setelah semua data dibaca, karena adaptor dapat mengembalikannya dalam urutan acak.
Dalam hal ini, jumlah RAM mungkin tidak cukup, terutama jika Anda menjalankan beberapa rekonsiliasi secara bersamaan, yang menunjukkan interval waktu yang besar.
Pertimbangkan antarmuka IStorage
public interface IStorage { int SourceAdapterId { get; } int TargetAdapterId { get; } int MaxWriteCapacity { get; } Task InitializeAsync(); Task<int> WriteItemsAsync(IEnumerable<IModel> items, int adapterId); Task<IEnumerable<IResultModel>> GetDifferenceAsync(ISearchDifferenceModel model); } public interface ISearchDifferenceModel { int Offset { get; } int Limit { get; } }
Repositori. Implementasi berbasis MS SQL
Kami menerapkan IStorage menggunakan MS SQL, yang memungkinkan kami untuk melakukan perbandingan sepenuhnya di sisi server Db.
Untuk menyimpan nilai yang diminta, cukup buat tabel berikut:
CREATE TABLE [dbo].[Storage_1540747667] ( [id] UNIQUEIDENTIFIER NOT NULL, [adapterid] INT NOT NULL, [qty] INT NOT NULL, [price] INT NOT NULL, CONSTRAINT [PK_Storage_1540747667] PRIMARY KEY ([id], [adapterid]) )
Setiap catatan berisi bidang sistem ( [id] , [adapterId] ) dan bidang untuk perbandingan ( [qty] , [harga] ). Beberapa kata tentang bidang sistem:
[id] - pengidentifikasi unik dari entri, sama di kedua sistem
[adapterId] - pengidentifikasi adaptor yang digunakan untuk menerima catatan
Karena proses rekonsiliasi dapat dimulai secara paralel dan memiliki interval berpotongan, kami membuat tabel dengan nama unik untuk masing-masingnya. Jika rekonsiliasi berhasil, tabel ini dihapus, jika tidak, laporan akan dikirim dengan daftar catatan di mana ada perbedaan.
Repositori. Perbandingan Nilai

Bayangkan kita memiliki 2 set elemen yang memiliki set bidang yang benar-benar identik. Pertimbangkan 4 kemungkinan kasus persimpangan mereka:
A. Elemen hanya ada di set kiri.
B. Elemen hadir di kedua set, tetapi memiliki makna berbeda.
S Elemen hadir hanya di set yang tepat.
D Elemen hadir di kedua set dan memiliki makna yang sama.
Dalam masalah khusus, kita perlu menemukan elemen-elemen yang dijelaskan dalam kasus A, B, C. Anda bisa mendapatkan hasil yang diinginkan dalam satu permintaan ke MS SQL melalui FULL OUTER JOIN :
select [s1].[id], [s1].[adapterid] from [dbo].[Storage_1540758006] as [s1] full outer join [dbo].[Storage_1540758006] as [s2] on [s2].[id] = [s1].[id] and [s2].[adapterid] != [s1].[adapterid] and [s2].[qty] = [s1].[qty] and [s2].[price] = [s1].[price] where [s2].[id] is nul
Output dari permintaan ini dapat berisi 4 jenis catatan yang memenuhi persyaratan awal
# | id | adaptorid | komentar |
---|
1 | guid1 | adp1 | Rekor hadir hanya di set kiri. Kasus A |
2 | guid2 | adp2 | Catatan hadir hanya di set yang tepat. Kasus C |
3 | guid3 | adp1 | Catatan hadir di kedua set, tetapi memiliki arti berbeda. Kasus B |
4 | guid3 | adp2 | Catatan hadir di kedua set, tetapi memiliki arti berbeda. Kasus B |
Repositori. Hashing
Menggunakan hashing pada objek yang dibandingkan, Anda dapat secara signifikan mengurangi biaya penulisan dan membandingkan operasi. Terutama ketika membandingkan puluhan bidang.
Metode yang paling universal dari hashing representasi serial dari suatu objek ternyata.

1. Untuk hashing, kita menggunakan metode GetHashCode () standar, yang mengembalikan int32 dan diganti untuk semua tipe primitif.
2. Dalam hal ini, kemungkinan tabrakan tidak mungkin, karena hanya catatan dengan pengidentifikasi yang sama yang dibandingkan.
Pertimbangkan struktur tabel yang digunakan dalam optimasi ini:
CREATE TABLE [dbo].[Storage_1540758006] ( [id] UNIQUEIDENTIFIER NOT NULL, [adapterid] INT NOT NULL, [hash] INT NOT NULL, CONSTRAINT [PK_Storage_1540758006] PRIMARY KEY ([id], [adapterid], [hash]) )
Keuntungan dari struktur seperti itu adalah biaya konstan untuk menyimpan satu record (24 byte), yang tidak akan tergantung pada jumlah bidang yang dibandingkan.
Secara alami, prosedur perbandingan mengalami perubahan dan menjadi lebih sederhana.
select [s1].[id], [s1].[adapterid] from [dbo].[Storage_1540758006] as [s1] full outer join [dbo].[Storage_1540758006] as [s2] on [s2].[id] = [s1].[id] and [s2].[adapterid] != [s1].[adapterid] and [s2].[hash] = [s1].[hash] where [s2].[id] is null
CPU
Pada bagian ini, kita akan berbicara tentang kelas yang berisi semua logika bisnis rekonsiliasi, yaitu:
1. pembacaan data secara paralel dari adaptor
2. hashing data
3. buffered record of values ββdalam database
4. pengiriman hasil
Deskripsi yang lebih komprehensif tentang proses rekonsiliasi dapat diperoleh dengan melihat diagram urutan dan antarmuka IProcessor.
public interface IProcessor<T> where T : IModel { IAdapter<T> SourceAdapter { get; } IAdapter<T> TargetAdapter { get; } IStorage Storage { get; } Task<IProcessResult> ProcessAsync(); Task<IEnumerable<IResultModel>> GetDifferenceAsync(ISearchDifferenceModel model); }

Ucapan Terima Kasih
Terima kasih banyak kepada kolega saya dari MySale Group atas umpan balik: AntonStrakhov , Nesstory , Barlog_5 , Kostya Krivtsun dan VeterManve - penulis ide tersebut.