来源哈Ha! 我叫Maxim Pchelin,我在MyGames(Mail.ru Group的游戏部门)领导BI-DWH的开发。 在本文中,我将讨论我们如何以及为什么构建面向客户端的DataLake存储。
本文包括三个部分。 首先,我将解释为什么我们决定实现DataLake。 在第二部分中,我将描述我们使用什么技术和解决方案,以便存储可以工作并充满数据。 在第三部分中,我将描述我们为提高服务质量所做的工作。
是什么把我们带到了DataLake
MyGames的我们在BI-DWH部门工作,提供两类服务:数据分析师资料库和业务用户(经理,营销人员,游戏开发人员等)的常规报告服务。
为什么要这样的非标存储?
通常,BI-DWH并不意味着实现DataLake存储;这不能称为典型解决方案。 怎样建立这样的服务?
通常,公司有一个项目-在我们的例子中,这是一个游戏。 该项目具有一个日志记录系统,该系统通常将数据写入数据库。 在此基础之上,将为聚合,指标和其他实体创建店面,以用于将来的分析。 常规报告基于店面,使用任何合适的BI工具以及Ad-Hoc分析系统,从简单的SQL查询和Excel表开始,以Jupyter Notebook for DS和ML结尾。 整个系统由一个开发团队支持。
假设另一家公司诞生于一家公司。 拥有另一个开发团队和基础架构很有吸引力,但价格昂贵。 因此,该项目需要“挂钩”。 这可以通过不同的方式来完成:在数据库级别,店面级别或至少在显示级别-解决了该问题。
如果公司有第三个项目? “共享”可能已经很糟糕了:资源分配或访问权限可能存在问题。 例如,一个项目是由外部团队完成的,该团队不需要了解前两个项目。 情况变得越来越危险。
现在想象没有三个项目,但是还有更多。 碰巧这就是我们的情况。
MyGames是Mail.ru集团最大的部门之一,我们的投资组合中有150个项目。 而且,它们都非常不同:它们自己的开发和购买是为了在俄罗斯运营。 它们可在各种平台上工作:PC,Xbox,Playstation,iOS和Android。 这些项目是在全球十个办事处与数百名决策者一起开发的。

对于企业而言,这很棒,但是使BI-DWH团队的工作变得复杂。
在我们的游戏中,记录了很多玩家的动作:当他进入游戏时,他在哪里和如何获得关卡,与谁打架以及如何打败他,他以什么货币购买了货币。 我们需要为每个游戏收集所有这些数据。
我们需要这样做,以便企业可以收到有关项目问题的答案。 行动启动后上周发生了什么? 我们对下个月的收入或游戏服务器容量使用情况有何预测? 如何做才能影响这些预测?
重要的是,MyGames不得将开发范例强加于项目。 每个游戏工作室都会记录数据,因为它认为效率更高。 有些项目在客户端生成日志,有些在服务器端生成日志。 一些项目使用RDBMS来收集它们,而其他项目则使用完全不同的工具:Kafka,Elasticsearch,Hadoop,Tarantool或Redis。 然后,我们将这些数据源用于将其上载到存储库。
您想要我们的BI-DWH有什么?
首先,他们希望从BI-DWH部门接收我们所有游戏的数据,以解决日常运营任务和战略任务。 在关卡结束时从给一个可怕的怪物献出多少生命开始,到如何在公司内部适当分配资源结束:哪些项目应该给更多的开发人员或谁应该分配营销预算。
我们也期望可靠性。 我们在一家大公司工作,不能遵循“昨天我们工作了,但是今天系统已经到位,只有我们提出一些建议,才能在一周内实现。”
他们想从我们这里节省钱。 我们很乐意通过购买钢铁或雇用人员来解决所有问题。 但是我们是一个商业组织,负担不起。 我们试图使公司盈利。
重要的是,他们希望我们关注客户。 在这种情况下,客户是我们的消费者,客户:经理,分析师等。我们必须适应我们的游戏并以一种方便客户与我们合作的方式进行工作。 例如,在某些情况下,当我们在亚洲市场购买用于运营的项目时,连同游戏一起,我们可以获得以中文命名的基地。 以及这些基础的中文文档。 我们可以寻找具有中文知识的ETL开发人员,也可以拒绝下载游戏中的数据,但相反,我和团队将自己锁定在会议室,花时间开始游戏。 进入和退出游戏,购买,射击,死亡。 我们看一下该表或该表中出现的内容和时间。 然后,我们编写文档,并在此基础上构建ETL。
在这种情况下,感觉到边缘很重要。 当您需要帮助附近有500,000 DAU的项目时,挖掘DAU为50的游戏的独特日志记录是不可接受的。 因此,当然,我们可以花很多精力来构建定制解决方案,但前提是企业确实需要它。
但是,一旦开发人员(尤其是初学者)听说他们将必须以这种方式进行适应,他们就会渴望从不这样做。 任何开发人员都希望创建理想的体系结构,不要改变它,而要在Habr上写有关它的文章。
但是,如果我们停止适应游戏,会发生什么? 假设我们开始要求他们将数据发送到单个输入API? 结果将是一个-每个人都会开始分散。
- 一些项目将开始以偏爱和诗人的态度削减他们的BI-DWH解决方案。 这将导致资源重复和在系统之间交换数据时遇到困难。
- 其他项目不会取消BI-DWH的创建,但是它们也不想适应我们的项目。 还有其他人将停止使用数据,这甚至更糟。
- 很好,最重要的是,管理层将没有有关项目中正在发生的事情的最新系统信息。
我们可以以简单的方式实现存储吗?
150个项目很多。 立即实施所有解决方案的时间太长。 企业将不会等待一年的时间才能看到第一个结果。 因此,我们采用了3个带来最大收益的项目,并为其实施了第一个原型。 我们希望从中收集关键数据,并使用最受欢迎的指标创建基本的仪表板-DAU,MAU,收入,注册,保留以及一些经济和预测。
为此,我们不能使用项目本身的游戏基础。 首先,由于需要汇总来自多个数据库的数据,这将使跨设计分析更加困难。 其次,游戏本身在这些数据库上运行,这很重要,这样母版和副本不会过载。 最后,所有游戏在某个时候会删除数据库中不需要的所有数据历史记录,这对于分析来说是不可接受的。
因此,唯一的选择是在一个地方收集分析所需的一切。 此时,任何关系数据库或纯文本存储库都适合我们。 我们将拧紧BI并构建仪表板。 这些解决方案的组合有很多选择:

