我们每天实时细分6亿用户

每天,用户都会进行数百万次在线活动。 FACETz DMP项目需要对数据进行结构化和细分,以识别用户的偏好。 在本文中,我们将讨论该团队如何细分6亿用户,每天处理50亿个事件以及如何使用Kafka和HBase进行统计。



该材料基于Directual的大数据专家Artyom Marinov在SmartData 2017大会上的报告的笔录。

我叫Artyom Marinov,我想谈一谈我在Data Centric Alliance工作时如何重新设计FACETz DMP项目的体系结构。 我们这样做的原因,导致的原因,前进的方向以及遇到的问题。

DMP(数据管理平台)是用于收集,处理和聚合用户数据的平台。 数据有很多不同之处。 该平台拥有约6亿用户。 这些是数以百万计的Cookie,它们会在Internet上发出各种事件。 一般而言,平均一天看起来是这样的:我们每天看到约55亿个事件,它们每天都以某种方式散布,在高峰时,每秒达到约10万个事件。 事件是各种用户信号。 例如,访问一个站点:我们看到用户来自哪个浏览器,它的用户代理以及我们可以提取的所有内容。 有时我们会看到他来到该网站的方式和搜索查询。 它也可以是来自离线世界的各种数据,例如,使用折扣券支付的费用等。

我们需要保存此数据,并将用户标记为所谓的细分受众群。 例如,细分受众群可能是“爱猫”并正在寻找“汽车服务”的“女人”,而她“拥有的车龄超过三年”。

为什么要细分用户? 为此,有许多应用程序,例如广告。 各种广告网络可以优化广告投放算法。 如果您要宣传汽车服务,则可以设置广告系列,使只有拥有旧车的人才能看到信息,而新车的所有者除外。 您可以动态更改网站的内容,可以使用数据进行评分-应用程序很多。

数据是从许多完全不同的地方获得的。 可以是直接像素设置-这是如果客户要分析其受众,他会将像素放在网站上,这是从我们的服务器下载的不可见图片。 最重要的是,我们可以看到用户对该网站的访问:您可以保存它,开始分析和了解用户的画像,所有这些信息对我们的客户都是可用的。


可以从看到大量数据并希望以各种方式货币化的各种合作伙伴那里获取数据。 合作伙伴既可以实时提供数据,也可以文件形式定期上传。

关键要求:

  • 横向可扩展性;
  • 观众人数估算;
  • 方便监测和发展;
  • 对事件的良好反应率。

系统的关键要求之一是水平可伸缩性。 有这么一个时刻,当您开发门户网站或在线商店时,您可以估计用户数量(它将如何增长,如何变化)并大致了解需要多少资源以及商店将如何随着时间而发展和发展。

当开发类似于DMP的平台时,您需要做好以下准备:任何大型站点(有条件的Amazon)都可以在其中放置像素,并且您必须处理整个站点的流量,而您不应该掉队,并且使用指标系统不应因此而改变。

能够理解特定受众的数量也很重要,这样潜在的广告客户或其他人才能制定出媒体计划。 例如,有人来找您,要求您找出新西伯利亚有多少名孕妇正在寻找抵押贷款,以评估是否有针对性地将其作为目标。

从开发的角度来看,您需要能够冷静地监视系统中发生的一切,调试部分实际流量,等等。

对系统最重要的要求之一是对事件的良好反应速度。 系统响应事件的速度越快越好,这是显而易见的。 如果您正在寻找剧院门票,那么在一天,两天甚至一个小时后看到某种折扣优惠,这可能是无关紧要的,因为您已经可以购买门票或参加演出了。 当您正在寻找演习时,即在寻找,寻找,购买,悬挂架子,几天后轰炸就开始了:“买个演习!”。

和以前一样


整个文章是关于回收体系结构的。 我想告诉您我们的出发点是什么,更改之前一切工作如何。

