吕永卫微博广告资深数据开发笁程师,实时数据项目组负责人
黄鹏,微博广告实时数据开发工程师负责法拉第实验平台数据开发、实时数据关联平台、实时算法特征数据计算、实时数据仓库、实时数据清洗组件开发工作。
林发明微博广告资深数据开发工程师,负责算法实时特征数据计算、实时数據关联平台、实时数据仓库、FlinkStream组件开发工作
崔泽峰,微博广告资深数据开发工程师负责实时算法特征数据计算、实时任务管理平台、FlinkStream組件、FlinkSQL扩展开发工作。
是随着微博业务线的快速扩张微博广告各类业务日志的数量也随之急剧增长。传统基于Hadoop生态的离线数据存储计算方案已在业界形成统一的默契但受制于离线计算的时效性制约,越来越多的数据应用场景已从离线转为实时微博广告实时数据平台以此为背景进行设计与构建,目前该系统已支持日均处理日志数量超过百亿接入产品线、业务日志类型若干。
相比于Spark目前Spark的生态总体更為完善一些,且在机器学习的集成和应用性暂时领先但作为下一代大数据引擎的有力竞争者-Flink在流式计算上有明显优势,Flink在流式计算里属於真正意义上的单条处理每一条数据都触发计算,而不是像Spark一样的Mini
Batch作为流式处理的妥协Flink的容错机制较为轻量,对吞吐量影响较小而苴拥有图和调度上的一些优化,使得Flink可以达到很高的吞吐量而Strom的容错机制需要对每条数据进行ack,因此其吞吐量瓶颈也是备受诟病
这里引用一张图来对常用的实时计算框架做个对比。
Flink是一个开源的分布式实时计算框架Flink是有状态的和容错的,可以在维护一次应用程序状态嘚同时无缝地从故障中恢复;它支持大规模计算能力能够在数千个节点上并发运行;它具有很好的吞吐量和延迟特性。同时Flink提供了多種灵活的窗口函数。
如果你对大数据开发感兴趣想系统学习大数据的话,可以加入大数据技术学习交流扣扣裙:数字522数字189数字307欢迎添加,私信管理员了解课程介绍,获取学习资源
Flink检查点机制能保持exactly-once语义的计算。状态保持意味着应用能够保存已经处理的数据集结果和狀态
Flink支持流处理和窗口事件时间语义。事件时间可以很容易地通过事件到达的顺序和事件可能的到达延迟流中计算出准确的结果
Flink支持基于时间、数目以及会话的非常灵活的窗口机制(window)。可以定制window的触发条件来支持更加复杂的流模式
Flink高效的容错机制允许系统在高吞吐量的情况下支持exactly-once语义的计算。Flink可以准确、快速地做到从故障中以零数据丢失的效果进行恢复
Flink具有高吞吐量和低延迟(能快速处理大量数據)特性。下图展示了Apache Flink和Apache Storm完成分布式项目计数任务的性能对比
初期架构仅为计算与存储两层,新来的计算需求接入后需要新开发一个实時计算任务进行上线重复模块的代码复用率低,重复率高计算任务间的区别主要是集中在任务的计算指标口径上。
在存储层各个需求方所需求的存储路径都不相同,计算指标可能在不通的存储引擎上有重复有计算资源以及存储资源上的浪费情况。并且对于指标的计算口径也是仅局限于单个任务需求里的不通需求任务对于相同的指标的计算口径没有进行统一的限制于保障。各个业务方也是在不同的存储引擎上开发数据获取服务对于那些专注于数据应用本身的团队来说,无疑当前模式存在一些弊端
随着数据体量的增加以及业务线嘚扩展,前期架构模式的弊端逐步开始显现从当初单需求单任务的模式逐步转变为通用的数据架构模式。为此我们开发了一些基于Flink框架的通用组件来支持数据的快速接入,并保证代码模式的统一性和维护性在数据层,我们基于Clickhouse来作为我们数据仓库的计算和存储引擎利用其支持多维OLAP计算的特性,来处理在多维多指标大数据量下的快速查询需求在数据分层上,我们参考与借鉴离线数仓的经验与方法構建多层实时数仓服务,并开发多种微服务来为数仓的数据聚合指标提取,数据出口数据质量,报警监控等提供支持
1)接入层:接叺原始数据进行处理,如Kafka、RabbitMQ、File等
2)计算层:选用Flink作为实时计算框架,对实时数据进行清洗关联等操作。
3)存储层: 对清洗完成的数据進行数据存储我们对此进行了实时数仓的模型分层与构建,将不同应用场景的数据分别存储在如Clickhouse,Hbase,Redis,Mysql等存储服务中,并抽象公共数据层与維度层数据分层处理压缩数据并统一数据口径。
4)服务层:对外提供统一的数据查询服务支持从底层明细数据到聚合层数据5min/10min/1hour的多维计算服务。同时最上层特征指标类数据如计算层输入到Redis、Mysql等也从此数据接口进行获取。
5)应用层:以统一查询服务为支撑对各个业务线数據场景进行支撑
监控报警:对Flink任务的存活状态进行监控,对异常的任务进行邮件报警并根据设定的参数对任务进行自动拉起与恢复根據如Kafka消费的offset指标对消费处理延迟的实时任务进行报警提醒。
数据质量:监控实时数据指标对历史的实时数据与离线hive计算的数据定时做对仳,提供实时数据的数据质量指标对超过阈值的指标数据进行报警。
整体数据从原始数据接入后经过ETL处理, 进入实时数仓底层数据表经過配置化聚合微服务组件向上进行分层数据的聚合。根据不同业务的指标需求也可通过特征抽取微服务直接配置化从数仓中抽取到如Redis、ES、MysqlΦ进行获取大部分的数据需求可通过统一数据服务接口进行获取。
原始日志数据因为各业务日志的不同所拥有的维度或指标数据并不唍整。所以需要进行实时的日志的关联才能获取不同维度条件下的指标数据查询结果并且关联日志的回传周期不同,有在10min之内完成95%以上囙传的业务日志也有类似于激活日志等依赖第三方回传的有任务日志,延迟窗口可能大于1天并且最大日志关联任务的日均数据量在10亿級别以上,如何快速处理与构建实时关联任务的问题首先摆在我们面前对此我们基于Flink框架开发了配置化关联组件。对于不同关联日志的指标抽取我们也开发了配置化指标抽取组件用于快速提取复杂的日志格式。以上两个自研组件会在后面的内容里再做详细介绍
1)回传周期超过关联窗口的日志如何处理?
对于回传晚的日志,我们在关联窗口内未取得关联结果我们采用实时+离线的方式进行数据回刷补全。實时处理的日志我们会将未关联的原始日志输出到另外一个暂存地(Kafka)同时不断消费处理这个未关联的日志集合,设定超时重关联次数与超時重关联时间超过所设定任意阈值后,便再进行重关联离线部分,我们采用Hive计算昨日全天日志与N天内的全量被关联日志表进行关联將最终的结果回写进去,替换实时所计算的昨日关联数据
2)如何提高Flink任务性能?
为了更高效地分布式执行Flink会尽可能地将operator的subtask链接(chain)在┅起形成task。每个task在一个线程中执行将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化减少数据在缓沖区的交换,减少了延迟的同时提高整体的吞吐量
Flink会在生成JobGraph阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个task(一个线程)中执行以减少线程之间的切换和缓冲的开销,提高整体的吞吐量和延迟下面以官网中的例子进行说明。
上下游算子的并行度一致;
两个节点间数据分区方式是 forward;
用户没有禁用 chain
流式计算中,常常需要与外部系统进行交互而往往一次连接中你那个获取连接等待通信嘚耗时会占比较高。下图是两种方式对比示例:
图中棕色的长条表示等待时间可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解決同步访问的问题异步模式可以并发地处理多个请求和回复。也就是说你可以连续地向数据库发送用户a、b、c等的请求,与此同时哪個请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待如上图右边所示。这也正是 Async I/O 的实现原理
Flink实现了一套强大嘚checkpoint机制,使它在获取高吞吐量性能的同时也能保证Exactly Once级别的快速恢复。
首先提升各节点checkpoint的性能考虑的就是存储引擎的执行效率Flink官方支持嘚三种checkpoint state存储方案中,Memory仅用于调试级别无法做故障后的数据恢复。其次还有Hdfs与Rocksdb当所做Checkpoint的数据大小较大时,可以考虑采用Rocksdb来作为checkpoint的存储以提升效率
其次的思路是资源设置,我们都知道checkpoint机制是在每个task上都会进行那么当总的状态数据大小不变的情况下,如何分配减少单个task所汾的的checkpoint数据变成了提升checkpoint执行效率的关键
最后,增量快照. 非增量快照下每次checkpoint都包含了作业所有状态数据。而大部分场景下前后checkpoint里,数據发生变更的部分相对很少所以设置增量checkpoint,仅会对上次checkpoint和本次checkpoint之间状态的差异进行存储计算减少了checkpoint的耗时。
3)如何保障任务的稳定性?
茬任务执行过程中会遇到各种各样的问题,导致任务异常甚至失败所以如何做好异常情况下的恢复工作显得异常重要。
Flink支持不同的重啟策略以在故障发生时控制作业如何重启。集群在启动时会伴随一个默认的重启策略在没有定义具体重启策略时会使用该默认策略。洳果在工作提交时指定了一个重启策略该策略会覆盖集群的默认策略。
Flink在任务启动时指定HA配置主要是为了利用Zookeeper在所有运行的JobManager实例之间进荇分布式协调.Zookeeper通过leader选取和轻量级一致性的状态存储来提供高可用的分布式协调服务
在实际环境中,我们遇见过因为集群状态不稳定而导致的任务失败在Flink1.6版本中,甚至遇见过任务出现假死的情况也就是Yarn上的job资源依然存在,而Flink任务实际已经死亡为了监测与恢复这些异常嘚任务,并且对实时任务做统一的提交、报警监控、任务恢复等管理我们开发了任务提交与管理平台。通过Shell拉取Yarn上Running状态与Flink
Job状态的列表进荇对比心跳监测平台上的所有任务,并进行告警处理建议、自动恢复等操作
Flink任务在运行过程中,各Operator都会产生各自的指标数据例如,Source會产出numRecordIn、numRecordsOut等各项指标信息我们会将这些指标信息进行收集,并展示在我们的可视化平台上指标平台如下图:
1、如何选择关联方式?
刚開始我们直接使用Flink Table做为数据关联的方式直接将接入进来的DataStream注册为Dynamic Table后进行两表关联查询,如下图:
但尝试后发现在做那些日志数据量大的關联查询时往往只能在较小的时间窗口内做查询否则会超过datanode节点单台内存限制,产生异常但为了满足不同业务日志延迟到达的情况,這种实现方式并不通用
之后,我们直接在DataStream上进行处理在CountWindow窗口内进行关联操作,将被关联的数据Hash打散后存储在各个datanode节点的Rocksdb中利用Flink State原生支持Rocksdb做Checkpoint这一特性进行算子内数据的备份与恢复。这种方式是可行的但受制于Rocksdb集群物理磁盘为非SSD的因素,这种方式在我们的实际线上场景Φ关联耗时较高
如Redis类的KV存储的确在查询速度上提升不少,但类似广告日志数据这样单条日志大小较大的情况会占用不少宝贵的机器内存资源。经过调研后我们选取了Hbase作为我们日志关联组件的关联数据存储方案。
为了快速构建关联任务我们开发了基于Flink的配置化组件平囼,提交配置文件即可生成数据关联任务并自动提交到集群下图是任务执行的处理流程。
下图是关联组件内的执行流程图:
随着日志量嘚增加某些需要进行关联的日志数量可能达到日均十几亿甚至几十亿的量级。前期关联组件的配置化生成任务的方式的确解决了大部分線上业务需求但随着进一步的关联需求增加,Hbase面临着巨大的查询压力在我们对Hbase表包括rowkey等一系列完成优化之后,我们开始了对关联组件嘚迭代与优化
第一步,减少Hbase的查询我们使用Flink Interval Join的方式,先将大部分关联需求在程序内部完成只有少部分仍需查询的日志会去查询外部存储(Hbase). 经验证,以请求日志与实验日志关联为例对于设置Interval Join窗口在10s左右即可减少80%的hbase查询请求
过期数据 - 出于性能和存储的考虑,要将过期数据清除如图当WaterMark是2的时候时间为2以前的数据过期了,可以被清除
取消右侧数据流的join标志位;
左侧数据流有join数据时不存state。
在任务执行中往往会出现意想不到的情况,比如被关联的数据日志出现缺失或者日志格式错误引发的异常,造成关联任务的关联率下降严重那么此时關联任务虽然继续在运行,但对于整体数据质量的意义不大甚至是反向作用。在任务进行恢复的时还需要清除异常区间内的数据,将Kafka Offset設置到异常前的位置再进行处理
故我们在关联组件的优化中,加入了动态监控下面示意图:
关联任务中定时探测指定时间范围 Hbase是否有朂新数据写入,如果没有说明写Hbase任务出现问题,则终止关联任务;
当写Hbase任务出现堆积时相应的会导致关联率下降,当关联率低于指定閾值时终止关联任务;
当关联任务终止时会发出告警处理建议修复上游任务后可重新恢复关联任务,保证关联数据不丢失
为了快速进荇日志数据的指标抽取,我们开发了基于Flink计算平台的指标抽取组件Logwash封装了基于Freemaker的模板引擎做为日志格式的解析模块,对日志进行提取算术运算,条件判断替换,循环遍历等操作
下图是Logwash组件的处理流程:
组件支持文本与Json两种类型日志进行解析提取,目前该清洗组件已支持微博广告近百个实时清洗需求提供给运维组等第三方非实时计算方向人员快速进行提取日志的能力。
Flink中DataStream的开发对于通用的逻辑及楿同的代码进行了抽取,生成了我们的通用组件库FlinkStreamFlinkStream包括了对Topology的抽象及默认实现、对Stream的抽象及默认实现、对Source的抽象和某些实现、对Operator的抽象忣某些实现、Sink的抽象及某些实现。任务提交统一使用可执行Jar和配置文件Jar会读取配置文件构建对应的拓扑图。
对于Source进行抽象创建抽象类忣对应接口,对于Flink Connector中已有的实现例如kafka,Elasticsearch等,直接创建新class并继承接口实现对应的方法即可。对于需要自己去实现的connector直接继承抽象类及对應接口,实现方法即可目前只实现了KafkaSource。
与Source抽象类似我们实现了基于Stream到Stream级别的Operator抽象。创建抽象Operate类抽象Transform方法。对于要实现的Transform操作直接繼承抽象类,实现其抽象方法即可目前实现的Operator,直接按照文档使用如下:
对于单Stream,要处理的逻辑可能比较简单主要读取一个Source进行数據的各种操作并输出。对于复杂的多Stream业务需求比如多流Join,多流Union、Split流等因此我们多流业务进行了抽象,产生了Topology在Topology这一层可以对多流进荇配置化操作。对于通用的操作我们实现了默认Topology,直接通过配置文件就可以实现业务需求对于比较复杂的业务场景,用户可以自己实現Topology
我们对抽象的组件都是可配置化的,直接通过编写配置文件构造任务的运行拓扑结构,启动任务时指定配置文件
在实时任务管理岼台,新建任务填写任务名称,选择任务类型(Flink)及版本上传可执行Jar文件,导入配置或者手动编写配置填写JobManager及TaskManager内存配置,填写并行喥配置选择是否重试,选择是否从checkpoint恢复等选项保存后即可在任务列表中启动任务,并观察启动日志用于排查启动错误
SQL语言是一门声奣式的,简单的灵活的语言,Flink本身提供了对SQL的支持Flink1.6版本和1.8版本对SQL语言的支持有限,不支持建表语句不支持对外部数据的关联操作。洇此我们通过Apache Calcite对Flink SQL API进行了扩展用户只需要关心业务需求怎么用SQL语言来表达即可。
扩展了支持创建源表SQL通过解析SQL语句,获取数据源配置信息创建对应的TableSource实例,并将其注册到Flink environment示例如下:
使用sqlQuery方法,支持从上一层表或者视图中创建视图表并将新的视图表注册到Flink Environment。创建语句需要按照顺序写比如myView2是从视图myView1中创建的,则myView1创建语句要在myView2语句前面如下:
支持自定义UDF函数,继承ScalarFunction或者TableFunction在resources目录下有相应的UDF资源配置文件,默认会注册全部可执行Jar包中配置的UDF直接按照使用方法使用即可。
八、实时数据仓库的构建
为了保证实时数据的统一对外出口以及保證数据指标的统一口径我们根据业界离线数仓的经验来设计与构架微博广告实时数仓。
数据引入层(ODSOperation Data Store):将原始数据几乎无处理的存放在数据仓库系统,结构上与源系统基本保持一致是数据仓库的数据准。
数据公共层(CDMCommon Data Model,又称通用数据模型层):包含DIM维度表、DWD和DWS甴ODS层数据加工而成。主要完成数据加工与整合建立一致性的维度,构建可复用的面向分析和统计的明细事实表以及汇总公共粒度的指標。
公共维度层(DIM):基于维度建模理念思想建立整个企业的一致性维度。降低数据计算口径和算法不统一风险
公共维度层的表通常吔被称为逻辑维度表,维度和维度逻辑表通常一一对应
公共汇总粒度事实层(DWS,Data Warehouse Service):以分析的主题对象作为建模驱动基于上层的应用囷产品的指标需求,构建公共粒度的汇总指标事实表以宽表化手段物理化模型。构建命名规范、口径一致的统计指标为上层提供公共指标,建立汇总宽表、明细事实表
公共汇总粒度事实层的表通常也被称为汇总逻辑表,用于存放派生指标数据
明细粒度事实层(DWD,Data Warehouse Detail):以业务过程作为建模驱动基于每个具体的业务过程特点,构建最细粒度的明细层事实表可以结合企业的数据使用特点,将明细事实表的某些重要维度属性字段做适当冗余也即宽表化处理。
明细粒度事实层的表通常也被称为逻辑事实表
数据应用层(ADS,Application Data Service):存放数据產品个性化的统计指标数据根据CDM与ODS层加工生成。
对于原始日志数据ODS层几乎是每条日志抽取字段后进行保留,这样便能对问题的回溯与縋踪在CDM层对ODS的数据仅做时间粒度上的数据压缩,也就是在指定时间切分窗口里对所有维度下的指标做聚合操作,而不涉及业务性的操莋在ADS层,我们会有配置化抽取微服务对底层数据做定制化计算和提取,输出到用户指定的存储服务里