但是我们知道以后我们需要覆盖所有其他150场比赛。 也许某些集群关系数据库可以处理生成的数据量。 但是源不仅位于完全不同的系统中,而且具有非常不同的数据结构。 我们遇到了关系结构,Data Vault等。 如果没有复杂而费力的技巧,将所有这些都放在一个数据库中是行不通的。
所有这些使我们了解到我们需要构建一个DataLake。
DataLake实施
首先,DataLake存储适合我们的条件,因为它允许我们存储非结构化数据。 DataRake可以成为所有各种源的单一入口点,从RDBMS的表开始,以JSON结束,而JSON是我们从Kafka或Mongo提供的。 结果,DataLake可以成为跨设计分析的基础,该跨设计分析是基于针对各种使用者(SQL,Python,R,Spark等)的接口实现的。
切换到Hadoop
对于DataLake,我们选择了显而易见的解决方案-Hadoop。 具体来说,它是从Cloudera组装而成的。 Hadoop允许您使用非结构化数据,并且可以通过添加数据节点轻松地进行扩展。 此外,该产品已经过充分研究,因此可以在Stackoverflow上找到任何问题的答案,而不必在研发上花费资源。
实施Hadoop之后,我们得到了第一个统一存储的下图:

数据是从少量来源收集到Hadoop中的,然后使用了几个接口:用于Ad-Hoc分析的BI工具和服务。
进一步的事件出乎意料地发展:我们的Hadoop完美启动,数据流入商店的消费者放弃了旧的分析系统,开始每天使用新产品进行工作。
但是出现了一个问题:您做得越多,他们就越希望您。 很快,已经集成到Hadoop中的项目开始要求更多数据。 那些尚未添加的项目开始要求它。 对稳定性的要求开始急剧增长。
同时,简单地线性增加团队是不合理的。 如果两个DWH开发人员处理两个项目,那么对于四个项目,我们不能再雇用两个开发人员。 因此,首先我们走另一条路。
流程建立
在资源有限的情况下,最便宜的解决方案是调整流程。 此外,在大型公司中,不可能仅提出并实施存储架构。 必须与大量的人进行谈判。
- 首先,由业务代表为分析分配资源。 您将必须证明,您只需要执行来自客户的有益于业务的任务。
- 您还需要与分析师协商,以便他们为您提供一些回报,以为他们提供的服务-系统分析,业务分析,测试。 例如,我们将数据源的系统分析提供给了分析师。 当然,他们并不快乐,否则,将根本没有人去做。
- 最后但并非最不重要的一点是,您必须与游戏开发人员进行谈判:安装SLA并就数据结构达成协议。 如果这些字段不断消失,出现和重命名,那么无论团队规模大小,您都将永远无法动弹。
- 您还需要与自己的团队进行谈判:在所有开发人员都想创建的理想解决方案与不太有趣但可以廉价而快速地铆接的标准解决方案之间寻求折衷。
- 必须与管理员就监视基础结构达成一致。 虽然,只要您拥有更多资源,最好还是在存储团队中聘请自己的DevOps专家。
至此,如果存储库的这种变体可以满足为此设置的所有目标,那么我可以结束本文。 但是事实并非如此。 怎么了
在使用Hadoop之前,我们可以提供五个项目的数据和统计信息。 随着Hadoop的实施,并且没有增加团队,我们就能够涵盖10个项目。 建立流程后,我们的团队已经为15个项目提供了服务。 这很酷,但是我们有150个项目,我们需要一些新的东西。
气流实施
最初,我们使用Cron从来源收集数据。 有两个项目很正常。 10-很痛,但是还可以。 但是,现在每天要加载约12,000个流程,以将150个项目加载到DataLake中。 Cron不再适合。 为此,我们需要一个功能强大的工具来管理数据下载流。
我们选择了开源的Airflow Task Manager。 他出生于Airbnb的大肠,之后被转移到Apache。 这是用于代码驱动的ETL的工具。 也就是说,您使用Python编写了一个脚本,然后将其转换为DAG(有向无环图)。 DAG非常适合维护任务之间的依赖关系-您不能使用尚未加载的数据来构建店面。
气流具有出色的错误处理程序。 如果进程崩溃或网络出现问题,调度程序将以指定的次数重新启动该进程。 例如,如果有很多失败,则源中的表已更改,那么将收到一条通知消息。
Airflow具有出色的UI:它可以方便地显示哪些进程正在运行,哪些进程成功完成或出现错误。 如果任务因错误而失败,则可以从界面重新启动它们,并通过监视来控制过程,而无需进入代码。
Airflow是可定制的,它建立在操作员之上-这些是用于处理特定来源的插件。 开箱即用的一些操作员,许多人已经编写了Airflow社区。 如果愿意,您可以创建自己的运算符,其接口非常简单。
我们如何利用气流?
例如,我们需要将一个表从PostgreSQL加载到Hadoop中。
sql_sensor_battle_log
任务检查以查看源中是否有我们昨天需要的数据。 如果是这样,
load_stg_data_from_battle_log
任务将从PG中
load_stg_data_from_battle_log
数据并将其添加到Hadoop。 最后,
load_oda_data_from_battle_log
执行初始处理:例如,从Unix时间转换为人类可读的时间。
在这样的任务链中,数据来自一个来源的一个实体:

因此-从一个来源获得我们需要的所有实体:

这套下载是DAG。 目前,我们有250个此类DAG,用于在其上加载原始数据,处理,转换和创建店面。
更新后的统一存储方案如下:

- 引入Airflow之后,我们能够大幅增加源数量-多达400件。 数据源既是内部的(来自我们的游戏)又是外部的:购买的统计系统,异构API。 通过Airflow,我们可以每天执行和控制12,000个流程,处理来自我们所有150款游戏的数据。
- 关于我们的气流的更多细节,Dean Safina在她的文章( https://habr.com/ru/company/mailru/blog/344398/ )中写道。 并加入Telegram上的Airflow社区( https://t.me/ruairflow )。 关于气流的许多问题都可以借助文档解决,但有时还会出现更多自定义请求:如何将Airflow打包到docker中,为什么第三天不起作用等等。 这可以在这个社区得到解答。
DataLake有哪些改进
在这一点上,DWH开发人员有信心一切准备就绪,现在您可以冷静下来。 不幸的是,幸运的是,DataLake中仍有一些需要加强的地方。
资料品质
由于DataLake中有大量表,因此数据质量是首当其冲的。 例如,拿一张有付款的表。 它包含user_id,金额,付款日期和时间:
每天大约有1万笔付款:

当天出现在表格中的只有28个。 是的,并且user_id全部为空:
如果我们的货源突然中断,那么借助Airflow,我们将立即知道它。 但是,如果正式有数据,甚至采用正确的格式,那么我们也不会立即从数据使用者那里了解细分情况。 亲手检查我们的5000张桌子是不现实的。
为了防止这种情况,我们开发了自己的数据质量控制(DQ)系统。 它每天都在监视到我们资源库的关键下载:它跟踪行数的突然变化,查找空字段,并检查数据是否重复。 该系统还应用来自分析师的自定义检查。 基于此,她向邮件发送有关出了什么问题以及出了什么地方的通知。 分析人员进入项目,找出原因,例如,数据太少,消除了原因,然后我们重新加载数据。
优先下载
随着将数据加载到DataLake的任务数量的增加,优先级冲突迅速出现。 通常的情况是:某个不太重要的项目在晚上下载了所有资源,而计算高层管理人员指标所需的表在工作之初就没有时间加载。 我们以几种方式处理这个问题。
- 监视密钥下载。 Airflow拥有自己的SLA系统,该系统可让您确定所有密钥是否均按时交付。 如果未加载某些数据,那么我们将比用户早几个小时找到有关信息,并有时间进行修复。
- 优先级设置。 为此,我们使用气流队列和优先级系统。 它使我们能够确定DAG的加载顺序以及其中的并行进程数。 在下载用于高级管理指标的数据之前,每季度上载一次分析的日志是没有意义的。
监视夜间批处理的持续时间
我们有一个批量存储。 在晚上,我们正在构建它,并且对我们来说重要的是要确保有足够的夜晚来处理每日批生产。 否则,在工作时间内,分析师将没有足够的存储资源来工作。 我们定期以几种方式解决此问题:
- 反向缩放。 我们不会提供所有数据,而只会提供分析人员需要的数据。 我们监视所有已加载的表,如果其中一个表六个月都未使用,则将其关闭。
- 能力建设。 如果我们了解我们受到网络功能,内核数或磁盘容量的限制,那么我们将数据节点添加到Hadoop。
- 优化工人气流。 我们正在做所有事情,以便在存储构建时间的每时每刻都最大限度地利用系统的每个部分。
- 重构非最佳过程。 例如,我们考虑新鲜游戏的经济性,这需要5分钟。 但是一年之后,数据不断增长,相同的请求将处理2个小时。 在某些时候,我们必须重新调整增量重新计算,尽管在开始时这似乎是不必要的复杂性。
资源控制
重要的是,不仅要有时间在工作日开始之前准备好存储库,而且还要在此之后监视其资源的可用性。 这样,随着时间的流逝可能会出现困难。 首先,原因是分析人员编写了次优查询。 同样,分析师本身也越来越多。 在这种情况下最简单的事情是:增加硬件容量。 但是,非最佳请求仍将占用所有可用资源。 也就是说,迟早您将开始花钱购买铁而没有明显的好处。 因此,我们使用其他几种方法。
- 报价:我们至少留给用户一些资源。 是的,请求将缓慢执行,但至少会执行。
- 监视消耗的资源:用户请求使用了多少个内核,忘记使用Hadoop中的分区并占用了所有RAM等。...此外,这些监视对于分析人员本身是可见的,并且当不适用于它们时,他们自己会找到罪魁祸首并进行处理他。 如果我们的项目很少,我们将自己跟踪资源的消耗。 但是如果有这么多,我们将不得不雇用一个单独的,不断扩大的监控团队。 从长远来看,这是不合理的。
- 自愿性的用户培训。 分析师的工作不是将质量查询写入您的存储库。 他们的工作是回答业务问题。 除了我们自己-存储库团队-没有人关心分析师的请求质量。 因此,我们创建了FAQ和演示文稿,为分析人员举办了讲座,解释了如何使用DataLake,以及如何不使用。
实际上,花时间使数据可用比填充数据重要得多。 如果存储中有数据,但是数据不可用,那么从业务角度来看,它仍然存在,并且您已经花了很多精力进行下载。
架构灵活性
重要的是不要忘记内置DataLake的灵活性,并且在更改输入因素时需要害怕改变架构:哪些数据需要上传到存储,谁使用以及如何使用。 我们不相信我们的架构将永远保持不变。
例如,我们推出了一款新的手机游戏。 她从客户端将JSON写入Nginx,Nginx将数据抛出到Kafka,我们使用Spark解析并将其放入Hadoop。 一切正常,任务已关闭。

几个月过去了,夜间存储的所有过程都开始运行更长的时间。 我们开始弄清问题所在:事实证明,游戏“射击”生成了50倍的数据,Spark无法处理JSON分析,从而浪费了一半的存储资源。 最初,所有数据都发送到一个Kafka主题,然后Spark将它们分类为不同的实体。 我们要求游戏开发者与不同实体共享客户的数据,并将它们倒入单独的Kafka主题中。 它变得更容易,但并没有持续很长时间。 然后,我们决定从每日JSON分析改为每小时一次。 但是,不仅开始在夜间而且要昼夜不停地建立存储设施,这对我们来说是不可取的。 经过这样的尝试,为了解决这个问题,我们放弃了Spark并实施了ClickHouse。

它具有出色的JSON解析引擎,可立即将数据分解为表。 我们首先将信息从Kafka发送到ClickHouse,然后从那里在Hadoop中获取信息。 这完全解决了我们的问题。
当然,我们尽量不要在我们的DataLake存储中繁殖动物园系统,而是尝试为特定任务选择最合适的技术。
值得吗?
部署质量控制系统Hadoop,处理Airflow并建立业务流程是否值得? 当然值得:
- 该业务具有有关所有项目的最新信息,这些信息可以在单个服务中获得。
- 从游戏设计人员到管理人员,我们系统的用户都仅根据直觉停止了决策,而是转而使用“数据驱动”方法。
- 我们为分析师提供了制作自己的火箭科学的工具。 现在,他们可以回答复杂的业务查询,建立预测模型,推荐系统,改进游戏。 实际上,为此,我们在BI-DWH中工作。