我们拥有的所有数据,无论是直接数据流还是日志,都存储在HDFS(分布式文件存储)上。 然后有一个定期启动的过程,从HDFS中提取了所有未处理的文件,并将它们转换为HBase中的数据充实请求(“ PUT请求”)。



我们如何在HBase中存储数据


这是一个柱状时间序列数据库。 她具有行键的概念-这是您用来存储数据的键。 我们使用用户ID作为密钥,即用户ID,这是我们在初次见到该用户时生成的。 在每个键内部,数据分为列系列-实体,您可以在这些级别上管理数据的元信息。 例如,您可以为列族“数据”存储一千个版本的记录,并存储两个月,对于列族“原始”(一年),可以选择存储一年。


在列族中,有许多列限定符(以下简称列)。 我们使用各种用户属性作为列。 可能是他去过的URL,IP地址,搜索查询。 最重要的是,每列中存储了大量信息。 在列URL内可能表明用户访问了smartdataconf.ru,然后访问了其他一些站点。 时间戳记用作版本-您会看到有序的用户访问历史记录。 在我们的案例中,我们可以确定用户使用关键字“会议”访问了smartdataconf网站,因为他们具有相同的时间戳。

使用HBase


有几种使用HBase的选项。 它可以是PUT请求(数据更改请求),GET请求(“将用户Vasya上的所有数据给我”等等)。 您可以运行SCAN请求-对HBase中的所有数据进行多线程顺序扫描。 我们之前用它来标记受众群体。

有一个名为Analytics Engine的任务,它每天运行一次,并在多个线程中扫描HBase。 对于每个用户,她从HBase提取了整个故事,并通过一组分析脚本进行了运行。


什么是分析脚本? 这是某种黑匣子(java类),它接收所有用户数据作为输入,并提供一组它认为适合作为输出的段。 我们将所有内容提供给所看到的脚本-IP,访问,UserAgent等,然后在输出中脚本给出:“这是一个女人,爱猫,不喜欢狗”。

将这些数据提供给合作伙伴,并考虑统计数据。 对于我们而言,重要的是要了解一般有多少个女人,有多少男人,有多少人爱猫,有多少人有或没有汽车,等等。

我们将统计数据存储在MongoDB中,并通过每天增加一个特定的段计数器来进行写入。 我们绘制了每天每个细分市场的交易量图表。

这个系统在当时很不错。 它可以水平缩放,增长,可以估计观众数量,但是它有很多缺点。

并非总是能够了解系统中正在发生的事情,并查看日志。 当我们在上一个托管人那里时,任务常常由于各种原因而落空。 Hadoop集群包含20多个服务器,每天一次,其中一台服务器稳定崩溃。 这导致以下事实:任务可能会部分失败并且无法计算数据。 必须有时间重新启动它,并且鉴于它已经工作了几个小时,因此存在一些细微差别。

现有体系结构无法满足的最基本的事情是对事件的响应时间太长。 关于这个主题甚至还有一个故事。 有一家公司向该地区的居民发放小额贷款,我们与他们合作。 他们的客户来到现场,填写小额贷款申请,公司需要在15分钟内给出答案:他们是否准备好发放贷款。 如果您准备好了,他们会立即将钱转入卡中。

一切都很好。 客户决定检查它的一般情况:他们拿着一台单独的笔记本电脑,安装了一个干净的系统,访问了Internet上的许多页面并转到了他们的站点。 他们看到有一个请求,作为回应,我们说还没有数据。 客户问:“为什么没有数据?”

我们将解释:用户采取行动之前存在一定的滞后。 数据被发送到HBase,进行处理,然后客户端才接收结果。 看来,如果用户没有看到广告-一切都井井有条,则不会发生任何不良情况。 但是在这种情况下,由于滞后,可能无法为用户提供贷款。

这不是一个孤立的情况,有必要切换到实时系统。 我们想从她那里得到什么?


