Há algum tempo, meu colega me pediu para ajudá-lo com um problema. Eu resolvi o problema para ele, mas além disso, pareceu-me que, ao resolver esse problema, vários algoritmos e técnicas de programação podem ser explicados. E também mostra a aceleração do tempo de execução do algoritmo de 25 segundos a 40 ms.
Declaração do problema
Para um projeto pessoal, meu colega precisava de um algoritmo para encontrar cinquenta vídeos mais semelhantes para um determinado vídeo. A similaridade deveria ser estimada pelo número de tags expostas correspondentes. Quanto mais tags o vídeo corresponder, mais semelhantes. A partir disso, podemos tirar imediatamente várias conclusões:
- Todas as tags do vídeo podem ser combinadas em um grupo;
- Definitivamente, não haverá mais grupos do que os próprios vídeos;
- se o vídeo for semelhante a outro vídeo de um determinado grupo de tags, será igualmente semelhante a outros vídeos desse grupo;
Acontece que basta trabalhar apenas com grupos de tags. Na primeira versão, um colega decidiu armazenar tags em uma tabela de tags: cada vídeo possui um link para o ID do grupo de tags e os próprios grupos são uma sequência de valores booleanos que indicam se a tag correspondente está definida. Em C #, um grupo de tags fica assim:
public class TagsGroup { bool[] InnerTags { get; } }
Um colega sugeriu que ele não teria mais de um milhão de vídeos no site e mais de 4.000 tags diferentes. Para uma conta redonda, você pode usar 4096 = 2 ^ 12.
Em seguida, a classe TagsGroup
pode ser representada assim:
public class TagsGroup { public const int TagsGroupLength = 4096; bool[] InnerTags { get; } public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } InnerTags = innerTags; } }
Agora você precisa verificar os dois grupos de tags quanto à semelhança. Nas condições atuais, isso se transforma em uma simples verificação de true nos elementos correspondentes das matrizes InnerTags
de dois grupos de tags:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i]) result++; } return result; }
Agora resta apenas calcular a semelhança do grupo de tags desejado com cada grupo existente e selecionar os cinquenta mais semelhantes. Estabeleci outra condição para garantir a estabilidade da amostra, ou seja, na amostra final, haverá cinquenta grupos de tags para os quais o MeasureSimilarity
obteve o resultado mais alto, enquanto os grupos de tags com o mesmo MeasureSimilarity
índice mais baixo para aqueles que tiveram um índice mais baixo no grupo existente original. Mais detalhes podem ser encontrados, por exemplo, aqui: https://ru.wikipedia.org/wiki/Sustainable_Sort .
Para resolver esse problema, decidi fazer a classe SimilarTagsCalculator
, aqui está o código:
SimilarTagsCalculator class SimilarTagsCalculator { TagsGroup[] Groups { get; } public SimilarTagsCalculator(TagsGroup[] groups) { if (groups == null) throw new ArgumentException(nameof(groups)); Groups = groups; } public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) { const int resultLength = 50;
e a estrutura TagsSimilarityInfo
:
TagsSimilarityInfo struct TagsSimilarityInfo : IComparable<TagsSimilarityInfo>, IComparable { public int Index { get; } public int Similarity { get; } public TagsSimilarityInfo(int index, int similarity) { Index = index; Similarity = similarity; } public bool Equals(TagsSimilarityInfo other) { return Index == other.Index && Similarity == other.Similarity; } public override bool Equals(object obj) { return obj is TagsSimilarityInfo other && Equals(other); } public override int GetHashCode() { unchecked { return (Index * 397) ^ Similarity; } } public int CompareTo(TagsSimilarityInfo other) { int similarityComparison = other.Similarity.CompareTo(Similarity); return similarityComparison != 0 ? similarityComparison : Index.CompareTo(other.Index); } public int CompareTo(object obj) { if (ReferenceEquals(null, obj)) return 1; return obj is TagsSimilarityInfo other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(TagsSimilarityInfo)}"); } }
Eu preparei três benchmarks para esse algoritmo:
- referência completamente aleatória, ou seja, o número de tags definidas nos grupos é aleatório e o grupo de tags com o qual compararemos também é aleatório;
- Se o número de tags definidas em grupos estiver aumentando, compararemos com o grupo em que todas as tags estão definidas. Acontece que alguns dos últimos grupos de tags devem ser os mais adequados;
- o mesmo que acima, mas o número de tags expostas está diminuindo. Os primeiros 50 grupos de tags serão mais adequados;
Aqui estão os resultados de referência para um milhão de grupos:
BenchmarkDotNet = v0.11.5, OS = Windows 10.0.17134.765 (1803 / April2018Update / Redstone4)
CPU Intel Core i7-6700 de 3,40 GHz (Skylake), 1 CPU, 8 núcleos lógicos e 4 físicos
Frequência = 3328126 Hz, Resolução = 300.4694 ns, Temporizador = TSC
SDK do .NET Core = 3.0.100-preview5-011568
[Host]: .NET Core 3.0.0-preview5-27626-15 (CoreCLR 4.6.27622.75, CoreFX 4.700.19.22408), RyuJIT de 64 bits
O tempo de execução é muito grande, além de 25 segundos é muito tempo, meu colega não concorda em esperar tanto. Então, vamos fazer otimizações. Agora, existem três áreas principais para acelerar o programa:
- Método
MeasureSimilarity
; - um algoritmo no corpo do loop em
GetFiftyMostSimilarGroups
; - o próprio loop em
GetFiftyMostSimilarGroups
;
Vamos considerar cada uma das três direções em sequência.
Previsão de ramificação
Primeiro, considere o método MeasureSimilarity
.
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i]) result++; } return result; }
No benchmark anterior, houve uma variação muito grande no tempo de execução entre o teste aleatório e qualquer um dos sequenciais. Os grupos de tags para testes sequenciais foram criados pelo seguinte princípio:
- o número necessário de grupos foi dividido em pacotes. Número de pacotes - o número máximo de tags no grupo;
- para cada grupo no i-ésimo pacote, as primeiras tags i foram definidas;
Acontece que cada grupo de tags nesses testes consiste em duas partes consecutivas de tags expostas e não expostas. MeasureSimilarity
possui todos os pré-requisitos para que a previsão de ramificação do processador tenha um efeito significativo nas condições atuais. Para verificar isso, basta escrever uma referência que compare o tempo de execução do MeasureSimilarity para dados aleatórios e sequenciais:
int GetSimilaritySum(TagsGroup[] tagsGroups) { int result = 0; foreach (TagsGroup tagsGroup in tagsGroups) { result += TagsGroup.MeasureSimilarity(tagsGroup, etalon); } return result; } [Benchmark] public int Sorted() => GetSimilaritySum(sortedGroups); [Benchmark] public int Unsorted() => GetSimilaritySum(unsortedGroups);
Um milhão de grupos de tags foi testado, mas em Sorted
em cada grupo, inicialmente, havia várias tags expostas e depois não expostas, e em Unsorted
o mesmo número de tags expostas foi espalhado aleatoriamente por todo o grupo.
A diferença de 5 segundos é impressionante, e algo precisa ser feito. Para se livrar da influência da previsão de ramificação e geralmente acelerar o método, você precisa se livrar das ramificações. Existe apenas uma ramificação no MeasureSimilarity
- verificando se as tags correspondentes estão definidas em dois grupos. Vamos estimar em quais casos a condição será verdadeira, para isso faremos uma tabela da verdade da condição:
A tabela verdade coincide completamente com o lógico "AND", ou seja, o resultado será verdadeiro se, e somente se as duas tags forem verdadeiras, a condição poderá ser reduzida para: if (a.InnerTags[i] && b.InnerTags[i])
. Mas desta maneira a condição ainda permanece. Na próxima etapa, garantiremos que a adição ao resultado seja sempre realizada, para isso reescrevemos o corpo do loop da seguinte maneira:
int t = a.InnerTags[i] && b.InnerTags[i] ? 1 : 0; result += t;
Ainda não nos livramos da condição e até tornamos o método mais lento. Mas agora ficou óbvio que, se o tipo de InnerTags
alterado de bool para byte (1 para true e 0 para false), você poderá se livrar da condição no operador ternário. Em seguida, a classe TagsGroup
ficará assim:
TagsGrupo public class TagsGroup { public const int TagsGroupLength = 4096; byte[] InnerTags { get; } public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { int t = a.InnerTags[i] & b.InnerTags[i]; result += t; } return result; } public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } InnerTags = new byte[TagsGroupLength]; for (int i = 0; i < TagsGroupLength; i++) { InnerTags[i] = (byte) (innerTags[i] ? 1 : 0); } } }
Aqui estão os resultados de referência para o MeasureSimilarity
atualizado:
mas para o bechmark principal atualizado:
Na minha opinião, já foi ótimo. Para aqueles que estavam convencidos de que toda a aceleração aconteceu apenas porque o tipo booleano foi substituído por um byte, lancei uma referência para esse corpo de loop:
int t = a.InnerTags[i] & b.InnerTags[i]; if (t == 1) result += t;
e estes são os resultados:
Empacotamento de dados
Cada grupo possui muitas tags e seu número não pode ser reduzido de forma alguma. Além disso, você deve comparar as tags com o mesmo índice e não pode dar uma resposta final sem verificar todas as tags. Portanto, em qualquer caso, teremos que repetir todo o grupo de tags. Seria ótimo poder paralelizar essa tarefa de alguma forma, para que fosse possível processar várias tags em uma operação condicional. Você pode fazer isso através de paralelismo real ou através de empacotamento de dados especiais, que usaremos. Cada tag agora representa 1 ou 0. No result
resultado da operação "AND" é simplesmente acumulado. Mas a mesma operação lógica pode ser aplicada não apenas aos números de um bit. O C # permite fazer isso sem problemas, com números de até 64 bits (você pode fazer mais com o BitArray
, mas não é isso). Se representarmos dois grupos de tags como um conjunto de números de 64 bits com o conjunto de bits correspondente, será possível executar uma operação "AND" em cada grupo de números de 64 bits. Não está claro o que fazer com o resultado. Vamos olhar novamente para o corpo do loop:
int t = a.InnerTags[i] & b.InnerTags[i]; result += t;
o resultado aumenta em 1 toda vez que t == 1 e não muda quando t == 0. Como resultado, o resultado será igual a quantas vezes o resultado de a.InnerTags[i] & b.InnerTags[i]
era um. Assim, seria possível salvar todos os resultados de a.InnerTags[i] & b.InnerTags[i]
em algum tipo de matriz e, no resultado, gravar apenas o número de unidades nessa matriz. Ao executar a operação AND em mais de números de n bits, um resultado de n bits será e será suficiente apenas para saber quantos bits são definidos entre n. O número de bits definido no número é inalterado, o que significa que você pode contar esses números. Não faz sentido contar para 64 bits, pois não encontraremos tanta memória RAM. Para 32 bits, você já pode encontrar espaço em computadores modernos, mas isso ainda é muito. Não é difícil encontrar memória com menos de 16 bits, mas o cálculo será relativamente longo. Como compromisso, vamos calcular números de 8 bits:
GenerateCountOfSettedBits static readonly byte[] CountOfSettedBits = GenerateCountOfSettedBits(); static byte[] GenerateCountOfSettedBits() {
agora o construtor TagsGroup se parece com isso:
const int BucketSize = 8; public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } int index = 0;
E o MeasureSimilarity
começou a ficar assim:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { int t = a.InnerTags[i] & b.InnerTags[i]; result += CountOfSettedBits[t]; } return result; }
Você pode executar uma grande referência e garantir que tudo esteja melhor:
É possível tornar o método MeasureSimilarity
ainda mais rápido? Claro! Para fazer isso, basta perceber o fato de que os registros de uso geral agora são na sua maioria de 64 bits, e nós direcionamos dados de oito bits neles. Para fazer isso, aumente o tamanho do pacote no qual as tags originais são empacotadas, aumente para 64 bits e reescreva os métodos necessários:
const int BucketSize = 64; ulong[] InnerTags { get; } public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; for (int j = 0; j < BucketSize / 8; j++) { result += CountOfSettedBits[t & 255]; t >>= 8; } } return result; }
e acontece:
Então você pode expandir o loop interno:
MeasureSimilarity public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; } return result; }
É ainda mais rápido? Sim Se você usar as inovações do .NET Core 3.0. Embora esta versão ainda esteja em pré-visualização, mas desde o início há uma implementação de algumas intrínsecas. O Guia intrínseco da Intel possui o _mm_popcnt_u64
intrínseco. Que, conforme descrito: " Conte o número de bits definido como 1 no número inteiro não assinado de 64 bits a e retorne essa contagem em dst. ". É exatamente isso que estamos tentando alcançar! No .NET Core 3.0 Preview 5, esse intrínseco é implementado em System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount
(conforme observado corretamente nos comentários do a-tk , você deve verificar se o processador os suporta antes de usar o intrínseco. Nesse caso, verifique a condição do System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported
. System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported
). Utilizando-o, o código do método MeasureSimilarity
ficará assim:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; result += (int) System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(t); } return result; }
e tempo de execução:
Impressionante.
Não conheço as maneiras que podem acelerar significativamente o MeasureSimilarity
e, ao mesmo tempo, não prejudicar muito a legibilidade. Eu acho que você pode terminar esse método.
Estruturas de dados
Agora, GetFiftyMostSimilarGroups
com o corpo do loop no método GetFiftyMostSimilarGroups
:
GetFiftyMostSimilarGroups public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) { const int resultLength = 50; List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(50); for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) { TagsGroup tagsGroup = Groups[groupIndex]; int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup); TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue); if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { continue; } int index = ~list.BinarySearch(newInfo); list.Insert(index, newInfo); if (list.Count > resultLength) { list.RemoveAt(resultLength); } } TagsGroup[] result = new TagsGroup[resultLength]; for (int i = 0; i < resultLength; i++) { result[i] = Groups[list[i].Index]; } return result; }
Permitam-me relembrar brevemente o que está acontecendo aqui:
- dentro da lista, uma lista classificada dos cinquenta grupos de tags mais adequados é armazenada, de menor para maior, se você comparar
TagsSimilarityInfo
; - insira o novo grupo em questão na lista enquanto preserva a classificação;
- se houver mais de cinquenta elementos na lista, exclua o grupo menos semelhante (seu objeto de informação será o maior e estará no final da
list
);
I.e. acontece que precisamos encontrar rapidamente o maior elemento da coleção, poder inserir e excluir rapidamente. Para resolver esses problemas, existem estruturas de dados especiais. A primeira coisa que vem à mente é um monte . Sua inserção é realizada em O (log N), obtendo o máximo em O (1), removendo um elemento em O (log N). O único problema é que o heap não pode ser iterado pelos elementos crescentes sem modificá-lo. Não há heap binário no BCL, então eu mesmo escrevi:
Binaryheap public class BinaryHeap<T>:IEnumerable<T> where T : IComparable<T> { readonly List<T> innerList; public BinaryHeap(int capacity) { innerList = new List<T>(capacity); } public int Count => innerList.Count; public T Max => innerList[0]; public void Add(T value) { innerList.Add(value); int i = Count - 1; int parent = (i - 1) >> 1; while (i > 0 && innerList[parent].CompareTo(innerList[i]) == -1) { Swap(i, parent); i = parent; parent = (i - 1) >> 1; } } void Swap(int a, int b) { T temp = innerList[a]; innerList[a] = innerList[b]; innerList[b] = temp; } void Heapify(int i) { for (;;) { int leftChild = (i << 1) | 1; int rightChild = (i + 1) << 1; int largestChild = i; if (leftChild < Count && innerList[leftChild].CompareTo(innerList[largestChild]) == 1) { largestChild = leftChild; } if (rightChild < Count && innerList[rightChild].CompareTo(innerList[largestChild]) == 1) { largestChild = rightChild; } if (largestChild == i) { break; } Swap(i, largestChild); i = largestChild; } } public void RemoveMax() { innerList[0] = innerList[Count - 1]; innerList.RemoveAt(Count - 1); Heapify(0); } public IEnumerator<T> GetEnumerator() { return innerList.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable) innerList).GetEnumerator(); } }
a implementação correspondente do método GetFiftyMostSimilarGroups
pode ser encontrada no código-fonte do artigo (link abaixo).
Além da pilha, uma árvore de pesquisa binária pode aparecer. Uma árvore de pesquisa binária equilibrada pode fornecer inserção para O (log N), obtendo um máximo para O (log N), removendo um elemento para O (log N). A vantagem dessa estrutura é que ela pode ser iterada em ordem crescente e, além disso, a árvore de pesquisa vermelho-preto na BCL é implementada no SortedSet (em uma grande estrutura, a obtenção de um máximo é muito mais lenta que no .netcore 3.0 e aloca memória). A implementação do GetFiftyMostSimilarGroups
para SortedSet pode ser encontrada no código-fonte do artigo.
Resultados de benchmark para todas as três implementações GetFiftyMostSimilarGroups
:
A implementação original com uma folha vence quase todos os lugares no tempo, e certamente todos os lugares na memória. Isso ocorre devido ao fato de que para um algoritmo com uma planilha, a inserção é realizada em O (log N) para a pesquisa e quase O (1) para a inserção, porque copiar um número tão pequeno de elementos ocorre muito rapidamente, obtendo um máximo para O (1), excluindo um elemento também para O (1), porque no .net, a exclusão do último elemento da planilha é substituída por escrever no último elemento um valor vazio (no núcleo .net, nada é gravado nas estruturas). Se fosse necessário fornecer não 50, mas digamos 1000 dos grupos mais semelhantes, provavelmente um algoritmo com uma planilha não funcionaria. De fato, tudo isso é um pouco de raciocínio especulativo, porque Você ainda pode ajustar cada um dos algoritmos.
Multithreading
Agora resta tentar melhorar o próprio loop no GetFiftyMostSimilarGroups
. Somente multithreading vem à mente. A idéia é dividir a lista inteira de grupos em vários pacotes. Em cada pacote, encontre os 50 grupos de tags mais semelhantes e, entre eles, encontre os 50 finais mais semelhantes.
A versão multithread do GetFiftyMostSimilarGroups
é assim:
GetFiftyMostSimilarGroupsMultiThread public TagsGroup[] GetFiftyMostSimilarGroupsMultiThread(TagsGroup value) { const int resultLength = 50;
GetFiftyMostSimilarGroupsMultiThreadCore
GetFiftyMostSimilarGroups
:
GetFiftyMostSimilarGroupsMultiThreadCore List<TagsSimilarityInfo> GetFiftyMostSimilarGroupsMultiThreadCore(TagsGroup value, int leftIndex, int rightIndex) { const int resultLength = 50; List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(resultLength); for (int groupIndex = leftIndex; groupIndex < rightIndex; groupIndex++) { TagsGroup tagsGroup = Groups[groupIndex]; int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup); TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue); if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { continue; } int index = ~list.BinarySearch(newInfo); list.Insert(index, newInfo); if (list.Count > resultLength) { list.RemoveAt(resultLength); } } return list; }
MergeTaskResults
. - taskResults . , , . , threadsCount
, : , , , :
MergeTaskResults TagsGroup[] MergeTaskResults(int resultLength, int threadsCount, List<TagsSimilarityInfo>[] taskResults) { TagsGroup[] result = new TagsGroup[resultLength]; int[] indices = new int[threadsCount]; for (int i = 0; i < resultLength; i++) { int minIndex = 0; TagsSimilarityInfo currentBest = taskResults[minIndex][indices[minIndex]]; for (int j = 0; j < threadsCount; j++) { var current = taskResults[j][indices[j]]; if (current.CompareTo(currentBest) == -1) { minIndex = j; currentBest = taskResults[minIndex][indices[minIndex]]; } } int groupIndex = currentBest.Index; result[i] = Groups[groupIndex]; indices[minIndex]++; } return result; }
indices
taskResults
;minIndex
— taskResults
, ;currentBest
— - ;current
— - ;
:
. . , , 4 50. , , .