如何对flink处理告警处理建议做性能测试

麻烦路过的各位亲给这个项目点個 star太不易了,写了这么多算是对我坚持下来的一种鼓励吧!

基于 Flink 1.9 讲解的专栏,涉及入门、概念、原理、实战、性能调优、系统案例的講解扫码下面专栏二维码可以订阅该专栏

将该项目的 Flink 版本升级到 1.9.0,有一些变动Flink 1.8.0 版本的代码经群里讨论保存在分支

  • 这本书比较薄,处于介绍阶段国内有这本的翻译书籍

  • 这本书比较基础,初学的话可以多看看

  • 这本书评价不是一般的高

新增流处理引擎相关的 Paper在 paper 目录下:

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了 你可以加我的微信:yuanblog_tzs,然后回复关键字:Flink 即可无条件获取到转载请联系本人获取授权,违者必究

更多私密资料请加入知识星球!

有人要问知识星球里面更新什么内容?值得加入吗

目前知识星球内已更新嘚系列文章:

除了《从1到100深入学习Flink》源码学习这个系列文章,《从0到1学习Flink》的案例文章也会优先在知识星球更新让大家先通过一些 demo 学习 Flink,再去深入源码学习!

如果学习 Flink 的过程中遇到什么问题,可以在里面提问我会优先解答,这里做个抱歉自己平时工作也挺忙,微信嘚问题不能做全部做一些解答 但肯定会优先回复给知识星球的付费用户的,庆幸的是现在星球里的活跃氛围还是可以的有不少问题通過提问和解答的方式沉淀了下来。

等等等还有很多,复制粘贴的我手累啊 ?

另外里面还会及时分享 Flink 的一些最新的资料(包括数据、视頻、PPT、优秀博客持续更新,保证全网最全因为我知道 Flink 目前的资料还不多)

再就是星球用户给我提的一点要求:不定期分享一些自己遇箌的 Flink 项目的实战,生产项目遇到的问题是如何解决的等经验之谈!

当然,除了更新 Flink 相关的东西外我还会更新一些大数据相关的东西,洇为我个人之前不是大数据开发所以现在也要狂补些知识!总之,希望进来的童鞋们一起共同进步!

本文汇总了实时计算Flink版服务和作業上线、启动、输出、延时等的常见问题

    在购买实时计算Flink版服务时,遇到

    • 删除AliyunStreamDefaultRole角色后导致实时计算Flink版正在运行的作业瞬间无法读写上丅游外部存储,直到后续重新初始化AliyunStreamDefaultRole角色后才可恢复正常请务必确认这项操作对于线上实时计算Flink版业务是否会造成影响。
    • RAM删除角色和实時计算Flink版重新初始化操作均需要您的主账号或者经主账号授权的RAM账号来完成。

控制台提示项目参数错误项目不存在,应该如何处理

    茬阿里云实时计算Flink版控制台出现错误提示窗口。

    系统下线了已经到期但是没有续费的项目

    您需要对项目进行续费,详情请参见

作业无法启动,应该如何处理

您在完成作业开发上线以后,在运维页面无法启动作业?原因有以下几点:

      查看Failover报错信息,分析Job运行异常原因通常Failover有以下报错。

      这个问题通常是由资源不足引起的请先检查资源是否充足,不足请先扩容

      在运行信息中,查看是否有如下的报错提示

       

      在已使用CU接近已购买CU的时,如果您的作业所需CU较大会导致作业无法启动,需要您重新

      在运行信息中,查看是否有如下的报错提礻

       

      重新获取资源配置。BlinkSQL任务开发完成后需要单击资源配置,通过获取自动生成JSON配置来生成一份默认的配置文件

      作业运维曲线上没有任何指标,作业一直处于Created状态也没有Failover报错。

      先检查资源是否足够如果资源足够,请您手动指增加CU后重启作业。

作业上线后作业的運维页面看不到拓扑图、瞬时值的指标或曲线指标,应该如何处理

  1. 查看是否有Failover错误信息

    解决方案:如果有Failover错误信息,请参见

  2. 运维页面沒有拓扑图和瞬时值的数据

    进行咨询。工单格式如下:

  • 运维页面的曲线指标全部指标或者是部分指标都没有信息
    1. 首先确认是否使用了自萣义的Source,如果是自定义的SourceSource的输入曲线为空是正常的,因为实时计算Flink版无法监控到自定义的Source
    2. 如果没有使用自定义的插件,但曲线指标不存在请您进行咨询,工单格式如下:

新作业提交后项目中现存作业出现异常的原因是什么?

独享模式采用的是CGroup(Control Group)非严格资源隔离模式在集群资存在空闲时,单个作业(作业A)资源可以超用(即实际使用资源可超过申请资源运维界面对应显示资源健康分小于等于20分)。如果新提交作业(作业B)申请的资源大于实际剩余资源将要求超用资源的作业A退还部分资源。现存作业(作业A)的资源减少后可能會出现业务延迟增大等异常

  • 仅独享模式存在此问题。
  • 新作业提交运行后项目使用率过高(超过95%以上)可能会出现以上问题。

例如1个項目的总资源为10CU,已经启动了两个作业分别申请了2.5CU。由于资源有超用其中一个作业A实际使用了3CU。总项目上来看已使用CU(即实际申请CU)=5CU,但实际使用的资源为5.5CU如果此时再提交运行1个申请5CU的作业,由于总资源为10CU此时资源超用的作业A需要退还超用的0.5CU。因此作业A实际使鼡的资源为2.5CU(与实际申请的一致),相比之前的实际使用的资源3CU有所减少其处理能力也有所下降,可能会出现业务延迟增大的现象

    上線运行作业后,下游结果表中没有数据

    1. 检查作业中是否存在Failover

        查看Failover报错信息,分析作业运行异常原因

    2. 解决Failover问题,使作业正常运行

  • 检查源表数据是否进入实时计算Flink版

      这种情况下没有Failover,但数据延时会很大请查看数据曲线,检查各Source输入是否有数据

  • 检查源表,保证上游有数據进入实时计算Flink版

  • 检查数据是否被某个节点过滤
      • 运行信息页面,查看每个节点的IN_Q(输入)OUT_Q(输出)如果输入有数据,输出为0说奣数据被这个节点过滤了。单击对应节点顶部深蓝色区域后再单击Metrics,查看数据输入数据输出指标的详细信息
      • 数据曲线页面,查看數据流入流出情况
        1. 有数据流入:检查是否使用了数据存储的方式引入。
        2. 没有数据流出:可能是启动位点设置的有误
    • Window过滤了数据(检查Watermark嘚字段是否有效)。
    • Where条件过滤了数据
  • 检查下游是否由于默认缓存机制缓存了数据

    解决方案:排除JOB的业务逻辑异常后,调整下游存储的

    说奣 该参数可能会造成下游数据库I/O压力过大、存在性能瓶颈的风险

  • 检查下游RDS,是否存在死锁

另外您可以使用调试模式(Debug),将计算结果咑印到日志中对日志进行分析,判断无输出结果的原因详情请参见

为什么使用滑动窗口没有数据输出?

    • 检查源表Watermark的时间字段是不是TIMESTAMP类型如果不是,请您使用进行转换
      1. 请参见的时间类型中关于Watermark的说明了解更多。
      2. 请参见数据列章节了解如何进行转换
    • 检查数据源表的并發是不是都有数据,只要有一个并发没有数据都不会触发Watermark形成窗口输出如下图所示 。

作业延时增加应该如何处理?

      即数据源中的数據到达实时计算Flink版的时间间隔。

      数据间隔延时过大表明上游可能没有数据进入实时计算Flink版系统实时计算Flink版数据间隔时间等于所有并发中嘚最大的延迟。您可以参见运维页面

      曲线分析源表各并发的延迟

      SQL语法或者资源配置的不合理可能导致作业性能不佳,从而造成数据处理嘚滞后和作业延时的增加

    • 检查作业链路,确保上游所有Shard(Queue)都有最新的数据流入实时计算Flink版
    • 如果在数据间隔时间较小的情况下,仍然存在作业延时增加的现象则可能是作业性能不佳所导致。处理方法参见SQL优化后,建议您先进行3~5次如果调优效果不明显,再进行
      • 第┅次进行自动配置调优时,请不要指定CU数
      • 建议您先优化SQL,再进行资源配置优化效率会更高。

作业延迟过大应该如何处理?

线上作业延迟过大表现为曲线图中

呈水平趋势,或逐步呈上升趋势或在追历史数据(启动位点调整到当前时间之前)的过程中,曲线图中

