前段时间,我的同事要我帮助他解决一个问题。 我为他解决了这个问题,但除此之外,在我看来,解决这个问题可以解释几种编程算法和技术。 并且还显示了算法执行时间从25秒加速到40 ms。
问题陈述
对于个人项目,我的同事需要一种算法,以找到给定视频的五十个最相似的视频。 可以通过匹配暴露标签的数量来估计相似性。 视频匹配的标签越多,它们越相似。 由此我们可以立即得出以下结论:
- 视频下的所有标签可以合并为一组;
- 这样的群组绝对不会比视频本身多;
- 如果该视频与某个标签组中的另一视频相似,则该视频与该组中的其他视频同样相似;
事实证明,仅使用标签组就足够了。 在第一个版本中,一位同事决定将标签存储在标签表中:每个视频都有一个指向标签组ID的链接,并且组本身是一系列布尔值,它们指示是否设置了相应的标签。 在C#中,标记组如下所示:
public class TagsGroup { bool[] InnerTags { get; } }
一位同事建议,在该网站上,他的视频不超过一百万个,各种标签不超过4000个,对于一个全面的帐户,您可以取4096 = 2 ^ 12。
然后, TagsGroup
类可以这样表示:
public class TagsGroup { public const int TagsGroupLength = 4096; bool[] InnerTags { get; } public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } InnerTags = innerTags; } }
现在,您需要检查两组标签的相似性。 在当前条件下,这将变成对两组标签的InnerTags
数组的相应元素中的true的简单检查:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i]) result++; } return result; }
现在仅需计算所需标签组与每个现有组的相似度并选择五十个最相似的标签组。 我为确保样品稳定性设定了另一个条件,即 在最终样本中,将有五十个标签组的MeasureSimilarity
给出最高结果,而具有相同MeasureSimilarity
标签组的索引将相对于那些在原始现有组中MeasureSimilarity
较低索引的标签组具有较低的索引。 可以在以下位置找到更多详细信息: https : //ru.wikipedia.org/wiki/Sustainable_Sort 。
为了解决这个问题,我决定制作SimilarTagsCalculator
类SimilarTagsCalculator
,这是它的代码:
相似标签 class SimilarTagsCalculator { TagsGroup[] Groups { get; } public SimilarTagsCalculator(TagsGroup[] groups) { if (groups == null) throw new ArgumentException(nameof(groups)); Groups = groups; } public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) { const int resultLength = 50;
和TagsSimilarityInfo
结构:
标签相似信息 struct TagsSimilarityInfo : IComparable<TagsSimilarityInfo>, IComparable { public int Index { get; } public int Similarity { get; } public TagsSimilarityInfo(int index, int similarity) { Index = index; Similarity = similarity; } public bool Equals(TagsSimilarityInfo other) { return Index == other.Index && Similarity == other.Similarity; } public override bool Equals(object obj) { return obj is TagsSimilarityInfo other && Equals(other); } public override int GetHashCode() { unchecked { return (Index * 397) ^ Similarity; } } public int CompareTo(TagsSimilarityInfo other) { int similarityComparison = other.Similarity.CompareTo(Similarity); return similarityComparison != 0 ? similarityComparison : Index.CompareTo(other.Index); } public int CompareTo(object obj) { if (ReferenceEquals(null, obj)) return 1; return obj is TagsSimilarityInfo other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(TagsSimilarityInfo)}"); } }
我为此算法准备了三个基准:
- 完全随机的基准,即 组中已设置标签的数量是随机的,我们将与之比较的标签组也是随机的;
- 组中设置的标签数量在增加,我们将与设置所有标签的组进行比较。 事实证明,最后一些标签组应该是最合适的。
- 与上述相同,但公开标签的数量正在减少。 前50组标签最合适;
以下是一百万个小组的基准测试结果:
BenchmarkDotNet = v0.11.5,操作系统= Windows 10.0.17134.765(1803 / April 2018Update / Redstone4)
英特尔酷睿i7-6700 CPU 3.40GHz(Skylake),1个CPU,8个逻辑和4个物理核心
频率= 3328126 Hz,分辨率= 300.4694 ns,计时器= TSC
.NET Core SDK = 3.0.100-preview5-011568
[主机]:.NET Core 3.0.0-preview5-27626-15(CoreCLR 4.6.27622.75,CoreFX 4.700.19.22408),64位RyuJIT
执行时间的跨度很大,除了25秒是很长的时间,我的同事不同意等待那么长时间。 因此,让我们进行优化。 现在,有三个主要方面可以加快程序的速度:
MeasureSimilarity
方法;GetFiftyMostSimilarGroups
循环主体中的一种算法;GetFiftyMostSimilarGroups
的循环本身;
我们将依次考虑三个方向。
分支预测
首先,考虑MeasureSimilarity
方法。
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { if (a.InnerTags[i] && a.InnerTags[i] == b.InnerTags[i]) result++; } return result; }
在以前的基准测试中,随机测试和任何顺序测试之间的运行时差异都很大。 顺序测试的标签组是根据以下原理创建的:
- 将所需数量的组分为多个包。 数据包数量-组中标签的最大数量;
- 对于第i个数据包中的每个组,都设置了第一个i标签;
事实证明,这些测试中的每组标签都由暴露和未暴露标签的两个连续部分组成。 MeasureSimilarity
具有处理器分支预测在当前条件下具有显著作用的所有先决条件。 要检查这一点,只需编写一个基准,比较随机数据和顺序数据的MeasureSimilarity运行时:
int GetSimilaritySum(TagsGroup[] tagsGroups) { int result = 0; foreach (TagsGroup tagsGroup in tagsGroups) { result += TagsGroup.MeasureSimilarity(tagsGroup, etalon); } return result; } [Benchmark] public int Sorted() => GetSimilaritySum(sortedGroups); [Benchmark] public int Unsorted() => GetSimilaritySum(unsortedGroups);
测试了100万个标签组,但首先在每个组中进行Sorted
,然后有几个暴露的标签,然后是未暴露的标签,在未分类的中,相同数量的暴露标签被随机散布在整个组中。
5秒的间隔令人印象深刻,需要做一些事情。 为了摆脱分支预测的影响并总体上加快该方法的速度,您需要摆脱分支本身。 MeasureSimilarity
只有一个分支-检查是否将相应标签设置为两组。 让我们估计一下在哪种情况下该条件为真,为此,我们将制作一个表以显示该条件的真相:
真值表与逻辑“与”完全重合,即 当且仅当两个标签都为true时,结果才为true,则条件可以简化为: if (a.InnerTags[i] && b.InnerTags[i])
。 但是,这种情况仍然存在。 在下一步中,我们将确保始终执行对结果的加法操作,为此,我们将重写循环主体,如下所示:
int t = a.InnerTags[i] && b.InnerTags[i] ? 1 : 0; result += t;
我们仍然没有摆脱这种情况,实际上甚至使方法变慢了。 但是现在很明显的是,如果将InnerTags
的类型从bool更改为字节(1表示true,0表示false),那么您可以摆脱三元运算符中的条件。 然后, TagsGroup
类将如下所示:
标签组 public class TagsGroup { public const int TagsGroupLength = 4096; byte[] InnerTags { get; } public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength; i++) { int t = a.InnerTags[i] & b.InnerTags[i]; result += t; } return result; } public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } InnerTags = new byte[TagsGroupLength]; for (int i = 0; i < TagsGroupLength; i++) { InnerTags[i] = (byte) (innerTags[i] ? 1 : 0); } } }
以下是更新后的MeasureSimilarity
的基准测试结果:
但对于更新后的主要标记:
我认为这已经很棒。 对于那些确信所有加速仅是因为布尔类型被替换为字节才发生的那些人,我为这样的循环体启动了一个基准:
int t = a.InnerTags[i] & b.InnerTags[i]; if (t == 1) result += t;
结果如下:
数据打包
每个组都有许多标签,并且不能以任何方式减少其数量。 另外,必须比较具有相同索引的标签,并且不检查所有标签就不能给出最终答案。 因此,无论如何,我们将不得不遍历整个标签组。 能够以某种方式并行化此任务非常好,这样就可以在一个条件操作中处理多个标签。 您可以通过真正的并行化来做到这一点,也可以通过特殊的数据打包来实现,我们将使用它们。 现在,每个标签表示1或0。在结果中,仅对“ AND”运算result
结果进行累加。 但是,相同的逻辑运算不仅可以应用于单位数字。 C#允许您做到这一点而不会出现任何问题(最多64位数字)(您可以通过BitArray
进行更多操作,但事实并非如此)。 如果我们将两组标签表示为一组64位数字,并设置了相应的位,则可以对每组这样的64位数字执行“与”运算。 目前尚不清楚该如何处理结果。 让我们再次看一下循环的主体:
int t = a.InnerTags[i] & b.InnerTags[i]; result += t;
每当t == 1时结果增加1,而当t == 0时结果不变。结果将等于a.InnerTags[i] & b.InnerTags[i]
的结果等于1的次数。 因此,可以将a.InnerTags[i] & b.InnerTags[i]
所有结果保存在某个数组中,并且在结果中仅写入该数组中的单元数。 当对多于n位的数字执行AND操作时,将得到n位的结果,仅知道n中设置了多少位就足够了。 数字中设置的位数不变,这意味着您可以计算这些数字。 计算64位没有意义,因为 我们不会找到太多的RAM。 对于32位,您已经可以在现代计算机上找到空间,但这仍然很多。 16位以下的内存并不难找到,但计算会相对较长。 作为折衷方案,让我们计算8位数字:
GenerateCountOfSettedBits static readonly byte[] CountOfSettedBits = GenerateCountOfSettedBits(); static byte[] GenerateCountOfSettedBits() {
现在,TagsGroup构造函数如下所示:
const int BucketSize = 8; public TagsGroup(bool[] innerTags) { if (innerTags == null) throw new ArgumentException(nameof(innerTags)); if (innerTags.Length != TagsGroupLength) { throw new ArgumentException(nameof(innerTags)); } int index = 0;
MeasureSimilarity
开始看起来像这样:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { int t = a.InnerTags[i] & b.InnerTags[i]; result += CountOfSettedBits[t]; } return result; }
您可以运行大型基准测试,并确保一切都更好:
是否可以使MeasureSimilarity
方法更快? 当然可以! 为此,足以认识到通用寄存器现在大多数都是64位,并且我们在其中驱动了8位数据。 为此,请增加将原始标签打包到的数据包大小,将其增加到64位,然后重写必要的方法:
const int BucketSize = 64; ulong[] InnerTags { get; } public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; for (int j = 0; j < BucketSize / 8; j++) { result += CountOfSettedBits[t & 255]; t >>= 8; } } return result; }
结果是:
然后,您可以展开内部循环:
度量相似度 public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; t >>= 8; result += CountOfSettedBits[t & 255]; } return result; }
它更快吗? 是的 如果您使用.NET Core 3.0的创新功能。 尽管此版本仍在预览中,但是从一开始就存在一些内部函数的实现。 英特尔内部指南具有内部_mm_popcnt_u64
。 如前所述:“ 对无符号64位整数a中设置为1的位数进行计数,并在dst中返回该计数。 ”。 这正是我们正在努力实现的目标! 在.NET Core 3.0 Preview 5中,此内在函数在System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount
实现(如a-tk注释中正确指出的那样,在使用内在函数之前,您必须验证处理器是否支持它们。在这种情况下,请检查System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported
状况System.Runtime.Intrinsics.X86.Popcnt.X64.IsSupported
)。 使用它, MeasureSimilarity
方法的代码将如下所示:
public static int MeasureSimilarity(TagsGroup a, TagsGroup b) { int result = 0; for (int i = 0; i < TagsGroupLength / BucketSize; i++) { ulong t = a.InnerTags[i] & b.InnerTags[i]; result += (int) System.Runtime.Intrinsics.X86.Popcnt.X64.PopCount(t); } return result; }
和执行时间:
令人印象深刻。
我不知道可以显着提高MeasureSimilarity
的方式,同时又不会极大地损害可读性。 我认为您可以结束这种方法。
资料结构
现在,我们将在GetFiftyMostSimilarGroups
方法中GetFiftyMostSimilarGroups
循环的GetFiftyMostSimilarGroups
:
GetFiftyMostSimilarGroups public TagsGroup[] GetFiftyMostSimilarGroups(TagsGroup value) { const int resultLength = 50; List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(50); for (int groupIndex = 0; groupIndex < Groups.Length; groupIndex++) { TagsGroup tagsGroup = Groups[groupIndex]; int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup); TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue); if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { continue; } int index = ~list.BinarySearch(newInfo); list.Insert(index, newInfo); if (list.Count > resultLength) { list.RemoveAt(resultLength); } } TagsGroup[] result = new TagsGroup[resultLength]; for (int i = 0; i < resultLength; i++) { result[i] = Groups[list[i].Index]; } return result; }
让我简要回顾一下这里发生的事情:
- 在列表中,如果比较
TagsSimilarityInfo
,则会存储五十个最合适的标签组的排序列表,实际上是从较小到较大。 - 在保留排序的同时将有问题的新组插入列表中;
- 如果列表中有50个以上的元素,则删除最不相似的组(其信息对象将是最大的,并且将在
list
);
即 事实证明,我们需要非常快速地找到集合中最大的元素,能够快速插入和删除。 为了解决这些问题,有特殊的数据结构。 首先想到的是一堆 。 她的插入在O(log N)中执行,在O(1)中获得最大值,在O(log N)中删除一个元素。 唯一的问题是增加的元素不能修改堆而不能对其进行修改。 BCL中没有二进制堆,所以我自己写了它:
二进制堆 public class BinaryHeap<T>:IEnumerable<T> where T : IComparable<T> { readonly List<T> innerList; public BinaryHeap(int capacity) { innerList = new List<T>(capacity); } public int Count => innerList.Count; public T Max => innerList[0]; public void Add(T value) { innerList.Add(value); int i = Count - 1; int parent = (i - 1) >> 1; while (i > 0 && innerList[parent].CompareTo(innerList[i]) == -1) { Swap(i, parent); i = parent; parent = (i - 1) >> 1; } } void Swap(int a, int b) { T temp = innerList[a]; innerList[a] = innerList[b]; innerList[b] = temp; } void Heapify(int i) { for (;;) { int leftChild = (i << 1) | 1; int rightChild = (i + 1) << 1; int largestChild = i; if (leftChild < Count && innerList[leftChild].CompareTo(innerList[largestChild]) == 1) { largestChild = leftChild; } if (rightChild < Count && innerList[rightChild].CompareTo(innerList[largestChild]) == 1) { largestChild = rightChild; } if (largestChild == i) { break; } Swap(i, largestChild); i = largestChild; } } public void RemoveMax() { innerList[0] = innerList[Count - 1]; innerList.RemoveAt(Count - 1); Heapify(0); } public IEnumerator<T> GetEnumerator() { return innerList.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable) innerList).GetEnumerator(); } }
GetFiftyMostSimilarGroups
方法的相应实现可以在本文的源代码中找到(下面的链接)。
除了堆之外,可能还会出现二叉搜索树 。 平衡的二叉搜索树可以为O(log N)提供插入,为O(log N)提供最大值,为O(log N)除去元素。 这种结构的优点是可以按升序进行迭代,此外,BCL中的红黑搜索树是在SortedSet内部实现的(在大型框架中,获得最大值比.netcore 3.0中要慢得多,并且要分配内存)。 可以在本文的源代码中找到SortedSet的GetFiftyMostSimilarGroups
的实现。
所有三个GetFiftyMostSimilarGroups
实现的基准结果:
带有叶子的原始实现几乎在任何时候都赢了,当然在内存中也赢了。 发生这种情况的原因是,对于具有工作表的算法,对于搜索,插入操作以O(log N)执行,对于插入操作,几乎以O(1)执行操作,因为 复制这么少的元素很快就发生了,获得O(1)的最大值,同时也删除O(1)的元素,因为 在.net中,从工作表中删除最后一个元素的方法是,将一个空值写入最后一个元素(在.net核心中,没有任何内容写入结构)。 如果要求不给出50个,而是说1000个最相似的组,那么很可能,带有工作表的算法将不起作用。 实际上,所有这些都是一些推测性的推理,因为 您仍然可以调整每种算法。
多线程
现在剩下的工作就是尝试改善GetFiftyMostSimilarGroups
的循环本身。 只想到多线程。 想法是将整个组列表分成几个包。 在每个程序包中,找到50个最相似的标签组,然后在其中找到最后50个最相似的标签组。
GetFiftyMostSimilarGroups
的多线程版本如下所示:
GetFiftyMostSimilarGroupsMultiThread public TagsGroup[] GetFiftyMostSimilarGroupsMultiThread(TagsGroup value) { const int resultLength = 50;
GetFiftyMostSimilarGroupsMultiThreadCore
GetFiftyMostSimilarGroups
:
GetFiftyMostSimilarGroupsMultiThreadCore List<TagsSimilarityInfo> GetFiftyMostSimilarGroupsMultiThreadCore(TagsGroup value, int leftIndex, int rightIndex) { const int resultLength = 50; List<TagsSimilarityInfo> list = new List<TagsSimilarityInfo>(resultLength); for (int groupIndex = leftIndex; groupIndex < rightIndex; groupIndex++) { TagsGroup tagsGroup = Groups[groupIndex]; int similarityValue = TagsGroup.MeasureSimilarity(value, tagsGroup); TagsSimilarityInfo newInfo = new TagsSimilarityInfo(groupIndex, similarityValue); if (list.Count == resultLength && list[resultLength - 1].CompareTo(newInfo) == -1) { continue; } int index = ~list.BinarySearch(newInfo); list.Insert(index, newInfo); if (list.Count > resultLength) { list.RemoveAt(resultLength); } } return list; }
MergeTaskResults
. - taskResults . , , . , threadsCount
, : , , , :
MergeTaskResults TagsGroup[] MergeTaskResults(int resultLength, int threadsCount, List<TagsSimilarityInfo>[] taskResults) { TagsGroup[] result = new TagsGroup[resultLength]; int[] indices = new int[threadsCount]; for (int i = 0; i < resultLength; i++) { int minIndex = 0; TagsSimilarityInfo currentBest = taskResults[minIndex][indices[minIndex]]; for (int j = 0; j < threadsCount; j++) { var current = taskResults[j][indices[j]]; if (current.CompareTo(currentBest) == -1) { minIndex = j; currentBest = taskResults[minIndex][indices[minIndex]]; } } int groupIndex = currentBest.Index; result[i] = Groups[groupIndex]; indices[minIndex]++; } return result; }
indices
taskResults
;minIndex
— taskResults
, ;currentBest
— - ;current
— - ;
:
. . , , 4 50. , , .