我们希望在看到数据后立即将其写入HBase。 我们看到了一次访问,丰富了我们所知道的一切,并将其发送到Storage。 存储中的数据更改后,您需要立即运行我们拥有的整套分析脚本。 我们想要监视和开发的便利,编写新脚本,将其调试为实际流量的能力。 我们想了解系统当前正在忙什么。

我们首先要解决的第二个问题是:在HBase中更改有关用户的数据后,立即对用户进行细分。 最初,我们的工作节点(在其上启动了map-reduce任务)位于与HBase相同的位置。 在许多情况下,这非常好-计算在数据旁边执行,任务运行非常快,很少流量通过网络。 显然,该任务会消耗一些资源,因为它运行复杂的分析脚本。

当我们实时工作时,HBase的负载性质发生了变化。 我们转到随机读数,而不是顺序读数。 预期HBase的负载很重要-我们不能允许某人在Hadoop群集上运行任务并破坏HBase的性能。

我们要做的第一件事是将HBase移到单独的服务器上。 还调整了BlockCache和BloomFilter。 然后,我们在如何在HBase中存储数据方面做得很好。 他们几乎重新设计了我刚开始谈论的系统,并收获了数据本身。


显而易见:我们将IP存储为字符串,并且数字变长。 一些数据被分类,进行词汇处理,等等。 最重要的是,因此,我们能够摇晃HBase大约两次-从10 TB到5 TB。 HBase具有类似于常规数据库中的触发器的机制。 这是一种协处理器机制。 我们编写了一个协处理器,当用户更改为HBase时,该协处理器将用户ID发送给Kafka。

用户标识在Kafka中。 此外,还有某个服务“细分者”。 它读取用户标识符流,并在它们之前运行所有相同的脚本,并从HBase请求数据。 该过程是在10%的流量上启动的,我们研究了它的工作原理。 一切都很好。


接下来,我们开始增加负载并看到了许多问题。 我们看到的第一件事是,该服务正常工作,分段,然后脱离Kafka,连接并重新开始工作。 多项服务-他们互相帮助。 然后下一个掉下来,另一个掉下来,依此类推。 同时,用于细分的用户阵容几乎没有受到欢迎。

这是由于Kafka中心跳机制的特殊性,当时仍是0.8版。 心跳是指消费者告诉经纪人他们是否还活着,在我们的案例中,细分报告。 发生了以下情况:我们收到了相当大的数据包,将其发送进行处理。 在一段时间内,它正常工作-没有发送任何心跳信号。 经纪人认为该消费者已经死亡,因此将其关闭。

消费者最终解决了这一问题,浪费了宝贵的CPU,试图说出该数据包已经可以使用,可以使用下一个数据包,但是他被拒绝了,因为另一个数据包带走了他正在处理的数据。 我们通过使背景热跳来解决此问题,然后事实出现了一个新版本的Kafka,我们在其中修复了此问题。

然后出现了一个问题:我们的分段器应在哪种硬件上安装。 分段是一个资源密集型过程(与CPU绑定)。 重要的是,该服务不仅会占用大量CPU,而且还会加载网络。 现在流量达到5 Gbit /秒。 问题是:将服务放置在许多小型服务器或大型服务器上的何处。

那时,我们已经转移到裸机上的servers.com 。 我们与服务器方面的人员进行了交谈,他们帮助了我们,使得可以在少量昂贵的服务器上以及在许多具有强大CPU的廉价服务器上测试解决方案的工作。 我们选择了适当的选项,计算每秒处理一个事件的单位成本。 顺便说一句,他们选择了足够强大的产品,同时又推出了负担得起的戴尔R230,这一切都奏效了。

重要的是,在细分者将用户标记为细分之后,他的分析结果在特定主题细分结果中回落到Kafka。

此外,我们可以由不会互相干扰的不同消费者独立连接到此数据。 这使我们能够将数据独立地提供给每个合作伙伴,包括一些外部合作伙伴,内部DSP,Google,统计信息。