下降非常缓慢常见的原因有以下几种:

      • 在曲线图中,查看各Source的脏数据脏数据会在相应时间节点上显示数据量,没有则显示为0
      • 查看FailoverTaskmanager.log日志Φ的报错信息,脏数据会在错误信息中标明

      更改SQL,过滤脏数据后重新上线作业。

  • 没有源表数据进入实时计算Flink版

      这种情况下没有Failover请查看数据曲线,检查各Source输入是否有数据

  • 检查源表,确保上游有数据进入实时计算Flink版详情请参见数据曲线业务延迟3条曲线。

    在RDS写入过慢时建议您调大并发或增加资源,观察输出是否仍然为0或RPS极小

    新建一个Schema相同的结果表,重新注册存储引入

    请确认是否存在以下情况:

    • 输叺数据不稳定,表现为输入曲线有抖动
    • 历史数据处理完成后(例如,追历史数据完成后)延迟会下降。
    • 确认数据生产机制是否异常
    • 數据输入波动大,检查Source源
    1. 优化SQL。详情请参见
    2. 优化上下游WITH参数。详情请?参见上下游存储文档的WITH参数介绍
    3. 优化作业参数。详情请参见
    4. 资源调优。后再进行。

业务延迟显示作业无效应该如何处理?

业务延迟显示无效的原因包括如下几种您可以根据不同的原因进行對应的处理:

  • 作业运行失败,无法获取稳定的埋点信息

    根据Failover报错信息提示修复问题,使作业正常运行

  • 您没有对自定义源表或Datastream作业进行埋点,或者埋点方式和实时计算Flink版数据存储不兼容

    请参见文档编写埋点代码,确保和实时计算Flink版数据存储相互兼容

  • 用户数据的延迟和實时计算Flink版获取埋点数据的延迟重合。
  • 可能是集群出现问题请您进行咨询。

如何对实时计算Flink版3.0以上版本的作业进行反压检测

如何通过調试模式(Debug)查看作业的输出信息?

当结果表没有输出时建议您使用调试模式(Debug),将计算结果打印到日志中对日志进行分析,判断無输出结果的原因例如存在脏数据等。根据作业是否包含UDX日志查看的方法分为以下两种:

    1. 单击作业名称,进入运行信息页面
    2. 单击目標Vertex顶部深蓝色区域。
    3. 单击logList返回日志列表。

    如果您使用UDX在Java代码中又可以分为如下两种Debug方法:

    • 方法,把调试的日志打印到

    • 在Java代码中使用SLF4J的Logger功能把调试的日志打印到

如何处理调试过程、窗口或计算过程中的脏数据?

实际操作中您可能会遇到脏数据中断业务开展的情况本文為您介绍在不同的场景中如何处理脏数据。

      作业调试过程中出现报错信息:操作错误所有的数据均被跳过,请检查DDL中的LENGTHCHECK

      单行字段条數检查策略LENGTHCHECK的默认值为NONE,即解析出的字段数小于定义字段数时跳过这行数据。如果您的数据源存在脏数据不符合单行字段条数检查的默认策略,则调试线上抽取数据时会报错导致无法抽取到线上数据。

      建议在源表的WITH参数中增加lengthcheck='PAD'参数即当解析出的字段数小于定义字段數时,系统会使用null在行尾填充缺少的字段

      如果您使用Window函数,Event time的时间字段中存在NULL值需要使用

      输入值是NULL字符串或空字符串,不能作为数字類型进行计算

      确定脏数据后,通过WHERE或者CASE WHEN的方法对脏数据进行过滤

      • 使用WHERE对脏数据进行过滤,示例如下
      • 使用CASE WHEN对脏数据进行过滤,示例如丅

如何利用报错信息快速定位作业问题?

前的标点显示为红色或者黄色时表明作业的运行状态不正常。

通过以下两种方法快速定位作業问题:

    1. 查看Failover当前页面最后一个Caused by信息找到最开始的Failover,并将页面拉至最下端由下至上,找到Cause by信息

      说明 第1个Failover中的Cause by信息,往往是导致作业異常的根因根据该根因的提示信息,可以快速定位作业异常的原因

    1. 数据曲线页面,通过最近6小时最近1天最近1周维度查看Failover曲线Φ第一次Failover的时间点。
    2. taskmanager.log日志中查看报错的原因报错原因查询方法参见

作业运行所产生的信息会记录到

日志文件中,如果您的作业出现运荇失败的状态您可以在

日志文件中查看可能的报错原因,例如脏数据在

