Hace algún tiempo, mi colega me pidió que lo ayudara con un problema. Resolví el problema por él, pero además, me pareció que al resolver este problema se podrían explicar varios algoritmos y técnicas de programación. Y también muestra la aceleración del tiempo de ejecución del algoritmo de 25 segundos a 40 ms.
Declaración del problema.
Para un proyecto personal, mi colega necesitaba un algoritmo para encontrar cincuenta videos más similares para un video dado. Se suponía que la similitud se estimaba por el número de etiquetas expuestas coincidentes. Cuantas más etiquetas coincida con el video, más se parecen. De esto podemos sacar inmediatamente varias conclusiones:
- Todas las etiquetas debajo del video se pueden combinar en un grupo;
- definitivamente no habrá más grupos de este tipo que los videos mismos;
- si el video es similar a otro video de un determinado grupo de etiquetas, entonces es igualmente similar a otros videos de este grupo;
Resulta que es suficiente trabajar solo con grupos de etiquetas. En la primera versión, un colega decidió almacenar etiquetas en una tabla de etiquetas: cada video tiene un enlace a la ID del grupo de etiquetas, y los grupos mismos son una secuencia de valores booleanos que indican si la etiqueta correspondiente está configurada. En C #, un grupo de etiquetas se ve así:
public class TagsGroup { bool[] InnerTags { get; } }
Un colega sugirió que en el sitio no tendría más de un millón de videos, y varias etiquetas no más de 4000, para una cuenta redonda puede tomar 4096 = 2 ^ 12.
Entonces la clase TagsGroup
se puede representar así:
public class TagsGroup { public const int TagsGroupLength = 4096; bool[] InnerTags { get; } public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } InnerTags = innerTags; } }
Ahora debe verificar la similitud de los dos grupos de etiquetas. En las condiciones actuales, esto se convierte en una simple verificación de verdadero en los elementos correspondientes de las matrices InnerTags
de dos grupos de etiquetas:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i]) result++; } return result; }
Ahora solo queda calcular la similitud del grupo de etiquetas deseado con cada grupo existente y seleccionar los cincuenta más similares. Me puse otra condición para asegurar la estabilidad de la muestra, es decir en la muestra final, habrá cincuenta grupos de etiquetas para los cuales MeasureSimilarity
dio el resultado más alto, mientras que los grupos de etiquetas con la misma MeasureSimilarity
índice más bajo para aquellos que tenían un índice más bajo en el grupo existente original. Se pueden encontrar más detalles, por ejemplo, aquí: https://ru.wikipedia.org/wiki/Sustainable_Sort .
Para resolver este problema, decidí hacer la clase SimilarTagsCalculator
, aquí está su código:
Etiquetas similaresCalculador class SimilarTagsCalculator { TagsGroup[] Groups { get; } public SimilarTagsCalculator(TagsGroup[] groups) { if (groups == null) throw new ArgumentException(nameof(groups)); Groups = groups; } public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) { const int resultLength = 50;
y la estructura TagsSimilarityInfo
:
EtiquetasSimilaridadInfo struct TagsSimilarityInfo : IComparable<TagsSimilarityInfo>, IComparable { public int Index { get; } public int Similarity { get; } public TagsSimilarityInfo(int index, int similarity) { Index = index; Similarity = similarity; } public bool Equals(TagsSimilarityInfo other) { return Index == other.Index && Similarity == other.Similarity; } public override bool Equals(object obj) { return obj is TagsSimilarityInfo other && Equals(other); } public override int GetHashCode() { unchecked { return (Index * 397) ^ Similarity; } } public int CompareTo(TagsSimilarityInfo other) { int similarityComparison = other.Similarity.CompareTo(Similarity); return similarityComparison != 0 ? similarityComparison : Index.CompareTo(other.Index); } public int CompareTo(object obj) { if (ReferenceEquals(null, obj)) return 1; return obj is TagsSimilarityInfo other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(TagsSimilarityInfo)}"); } }
Preparé tres puntos de referencia para este algoritmo:
- punto de referencia completamente al azar, es decir el número de etiquetas establecidas en los grupos es aleatorio y el grupo de etiquetas con el que compararemos también es aleatorio;
- el número de etiquetas establecidas en grupos está aumentando, lo compararemos con el grupo en el que se configuran todas las etiquetas. Resulta que algunos de los últimos grupos de etiquetas deberían ser los más adecuados;
- igual que el anterior, pero el número de etiquetas expuestas está disminuyendo. Los primeros 50 grupos de etiquetas serán los más adecuados;
Estos son los resultados de referencia para un millón de grupos:
BenchmarkDotNet = v0.11.5, OS = Windows 10.0.17134.765 (1803 / April2018Update / Redstone4)
Intel Core i7-6700 CPU 3.40GHz (Skylake), 1 CPU, 8 núcleos lógicos y 4 físicos
Frecuencia = 3328126 Hz, Resolución = 300.4694 ns, Temporizador = TSC
.NET Core SDK = 3.0.100-preview5-011568
[Host]: .NET Core 3.0.0-preview5-27626-15 (CoreCLR 4.6.27622.75, CoreFX 4.700.19.22408), RyuJIT de 64 bits
La extensión del tiempo de ejecución es muy grande, además de 25 segundos es un tiempo muy largo, mi colega no acepta esperar tanto. Entonces hagamos optimizaciones. Ahora hay tres áreas principales para acelerar el programa:
MeasureSimilarity
Método de MeasureSimilarity
;- un algoritmo en el cuerpo del bucle en
GetFiftyMostSimilarGroups
; - el propio bucle en
GetFiftyMostSimilarGroups
;
Consideraremos cada una de las tres direcciones en secuencia.
Predicción de rama
Primero, considere el método MeasureSimilarity
.
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i]) result++; } return result; }
En el punto de referencia anterior, hubo una gran variación en el tiempo de ejecución entre la prueba aleatoria y cualquiera de las secuenciales. Los grupos de etiquetas para pruebas secuenciales fueron creados por el siguiente principio:
- el número requerido de grupos se dividió en paquetes. Número de paquetes: el número máximo de etiquetas en el grupo;
- para cada grupo en el paquete i-th, se establecieron las primeras etiquetas i;
Resulta que cada grupo de etiquetas en estas pruebas consta de dos partes consecutivas de etiquetas expuestas y no expuestas. MeasureSimilarity
tiene todos los requisitos previos para que la predicción de la rama del procesador tenga un efecto significativo en las condiciones actuales. Para verificar esto, simplemente escriba un punto de referencia que compare el tiempo de ejecución de MeasureSimilarity para datos aleatorios y secuenciales:
int GetSimilaritySum(TagsGroup[] tagsGroups) { int result = 0; foreach (TagsGroup tagsGroup in tagsGroups) { result += TagsGroup.MeasureSimilarity(tagsGroup, etalon); } return result; } [Benchmark] public int Sorted() => GetSimilaritySum(sortedGroups); [Benchmark] public int Unsorted() => GetSimilaritySum(unsortedGroups);
Se probaron un millón de grupos de etiquetas, pero en Sorted
en cada grupo al principio había varias etiquetas expuestas, y luego no expuestas, y en Unsorted
la misma cantidad de etiquetas expuestas se dispersaron al azar en todo el grupo.
La diferencia de 5 segundos es impresionante, y hay que hacer algo. Para deshacerse de la influencia de la predicción de ramas y, en general, acelerar el método, debe deshacerse de las ramas mismas. Solo hay una rama en MeasureSimilarity
: verificar que las etiquetas correspondientes estén configuradas en dos grupos. Vamos a estimar en qué casos la condición será verdadera, para esto haremos una tabla de la verdad de la condición:
La tabla de verdad coincide completamente con el lógico "Y", es decir. el resultado es verdadero si y solo si ambas etiquetas son verdaderas, entonces la condición puede reducirse a: if (a.InnerTags[i] && b.InnerTags[i])
. Pero de esta manera la condición aún permanece. En el siguiente paso, nos aseguraremos de que siempre se realice la suma al resultado, para esto reescribiremos el cuerpo del bucle de esta manera:
int t = a.InnerTags[i] && b.InnerTags[i] ? 1 : 0; result += t;
Todavía no eliminamos la condición e incluso hicimos el método más lento. Pero ahora es obvio que si el tipo de InnerTags
cambia de bool a byte (1 para verdadero y 0 para falso), puede deshacerse de la condición en el operador ternario. Entonces la clase TagsGroup
se verá así:
EtiquetasGrupo public class TagsGroup { public const int TagsGroupLength = 4096; byte[] InnerTags { get; } public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { int t = a.InnerTags[i] & b.InnerTags[i]; result += t; } return result; } public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } InnerTags = new byte[TagsGroupLength]; for (int i = 0; i < TagsGroupLength; i++) { InnerTags[i] = (byte) (innerTags[i] ? 1 : 0); } } }
Estos son los resultados de referencia para el MeasureSimilarity
actualizado:
pero para el bechmark principal actualizado:
En mi opinión, ya era genial. Para aquellos que estaban convencidos de que toda la aceleración ocurrió solo porque el tipo booleano fue reemplazado por un byte, lancé un punto de referencia para dicho cuerpo de bucle:
int t = a.InnerTags[i] & b.InnerTags[i]; if (t == 1) result += t;
y estos son los resultados:
Embalaje de datos
Cada grupo tiene muchas etiquetas, y su número no se puede reducir de ninguna manera. Además, debe comparar las etiquetas con el mismo índice, y no puede dar una respuesta final sin verificar todas las etiquetas. Entonces, en cualquier caso, tendremos que iterar sobre todo el grupo de etiquetas. Sería genial poder paralelizar esta tarea de alguna manera, de modo que sea posible procesar varias etiquetas en una operación condicional. Puede hacerlo a través de la paralelización real, o puede hacerlo a través de un paquete especial de datos, que usaremos. Cada etiqueta ahora representa 1 o 0. En el result
resultado de la operación "Y" simplemente se acumula. Pero la misma operación lógica se puede aplicar no solo a números de un solo bit. C # le permite hacer esto sin problemas hasta números de 64 bits (puede hacer más a través de BitArray
, pero eso no es todo). Si representamos dos grupos de etiquetas como un conjunto de números de 64 bits con el conjunto de bits correspondiente, entonces será posible llevar a cabo una operación "Y" en cada grupo de números de 64 bits. No está claro qué hacer con el resultado. Veamos nuevamente el cuerpo del bucle:
int t = a.InnerTags[i] & b.InnerTags[i]; result += t;
el resultado aumenta en 1 cada vez que t == 1 y no cambia cuando t == 0. Como resultado, el resultado será igual a cuántas veces el resultado de a.InnerTags[i] & b.InnerTags[i]
fue uno. En consecuencia, sería posible guardar todos los resultados de a.InnerTags[i] & b.InnerTags[i]
en alguna matriz, y en el resultado escribir solo el número de unidades en esta matriz. Al realizar la operación AND en más de números de n bits, se obtendrá un resultado de n bits y será suficiente solo para saber cuántos bits se establecen entre n. El número de bits establecido en el número no cambia, lo que significa que puede contar estos números. No tiene sentido contar 64 bits, ya que No encontraremos tanta RAM. Para 32 bits, ya puedes encontrar espacio en las computadoras modernas, pero esto sigue siendo mucho. La memoria de menos de 16 bits no es difícil de encontrar, pero el cálculo será relativamente largo. Como compromiso, calculemos para números de 8 bits:
GenerateCountOfSettedBits static readonly byte[] CountOfSettedBits = GenerateCountOfSettedBits(); static byte[] GenerateCountOfSettedBits() {
ahora el constructor de TagsGroup se ve así:
const int BucketSize = 8; public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } int index = 0;
Y MeasureSimilarity
comenzó a verse así:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { int t = a.InnerTags[i] & b.InnerTags[i]; result += CountOfSettedBits[t]; } return result; }
Puede ejecutar un punto de referencia grande y asegurarse de que todo esté mejor:
¿Es posible hacer que el método MeasureSimilarity
aún más rápido? Por supuesto! Para hacer esto, es suficiente darse cuenta del hecho de que los registros de propósito general ahora son en su mayoría de 64 bits, y manejamos datos de ocho bits en ellos. Para hacer esto, aumente el tamaño del paquete en el que se empaquetan las etiquetas originales, aumente a 64 bits y vuelva a escribir los métodos necesarios:
const int BucketSize = 64; ulong[] InnerTags { get; } public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; for (int j = 0; j < BucketSize / 8; j++) { result += CountOfSettedBits[t & 255]; t >>= 8; } } return result; }
y resulta que:
Luego puede expandir el bucle interno:
Medida Similitud public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; } return result; }
¿Es aún más rápido? Si! Si usa las innovaciones de .NET Core 3.0. Aunque esta versión todavía está en la vista previa, desde el principio hay una implementación de algunos intrínsecos. La Guía intrínseca de Intel tiene el _mm_popcnt_u64
intrínseco. Que, como se describe: " Cuente el número de bits establecido en 1 en un entero de 64 bits sin signo a, y devuelva ese recuento en dst ". ¡Esto es exactamente lo que estamos tratando de lograr! En .NET Core 3.0 Preview 5, este intrínseco se implementa en System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount
(Como se señaló correctamente en los comentarios de a-tk , debe verificar que el procesador los admita antes de usar intrínsecos. En este caso, verifique la condición del System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported
). Al MeasureSimilarity
, el código del método MeasureSimilarity
será así:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; result += (int) System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(t); } return result; }
y tiempo de ejecución:
Impresionante
No conozco las formas que pueden acelerar significativamente MeasureSimilarity
y, al mismo tiempo, no estropear en gran medida la legibilidad. Creo que puedes terminar este método.
Estructuras de datos
Ahora trataremos el cuerpo del bucle en el método GetFiftyMostSimilarGroups
:
GetFiftyMostSimilarGroups public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) { const int resultLength = 50; List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(50); for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) { TagsGroup tagsGroup = Groups[groupIndex]; int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup); TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue); if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { continue; } int index = ~list.BinarySearch(newInfo); list.Insert(index, newInfo); if (list.Count > resultLength) { list.RemoveAt(resultLength); } } TagsGroup[] result = new TagsGroup[resultLength]; for (int i = 0; i < resultLength; i++) { result[i] = Groups[list[i].Index]; } return result; }
Permítanme recordar brevemente lo que está sucediendo aquí:
- dentro de la lista, se almacena una lista ordenada de los cincuenta grupos de etiquetas más adecuados, de hecho de menor a mayor, si compara
TagsSimilarityInfo
; - inserte el nuevo grupo en cuestión en la lista mientras conserva la clasificación;
- si hay más de cincuenta elementos en la lista, elimine el grupo menos similar (su objeto de información será el más grande y estará al final de la
list
);
Es decir Resulta que necesitamos encontrar rápidamente el elemento más grande de la colección, poder insertarlo y eliminarlo rápidamente. Para resolver tales problemas, hay estructuras de datos especiales. Lo primero que viene a la mente es un montón . Su inserción se realiza en O (log N), obteniendo el máximo en O (1), eliminando un elemento en O (log N). El único problema es que el montón no puede ser iterado por los elementos crecientes sin modificarlo. No hay un montón binario en BCL, así que lo escribí yo mismo:
Binaryheap public class BinaryHeap<T>:IEnumerable<T> where T : IComparable<T> { readonly List<T> innerList; public BinaryHeap(int capacity) { innerList = new List<T>(capacity); } public int Count => innerList.Count; public T Max => innerList[0]; public void Add(T value) { innerList.Add(value); int i = Count - 1; int parent = (i - 1) >> 1; while (i > 0 && innerList[parent].CompareTo(innerList[i]) == -1) { Swap(i, parent); i = parent; parent = (i - 1) >> 1; } } void Swap(int a, int b) { T temp = innerList[a]; innerList[a] = innerList[b]; innerList[b] = temp; } void Heapify(int i) { for (;;) { int leftChild = (i << 1) | 1; int rightChild = (i + 1) << 1; int largestChild = i; if (leftChild < Count && innerList[leftChild].CompareTo(innerList[largestChild]) == 1) { largestChild = leftChild; } if (rightChild < Count && innerList[rightChild].CompareTo(innerList[largestChild]) == 1) { largestChild = rightChild; } if (largestChild == i) { break; } Swap(i, largestChild); i = largestChild; } } public void RemoveMax() { innerList[0] = innerList[Count - 1]; innerList.RemoveAt(Count - 1); Heapify(0); } public IEnumerator<T> GetEnumerator() { return innerList.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable) innerList).GetEnumerator(); } }
La implementación correspondiente del método GetFiftyMostSimilarGroups
se puede encontrar en el código fuente del artículo (enlace a continuación).
Además del montón, puede aparecer un árbol de búsqueda binario . Un árbol de búsqueda binario equilibrado puede proporcionar inserción para O (log N), obteniendo un máximo para O (log N), eliminando un elemento para O (log N). La ventaja de dicha estructura es que puede iterarse en orden ascendente y, además, el árbol de búsqueda rojo-negro en BCL se implementa dentro de SortedSet (en un marco grande, obtener un máximo es mucho más lento que en .netcore 3.0 y asigna memoria). La implementación de GetFiftyMostSimilarGroups
para SortedSet se puede encontrar en el código fuente del artículo.
Resultados de referencia para las tres implementaciones de GetFiftyMostSimilarGroups
:
La implementación original con una hoja gana casi en todas partes en el tiempo, y ciertamente en todas partes en la memoria. Esto sucede debido al hecho de que para un algoritmo con una hoja, la inserción se realiza en O (log N) para la búsqueda, y casi O (1) para la inserción, porque copiar un número tan pequeño de elementos ocurre muy rápidamente, obteniendo un máximo para O (1), eliminando un elemento también para O (1), porque en .net, la eliminación del último elemento de la hoja se reemplaza escribiendo en el último elemento un valor vacío (en .net core no se escribe nada en las estructuras). Si fuera necesario dar no 50, pero digamos 1000 de los grupos más similares, lo más probable es que un algoritmo con una hoja no funcione. De hecho, todo esto es un pequeño razonamiento especulativo, porque Todavía puede ajustar cada uno de los algoritmos.
Multithreading
Ahora queda por tratar de mejorar el bucle en GetFiftyMostSimilarGroups
. Solo me viene a la mente el multihilo. La idea es dividir la lista completa de grupos en varios paquetes. En cada paquete, encuentre los 50 grupos de etiquetas más similares, y luego entre ellos encuentre los 50 grupos finales más similares.
La versión multiproceso de GetFiftyMostSimilarGroups
ve así:
GetFiftyMostSimilarGroupsMultiThread public TagsGroup[] GetFiftyMostSimilarGroupsMultiThread(TagsGroup value) { const int resultLength = 50;
GetFiftyMostSimilarGroupsMultiThreadCore
GetFiftyMostSimilarGroups
:
GetFiftyMostSimilarGroupsMultiThreadCore List<TagsSimilarityInfo> GetFiftyMostSimilarGroupsMultiThreadCore(TagsGroup value, int leftIndex, int rightIndex) { const int resultLength = 50; List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(resultLength); for (int groupIndex = leftIndex; groupIndex < rightIndex; groupIndex++) { TagsGroup tagsGroup = Groups[groupIndex]; int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup); TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue); if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { continue; } int index = ~list.BinarySearch(newInfo); list.Insert(index, newInfo); if (list.Count > resultLength) { list.RemoveAt(resultLength); } } return list; }
MergeTaskResults
. - taskResults . , , . , threadsCount
, : , , , :
MergeTaskResults TagsGroup[] MergeTaskResults(int resultLength, int threadsCount, List<TagsSimilarityInfo>[] taskResults) { TagsGroup[] result = new TagsGroup[resultLength]; int[] indices = new int[threadsCount]; for (int i = 0; i < resultLength; i++) { int minIndex = 0; TagsSimilarityInfo currentBest = taskResults[minIndex][indices[minIndex]]; for (int j = 0; j < threadsCount; j++) { var current = taskResults[j][indices[j]]; if (current.CompareTo(currentBest) == -1) { minIndex = j; currentBest = taskResults[minIndex][indices[minIndex]]; } } int groupIndex = currentBest.Index; result[i] = Groups[groupIndex]; indices[minIndex]++; } return result; }
indices
taskResults
;minIndex
— taskResults
, ;currentBest
— - ;current
— - ;
:
. . , , 4 50. , , .