利用统计数据,还有一个有趣的观点:早些时候,我们可以增加MongoDB中计数器的价值,即一天中某个段中有多少用户。 现在,此操作无法完成,因为我们现在在每个用户完成活动(即 一天几次。

因此,我们必须解决计算流中唯一用户数的问题。 为此,我们使用了HyperLogLog数据结构及其在Redis中的实现。 数据结构是概率性的。 这意味着您可以在此处添加用户标识符,标识符本身不会被存储,因此您可以在HyperLogLog中以极其紧凑的方式存储数百万个唯一标识符,每个密钥最多需要12 KB。



您无法自己获取标识符,但是可以找到此集合的大小。 由于数据结构是概率性的,因此存在一些错误。 例如,如果您有一个细分受众群“ likes cats”,要求某天细分该细分市场的规模,您将收到9920万,这意味着“从99百万到1亿”。

同样在HyperLogLog中,您可以获取多个集合的并集大小。 假设您有两个部分:“爱海豹”和“爱狗”。 假设前1亿,后100万,可能会问:“他们喜欢多少只动物?” 并得到答案“约1.01亿”,误差为1%。 计算一下猫和狗同时被爱多少会很有趣,但是要做到这一点是非常困难的。


一方面,您可以找出每个集合的大小,找出并集的大小,相加,相减,得到相交。 但是由于误差的大小可能大于最终交点的大小,因此最终结果的形式可能为“ -50至5万”。


我们在将数据写入Redis时如何提高性能方面做了很多工作。 最初,我们达到了每秒20万次操作。 但是,当每个用户有超过50个细分时-记录有关每个用户的信息-需要进行50次操作。 事实证明,我们的带宽非常有限,在此示例中,每秒无法写入有关超过4000个用户的信息,这比我们所需的信息少了几倍。

我们通过Lua在Redis中创建了一个单独的“存储过程”,将其加载到其中,并开始向其中传递一个字符串,其中包含一个用户的整个段列表。 里面的过程会将传递的字符串切成必要的HyperLogLog更新并保存数据,因此我们每秒达到约一百万次更新。

有点硬核:Redis是单线程的,您可以将其固定到一个处理器核心,将网卡固定到另一个处理器核心,并获得另外15%的性能,从而节省了上下文切换。 除此之外,重要的一点是您不能简单地对数据结构进行聚类,因为获得集合并集的幂的操作不会聚类

卡夫卡是一个很棒的工具


您会看到Kafka是我们系统中的主要运输工具。
它具有“主题”的本质。 这是您写入数据的地方,但本质上是队列。 在我们的例子中,有几个队列。 其中之一是有必要进行细分的用户的标识符。 第二是细分结果。


主题是一组分区。 它分为几部分。 每个分区都是硬盘驱动器上的一个文件。 当生产者写入数据时,他们将文本片段写入分区的末尾。 当您的使用者读取数据时,他们只是从这些分区中读取。

重要的是,您可以独立地连接多个使用者组,他们将使用数据而不会互相干扰。 这由消费者组的名称确定,并通过以下方式实现。


有一个偏移量,即消费组现在在每个分区上的位置。 例如,组A消耗来自partition1的第七条消息,并消耗来自partition2的第五条消息。 与A组无关的B组具有其他偏移量。



您可以水平扩展用户组,添加其他进程或服务器。这将发生分区重新分配(Kafka代理将为每个使用者分配一个要使用的分区列表),这意味着第一个使用者组将开始仅使用分区1,第二个使用者组仅使用分区2。如果某些使用者死亡(例如,心跳不来),则会发生新的重新分配。 ,每个使用者都会收到最新的分区列表以进行处理。


这很方便。首先,您可以操纵每个消费者组的偏移量。想象一下,有一个合作伙伴,您可以将您根据本主题传输的数据以及细分结果传输给该合作伙伴。他写道,由于错误,他意外丢失了最后一天的数据。而对于此客户的消费者组,您只需回滚一天,然后将整天的数据倒入该客户即可。我们还可以拥有自己的消费者组,连接到生产流量,观察发生的情况,并对真实数据进行调试。

因此,我们已经实现了在变更时就开始对用户进行细分,可以独立地连接新的使用者,编写统计信息并进行观察的功能。现在,您需要在收到数据后立即将其写入HBase。


我们如何做到的。过去曾批量加载数据。有一个Batch Loader,它处理用户活动日志文件:如果用户进行了10次访问,则批处理了10个事件,那么它将在一次操作中记录在HBase中。每个细分只有一个事件。现在我们要在存储中编写每个单独的事件。我们将大大增加写入流和读取流。每个细分的事件数量也会增加。


我们要做的第一件事是将HBase移植到SSD。通过标准方式,这不是特别地完成。这是使用HDFS完成的。您可以说HDFS上的特定目录必须在这样的磁盘组上。有一个很酷的问题,那就是当我们将HBase带到SSD上并复制它时,所有快照也都到了那里,并且我们的SSD很快就结束了。

这也得到解决,我们开始定期将快照导出到文件中,写入另一个HDFS目录,并删除所有有关快照的元信息。如果需要还原-取出保存的文件,然后导入并还原。幸运的是,此操作非常少见。

同样在SSD上,他们取出了扭曲的MemStore预先写入日志,打开了写入时缓存块选项。允许您在记录数据时立即将它们放入块缓存中。这很方便,因为在我们的案例中,如果我们记录了数据,那么很可能会立即读取它。这也带来了一些好处。

接下来,我们将所有数据源切换为将数据写入Kafka。从卡夫卡(Kafka)起,我们就在HDFS中记录了数据,以保持向后兼容性,包括使我们的分析师可以处理数据,运行MapReduce任务并分析其结果。

我们连接了一个单独的使用者组,该组将数据写入HBase。实际上,这是一个从Kafka读取并在HBase中形成PUT的包装器。


我们并行启动了两个电路,以免破坏向后兼容性,也不会降低系统性能。仅在一定流量的百分比下启动了新方案。达到10%时,一切都很酷。但是在更大的负载下,分割器无法应对分割流程。


我们收集的度量标准是“从卡夫卡读取之前,在卡夫卡中放置了多少条消息”。这是一个很好的指标。最初,我们收集的度量标准为“现在有多少原始消息”,但是并没有特别说明。您会看到:“我有一百万条原始消息,”那又如何呢?要解释这百万美元,您需要知道细分器(消费者)的运行速度,这并不总是很清楚。

使用此度量标准,您可以立即看到数据是从队列中写入的,并且可以看到要处理多少数据。我们看到我们没有时间进行细分,并且该消息在读取消息之前已处于队列中几个小时。

您可以增加容量,但是那太昂贵了。因此,我们尝试进行优化。

自定标


我们有HBase。用户正在更改,他的标识符正在Kafka中飞行。主题分为多个分区,目标分区由用户ID选择。这意味着,当您看到用户“ Vasya”时-转到分区1。当您看到“ Petya”时-转到分区2。这很方便-您可以实现在一个服务实例上看到一个使用者,而在另一个实例上看到一个使用者。另一方面。


我们开始观察发生了什么。互联网上一种典型的用户行为是转到某个网站并打开几个背景标签。第二种是转到该站点,然后单击几下即可到达登录页面。

我们查看细分队列,并看到以下内容:用户A访问了该页面。该用户发出了另外5个事件-每个事件都表示一个页面打开。我们处理用户的每个事件。但是实际上,HBase中的数据包含所有5次访问。我们第一次,第二次处理所有5次访问,依此类推-我们在浪费CPU资源。


因此,我们开始在每个细分市场上存储某个本地缓存,并以上次对该用户进行分析的日期为准。也就是说,我们对其进行了处理,并将其用户ID和时间戳写入缓存。每条kafka消息也都有一个时间戳-我们可以简单地进行比较:如果队列中的时间戳小于最后一个分段的日期-我们已经为用户分析了这些数据,您可以跳过此事件。

用户事件(红色A)可以不同,并且顺序混乱。用户可以打开多个背景选项卡,连续打开多个链接,也许站点一次拥有我们的多个合作伙伴,每个合作伙伴都会发送此数据。

我们的像素可以看到用户的访问,然后可以执行其他一些操作-我们将把他的头盔发送给我们自己。五个事件到达,我们正在处理第一个红色的A。如果事件到达,则它已经在HBase中。我们看到事件,这些事件通过一组脚本运行。我们看到以下事件,并且所有相同的事件,因为它们已被记录。我们再次运行它,然后将缓存与日期一起保存,并将其与事件的时间戳进行比较。



因此,系统获得了可伸缩性。 y轴是当用户ID出现时,我们对用户ID所做的处理的百分比。绿色-我们执行的工作启动了细分脚本。黄色-我们没有这样做,因为已经正确分割了此数据。


可以看出,晚上有资源,数据流较少,您可以每隔一秒细分一次。资源日较小,我们仅细分了20%的事件。一天快结束了-合作伙伴上传了我们之前从未见过的数据文件,因此必须“诚实”地对其进行分段。

系统本身可以适应负载增长。如果我们有一个非常大的合作伙伴,我们将处理相同的数据,但频率要少一些。在这种情况下,系统的特性将在晚上恶化,分段不会延迟2-3秒,而是延迟一分钟。早晨,添加服务器并返回所需的结果。

因此,我们在服务器上节省了大约5倍。现在我们在10台服务器上工作,因此需要50-60。

最上面的蓝色小东西是机器人。这是细分中最难的部分。他们有大量的访问,它们在熨斗上造成很大的负担。我们看到每个机器人都在单独的服务器上。我们可以在其中收集带有机器人黑名单的本地缓存。引入了一个简单的反欺诈:如果用户在一定时间内进行了太多访问,那么他出了点问题,我们会在一段时间内将其添加到黑名单中。这是一条蓝色的小条,大约5%。他们又给我们节省了30%的CPU。

因此,我们实现了在每个阶段看到的整个数据处理管道。我们会看到有关Kafka中邮件数量的指标。到了晚上,某个地方变暗了,处理时间增加到一分钟,然后释放并恢复正常。


我们可以监视我们的系统操作如何影响其吞吐量,可以看到脚本正在运行多少,有必要进行优化以及可以节省多少。我们可以看到线段的大小,线段大小的动态变化,评估它们的关联和交集。可以为大致相同的段大小执行此操作。

您想完善什么?


我们有一个包含一些计算资源的Hadoop集群。他很忙-分析师在白天工作,但晚上却几乎没有时间。通常,我们可以在集群中将分段器作为单独的过程进行容器化和运行。我们想要更准确地存储统计信息,以便更准确地计算相交的体积。我们还需要对CPU进行优化。这直接影响决策的成本。

总结:Kafka很好,但是,与其他任何技术一样,您需要了解它在内部的工作方式以及发生的情况。例如,消息优先级保证仅在分区内起作用。如果您发送到不同分区的消息,则不清楚将按照什么顺序处理它们。

真实数据非常重要。如果我们没有在实际流量上进行测试,那么很可能我们不会在机器人和用户会话方面看到问题。会在真空中产生一些东西,跑下来躺下。重要的是监视您认为需要监视的内容,而不是监视您不认为的内容。

分钟的广告。如果您喜欢SmartData会议的这份报告,请注意,SmartData 2018将在10月15日在圣彼得堡举行,这是一个专为机器学习,分析和数据处理领域的人们而设计会议。该计划将有很多有趣的事情,该网站已经有第一批发言人和报告。

Source: https://habr.com/ru/post/zh-CN421125/


All Articles