日志文件中查看报错信息的步骤如下:

  1. 按下ctrl+F搜索报错关键字(唎如error),并查看报错信息

    说明 建议从最近的时间开始查看,最近的有效报错信息通常是导致作业运行失败的根本原因

Vertex排查数据量问题

    數据经过JOIN、WHERE或WINDOW等节点,数据量变少为正常现象因为数据可能因为条件限制被过滤或JOIN不上。当某个Operator为红色时代表数据被该节点过滤。

      1. 检查开窗的字段ts是否正常:
        • 如果您的开窗时间是1970年(时间戳为10位)、15XXX年(时间戳为16位)等非13位时间戳的开窗时间则需要使用转换为13位时间戳后,再进行开窗操作
        • 如果您不确定您的ts字段是否正常,建议您写一个虚拟的Sink将ts字段打印到日志中,详情请参见
      2. 查看数据曲线中有关WaterMark嘚三个曲线详情请参见。

        以下的版本:查看数据源表的并发是否都有数据只要一个并发没有数据,就不会触发

        说明 假设窗口设置为1汾钟的滚动窗口,数据被窗口过滤检查下一个窗口是否有数据到来,如果下个窗口没有数据到来则不会触发上一个窗口的结束,因此窗口没有输出

      如果排查后,发现没有问题请您进行咨询。

      1. 将Where条件注释掉后再将没有Where条件的数据打印到日志中,检查是否有数据满足Where條件详情请参见
      2. 当有符合Where条件的数据时,请检查Where条件的写法例如,在Where条件中使用了to_timestampdate_format等函数时您可以将Where条件中使用的函数打印到虚擬Sink中,检查函数使用的方法是否正确

      如果您排查后,发现没有问题请您进行咨询。

      排查方案:检查ON条件如果您不确定ON条件是否有问題,您可以将左右两个表的数据打印到日志中检查详情请参见

      如果ON条件没有问题,请您进行咨询

重新获取配置资源报错,该如何处理

在使用UDX函数时,引用JAR包内的本地文件报错

如果引用的JAR包内的本地文档报错信息为空指针您可以从以下4个方面进行确认:

  • 本地文件是否存放在resources文件夹下。
  • 是否使用当前加载路径(UdtfTest)
  • 是否正确读取文件内的内容。

GROUP BY数据出现热点、数据倾斜

    在作业处理的速度很慢时增加了資源后,作业处理速度并没有明显提升

    数据出现热点、数据倾斜。

      说明 把SQL拆分成两次进行GROUP BY第一次去重,第二次聚合

云监控控制台页媔配置监控告警处理建议时,显示无数据应该如何处理?

    实时计算Flink版作业运维页面有数据曲线且有数据传给云监控消费的日志信息,泹在云监控控制台配置监控告警处理建议时显示无数据。

    子账号没有授予相关权限

    权限后,云监控控制台仍然没有数据请您

吕永卫微博广告资深数据开发笁程师,实时数据项目组负责人

黄鹏,微博广告实时数据开发工程师负责法拉第实验平台数据开发、实时数据关联平台、实时算法特征数据计算、实时数据仓库、实时数据清洗组件开发工作。

林发明微博广告资深数据开发工程师,负责算法实时特征数据计算、实时数據关联平台、实时数据仓库、FlinkStream组件开发工作

崔泽峰,微博广告资深数据开发工程师负责实时算法特征数据计算、实时任务管理平台、FlinkStream組件、FlinkSQL扩展开发工作。

是随着微博业务线的快速扩张微博广告各类业务日志的数量也随之急剧增长。传统基于Hadoop生态的离线数据存储计算方案已在业界形成统一的默契但受制于离线计算的时效性制约,越来越多的数据应用场景已从离线转为实时微博广告实时数据平台以此为背景进行设计与构建,目前该系统已支持日均处理日志数量超过百亿接入产品线、业务日志类型若干。

相比于Spark目前Spark的生态总体更為完善一些,且在机器学习的集成和应用性暂时领先但作为下一代大数据引擎的有力竞争者-Flink在流式计算上有明显优势,Flink在流式计算里属於真正意义上的单条处理每一条数据都触发计算,而不是像Spark一样的Mini Batch作为流式处理的妥协Flink的容错机制较为轻量,对吞吐量影响较小而苴拥有图和调度上的一些优化,使得Flink可以达到很高的吞吐量而Strom的容错机制需要对每条数据进行ack,因此其吞吐量瓶颈也是备受诟病

这里引用一张图来对常用的实时计算框架做个对比。

Flink是一个开源的分布式实时计算框架Flink是有状态的和容错的,可以在维护一次应用程序状态嘚同时无缝地从故障中恢复;它支持大规模计算能力能够在数千个节点上并发运行;它具有很好的吞吐量和延迟特性。同时Flink提供了多種灵活的窗口函数。

如果你对大数据开发感兴趣想系统学习大数据的话,可以加入大数据技术学习交流扣扣裙:数字522数字189数字307欢迎添加,私信管理员了解课程介绍,获取学习资源

Flink检查点机制能保持exactly-once语义的计算。状态保持意味着应用能够保存已经处理的数据集结果和狀态

Flink支持流处理和窗口事件时间语义。事件时间可以很容易地通过事件到达的顺序和事件可能的到达延迟流中计算出准确的结果

Flink支持基于时间、数目以及会话的非常灵活的窗口机制(window)。可以定制window的触发条件来支持更加复杂的流模式

Flink高效的容错机制允许系统在高吞吐量的情况下支持exactly-once语义的计算。Flink可以准确、快速地做到从故障中以零数据丢失的效果进行恢复

Flink具有高吞吐量和低延迟(能快速处理大量数據)特性。下图展示了Apache Flink和Apache Storm完成分布式项目计数任务的性能对比

初期架构仅为计算与存储两层,新来的计算需求接入后需要新开发一个实時计算任务进行上线重复模块的代码复用率低,重复率高计算任务间的区别主要是集中在任务的计算指标口径上。

在存储层各个需求方所需求的存储路径都不相同,计算指标可能在不通的存储引擎上有重复有计算资源以及存储资源上的浪费情况。并且对于指标的计算口径也是仅局限于单个任务需求里的不通需求任务对于相同的指标的计算口径没有进行统一的限制于保障。各个业务方也是在不同的存储引擎上开发数据获取服务对于那些专注于数据应用本身的团队来说,无疑当前模式存在一些弊端

随着数据体量的增加以及业务线嘚扩展,前期架构模式的弊端逐步开始显现从当初单需求单任务的模式逐步转变为通用的数据架构模式。为此我们开发了一些基于Flink框架的通用组件来支持数据的快速接入,并保证代码模式的统一性和维护性在数据层,我们基于Clickhouse来作为我们数据仓库的计算和存储引擎利用其支持多维OLAP计算的特性,来处理在多维多指标大数据量下的快速查询需求在数据分层上,我们参考与借鉴离线数仓的经验与方法構建多层实时数仓服务,并开发多种微服务来为数仓的数据聚合指标提取,数据出口数据质量,报警监控等提供支持

1)接入层:接叺原始数据进行处理,如Kafka、RabbitMQ、File等

2)计算层:选用Flink作为实时计算框架,对实时数据进行清洗关联等操作。

3)存储层: 对清洗完成的数据進行数据存储我们对此进行了实时数仓的模型分层与构建,将不同应用场景的数据分别存储在如Clickhouse,Hbase,Redis,Mysql等存储服务中,并抽象公共数据层与維度层数据分层处理压缩数据并统一数据口径。

4)服务层:对外提供统一的数据查询服务支持从底层明细数据到聚合层数据5min/10min/1hour的多维计算服务。同时最上层特征指标类数据如计算层输入到Redis、Mysql等也从此数据接口进行获取。

5)应用层:以统一查询服务为支撑对各个业务线数據场景进行支撑

监控报警:对Flink任务的存活状态进行监控,对异常的任务进行邮件报警并根据设定的参数对任务进行自动拉起与恢复根據如Kafka消费的offset指标对消费处理延迟的实时任务进行报警提醒。

数据质量:监控实时数据指标对历史的实时数据与离线hive计算的数据定时做对仳,提供实时数据的数据质量指标对超过阈值的指标数据进行报警。

整体数据从原始数据接入后经过ETL处理, 进入实时数仓底层数据表经過配置化聚合微服务组件向上进行分层数据的聚合。根据不同业务的指标需求也可通过特征抽取微服务直接配置化从数仓中抽取到如Redis、ES、MysqlΦ进行获取大部分的数据需求可通过统一数据服务接口进行获取。

原始日志数据因为各业务日志的不同所拥有的维度或指标数据并不唍整。所以需要进行实时的日志的关联才能获取不同维度条件下的指标数据查询结果并且关联日志的回传周期不同,有在10min之内完成95%以上囙传的业务日志也有类似于激活日志等依赖第三方回传的有任务日志,延迟窗口可能大于1天并且最大日志关联任务的日均数据量在10亿級别以上,如何快速处理与构建实时关联任务的问题首先摆在我们面前对此我们基于Flink框架开发了配置化关联组件。对于不同关联日志的指标抽取我们也开发了配置化指标抽取组件用于快速提取复杂的日志格式。以上两个自研组件会在后面的内容里再做详细介绍

1)回传周期超过关联窗口的日志如何处理?

对于回传晚的日志,我们在关联窗口内未取得关联结果我们采用实时+离线的方式进行数据回刷补全。實时处理的日志我们会将未关联的原始日志输出到另外一个暂存地(Kafka)同时不断消费处理这个未关联的日志集合,设定超时重关联次数与超時重关联时间超过所设定任意阈值后,便再进行重关联离线部分,我们采用Hive计算昨日全天日志与N天内的全量被关联日志表进行关联將最终的结果回写进去,替换实时所计算的昨日关联数据

2)如何提高Flink任务性能?

为了更高效地分布式执行Flink会尽可能地将operator的subtask链接(chain)在┅起形成task。每个task在一个线程中执行将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化减少数据在缓沖区的交换,减少了延迟的同时提高整体的吞吐量

Flink会在生成JobGraph阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个task(一个线程)中执行以减少线程之间的切换和缓冲的开销,提高整体的吞吐量和延迟下面以官网中的例子进行说明。

上下游算子的并行度一致;

两个节点间数据分区方式是 forward;

用户没有禁用 chain

流式计算中,常常需要与外部系统进行交互而往往一次连接中你那个获取连接等待通信嘚耗时会占比较高。下图是两种方式对比示例:

图中棕色的长条表示等待时间可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解決同步访问的问题异步模式可以并发地处理多个请求和回复。也就是说你可以连续地向数据库发送用户a、b、c等的请求,与此同时哪個请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待如上图右边所示。这也正是 Async I/O 的实现原理

Flink实现了一套强大嘚checkpoint机制,使它在获取高吞吐量性能的同时也能保证Exactly Once级别的快速恢复。

首先提升各节点checkpoint的性能考虑的就是存储引擎的执行效率Flink官方支持嘚三种checkpoint state存储方案中,Memory仅用于调试级别无法做故障后的数据恢复。其次还有Hdfs与Rocksdb当所做Checkpoint的数据大小较大时,可以考虑采用Rocksdb来作为checkpoint的存储以提升效率

其次的思路是资源设置,我们都知道checkpoint机制是在每个task上都会进行那么当总的状态数据大小不变的情况下,如何分配减少单个task所汾的的checkpoint数据变成了提升checkpoint执行效率的关键

最后,增量快照. 非增量快照下每次checkpoint都包含了作业所有状态数据。而大部分场景下前后checkpoint里,数據发生变更的部分相对很少所以设置增量checkpoint,仅会对上次checkpoint和本次checkpoint之间状态的差异进行存储计算减少了checkpoint的耗时。

3)如何保障任务的稳定性?

茬任务执行过程中会遇到各种各样的问题,导致任务异常甚至失败所以如何做好异常情况下的恢复工作显得异常重要。

Flink支持不同的重啟策略以在故障发生时控制作业如何重启。集群在启动时会伴随一个默认的重启策略在没有定义具体重启策略时会使用该默认策略。洳果在工作提交时指定了一个重启策略该策略会覆盖集群的默认策略。

Flink在任务启动时指定HA配置主要是为了利用Zookeeper在所有运行的JobManager实例之间进荇分布式协调.Zookeeper通过leader选取和轻量级一致性的状态存储来提供高可用的分布式协调服务

在实际环境中,我们遇见过因为集群状态不稳定而导致的任务失败在Flink1.6版本中,甚至遇见过任务出现假死的情况也就是Yarn上的job资源依然存在,而Flink任务实际已经死亡为了监测与恢复这些异常嘚任务,并且对实时任务做统一的提交、报警监控、任务恢复等管理我们开发了任务提交与管理平台。通过Shell拉取Yarn上Running状态与Flink Job状态的列表进荇对比心跳监测平台上的所有任务,并进行告警处理建议、自动恢复等操作

Flink任务在运行过程中,各Operator都会产生各自的指标数据例如,Source會产出numRecordIn、numRecordsOut等各项指标信息我们会将这些指标信息进行收集,并展示在我们的可视化平台上指标平台如下图:

1、如何选择关联方式?

刚開始我们直接使用Flink Table做为数据关联的方式直接将接入进来的DataStream注册为Dynamic Table后进行两表关联查询,如下图:

但尝试后发现在做那些日志数据量大的關联查询时往往只能在较小的时间窗口内做查询否则会超过datanode节点单台内存限制,产生异常但为了满足不同业务日志延迟到达的情况,這种实现方式并不通用

之后,我们直接在DataStream上进行处理在CountWindow窗口内进行关联操作,将被关联的数据Hash打散后存储在各个datanode节点的Rocksdb中利用Flink State原生支持Rocksdb做Checkpoint这一特性进行算子内数据的备份与恢复。这种方式是可行的但受制于Rocksdb集群物理磁盘为非SSD的因素,这种方式在我们的实际线上场景Φ关联耗时较高

如Redis类的KV存储的确在查询速度上提升不少,但类似广告日志数据这样单条日志大小较大的情况会占用不少宝贵的机器内存资源。经过调研后我们选取了Hbase作为我们日志关联组件的关联数据存储方案。

为了快速构建关联任务我们开发了基于Flink的配置化组件平囼,提交配置文件即可生成数据关联任务并自动提交到集群下图是任务执行的处理流程。

下图是关联组件内的执行流程图:

随着日志量嘚增加某些需要进行关联的日志数量可能达到日均十几亿甚至几十亿的量级。前期关联组件的配置化生成任务的方式的确解决了大部分線上业务需求但随着进一步的关联需求增加,Hbase面临着巨大的查询压力在我们对Hbase表包括rowkey等一系列完成优化之后,我们开始了对关联组件嘚迭代与优化

第一步,减少Hbase的查询我们使用Flink Interval Join的方式,先将大部分关联需求在程序内部完成只有少部分仍需查询的日志会去查询外部存储(Hbase). 经验证,以请求日志与实验日志关联为例对于设置Interval Join窗口在10s左右即可减少80%的hbase查询请求

过期数据 - 出于性能和存储的考虑,要将过期数据清除如图当WaterMark是2的时候时间为2以前的数据过期了,可以被清除

取消右侧数据流的join标志位;

左侧数据流有join数据时不存state。

在任务执行中往往会出现意想不到的情况,比如被关联的数据日志出现缺失或者日志格式错误引发的异常,造成关联任务的关联率下降严重那么此时關联任务虽然继续在运行,但对于整体数据质量的意义不大甚至是反向作用。在任务进行恢复的时还需要清除异常区间内的数据,将Kafka Offset設置到异常前的位置再进行处理

故我们在关联组件的优化中,加入了动态监控下面示意图:

关联任务中定时探测指定时间范围 Hbase是否有朂新数据写入,如果没有说明写Hbase任务出现问题,则终止关联任务;

当写Hbase任务出现堆积时相应的会导致关联率下降,当关联率低于指定閾值时终止关联任务;

当关联任务终止时会发出告警处理建议修复上游任务后可重新恢复关联任务,保证关联数据不丢失

为了快速进荇日志数据的指标抽取,我们开发了基于Flink计算平台的指标抽取组件Logwash封装了基于Freemaker的模板引擎做为日志格式的解析模块,对日志进行提取算术运算,条件判断替换,循环遍历等操作

下图是Logwash组件的处理流程:

组件支持文本与Json两种类型日志进行解析提取,目前该清洗组件已支持微博广告近百个实时清洗需求提供给运维组等第三方非实时计算方向人员快速进行提取日志的能力。

Flink中DataStream的开发对于通用的逻辑及楿同的代码进行了抽取,生成了我们的通用组件库FlinkStreamFlinkStream包括了对Topology的抽象及默认实现、对Stream的抽象及默认实现、对Source的抽象和某些实现、对Operator的抽象忣某些实现、Sink的抽象及某些实现。任务提交统一使用可执行Jar和配置文件Jar会读取配置文件构建对应的拓扑图。

对于Source进行抽象创建抽象类忣对应接口,对于Flink Connector中已有的实现例如kafka,Elasticsearch等,直接创建新class并继承接口实现对应的方法即可。对于需要自己去实现的connector直接继承抽象类及对應接口,实现方法即可目前只实现了KafkaSource。

与Source抽象类似我们实现了基于Stream到Stream级别的Operator抽象。创建抽象Operate类抽象Transform方法。对于要实现的Transform操作直接繼承抽象类,实现其抽象方法即可目前实现的Operator,直接按照文档使用如下:

对于单Stream,要处理的逻辑可能比较简单主要读取一个Source进行数據的各种操作并输出。对于复杂的多Stream业务需求比如多流Join,多流Union、Split流等因此我们多流业务进行了抽象,产生了Topology在Topology这一层可以对多流进荇配置化操作。对于通用的操作我们实现了默认Topology,直接通过配置文件就可以实现业务需求对于比较复杂的业务场景,用户可以自己实現Topology

我们对抽象的组件都是可配置化的,直接通过编写配置文件构造任务的运行拓扑结构,启动任务时指定配置文件

在实时任务管理岼台,新建任务填写任务名称,选择任务类型(Flink)及版本上传可执行Jar文件,导入配置或者手动编写配置填写JobManager及TaskManager内存配置,填写并行喥配置选择是否重试,选择是否从checkpoint恢复等选项保存后即可在任务列表中启动任务,并观察启动日志用于排查启动错误

SQL语言是一门声奣式的,简单的灵活的语言,Flink本身提供了对SQL的支持Flink1.6版本和1.8版本对SQL语言的支持有限,不支持建表语句不支持对外部数据的关联操作。洇此我们通过Apache Calcite对Flink SQL API进行了扩展用户只需要关心业务需求怎么用SQL语言来表达即可。

扩展了支持创建源表SQL通过解析SQL语句,获取数据源配置信息创建对应的TableSource实例,并将其注册到Flink environment示例如下:

使用sqlQuery方法,支持从上一层表或者视图中创建视图表并将新的视图表注册到Flink Environment。创建语句需要按照顺序写比如myView2是从视图myView1中创建的,则myView1创建语句要在myView2语句前面如下:

支持自定义UDF函数,继承ScalarFunction或者TableFunction在resources目录下有相应的UDF资源配置文件,默认会注册全部可执行Jar包中配置的UDF直接按照使用方法使用即可。

八、实时数据仓库的构建

为了保证实时数据的统一对外出口以及保證数据指标的统一口径我们根据业界离线数仓的经验来设计与构架微博广告实时数仓。

数据引入层(ODSOperation Data Store):将原始数据几乎无处理的存放在数据仓库系统,结构上与源系统基本保持一致是数据仓库的数据准。

数据公共层(CDMCommon Data Model,又称通用数据模型层):包含DIM维度表、DWD和DWS甴ODS层数据加工而成。主要完成数据加工与整合建立一致性的维度,构建可复用的面向分析和统计的明细事实表以及汇总公共粒度的指標。

公共维度层(DIM):基于维度建模理念思想建立整个企业的一致性维度。降低数据计算口径和算法不统一风险

公共维度层的表通常吔被称为逻辑维度表,维度和维度逻辑表通常一一对应

公共汇总粒度事实层(DWS,Data Warehouse Service):以分析的主题对象作为建模驱动基于上层的应用囷产品的指标需求,构建公共粒度的汇总指标事实表以宽表化手段物理化模型。构建命名规范、口径一致的统计指标为上层提供公共指标,建立汇总宽表、明细事实表

公共汇总粒度事实层的表通常也被称为汇总逻辑表,用于存放派生指标数据

明细粒度事实层(DWD,Data Warehouse Detail):以业务过程作为建模驱动基于每个具体的业务过程特点,构建最细粒度的明细层事实表可以结合企业的数据使用特点,将明细事实表的某些重要维度属性字段做适当冗余也即宽表化处理。

明细粒度事实层的表通常也被称为逻辑事实表

数据应用层(ADS,Application Data Service):存放数据產品个性化的统计指标数据根据CDM与ODS层加工生成。

对于原始日志数据ODS层几乎是每条日志抽取字段后进行保留,这样便能对问题的回溯与縋踪在CDM层对ODS的数据仅做时间粒度上的数据压缩,也就是在指定时间切分窗口里对所有维度下的指标做聚合操作,而不涉及业务性的操莋在ADS层,我们会有配置化抽取微服务对底层数据做定制化计算和提取,输出到用户指定的存储服务里

我要回帖

更多关于 SCTP告警 的文章

 

随机推荐