希望长大对我而言,是可以做更多想做的事,而不是被迫做更多不想做的事...... 首页 Flink之水位线 丁D 学无止境 2023-05-18 18:30 18116已阅读 watermark 摘要 **flink时间语义** 1、Event Time:事件创建时间; 2、Ingestion Time:数据进入Flink的时间; 3、Processing Time:执行操作算子的本地系统时间,与机器相关; flink 1.12之前版本默认使用的是Processing Time,后面的版本考虑事件时间更通过就默认使用Event Time 所以系统时间一到就会输出,而如果是watermark使用的是event time所以要等下一条数据到来,然后判断时间是否大于窗口时间才输出 Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间, **Watermark** Watermark是一种衡量Event Time进展的机制。 Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。 数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。 Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。 水位线的时间戳必须单调**递增** 水位线是基于数据的时间戳生成的 通过`dataStream.assignTimestampsAndWatermarks(WatermarkStrategy)`来指定水位线 ``` 没有延迟 WatermarkStrategy.forMonotonousTimestamps() WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)) 设置延迟5s 注意事件时间必须是ms单位的 所以这里*1000 dataStream.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner() { // 抽取时间戳的逻辑 @Override public long extractTimestamp(SensorReadingDTO element, long recordTimestamp) { return element.getTimestamp() * 1000; } })); ``` 这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是当前最大事件时间戳 – 延迟时间 – 1,这里的单位是毫秒。 为什么要减 1 毫秒呢?我们可以回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了。 如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的; 所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中 **默认周期是200毫秒**,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置 一般大数据场景都是考虑高并发情况,所以一般使用周期性生成Watermark的方式,避免频繁地生成Watermark。 ![](/upload/watermark_demo.png) 我们可以梳理一下事件时间语义下,之前例子中窗口的处理过程: (1)第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去; (2)后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口; (3)11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11秒的数据保存进去。 由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也没有到关闭时间; (4)之后又有 9 秒数据到来,同样进入[0, 10)窗口中; (5)12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10秒,所以 [0, 10)窗口应该关闭了。 第一个窗口收集到了所有的 7 个数据,进行处理计算后输出结果,并将窗口关闭销毁; (6)同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去; 遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。 Flink对于迟到数据有三层保障,先来后到的保障顺序是: 1、 WaterMark => 约等于放宽窗口标准 2、 allowedLateness => 允许迟到(ProcessingTime超时,但是EventTime没超时) 3、 sideOutputLateData => 超过迟到时间,另外捕获,之后可以自己批处理合并先前的数据 allowedLateness 默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。 为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。 简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后, 还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据 默认情况下,如果不指定allowedLateness,其值是0,即对于watermark超过end-of-window之后,还有此window的数据到达时,这些数据被删除掉了。 > 注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据, 会buffer起来,直到watermark超过end-of-window + allowedLateness()的时间,窗口的数据及元数据信息才会被删除。 再次计算就是DataFlow模型中的Accumulating(积累)的情况。 很赞哦! (0) 上一篇:Flink之基础概念 下一篇:Flink多流转换 目录 点击排行 Elasticsearch6.3.2之x-pack redis哨兵 2019-07-09 22:05 Redis+Twemproxy+HAProxy+Keepalived 2019-07-12 17:20 GC优化策略和相关实践案例 2019-10-10 10:54 JVM垃圾回收器 2019-10-10 10:23 标签云 Java Spring MVC Mybatis Ansible Elasticsearch Redis Hive Docker Kubernetes RocketMQ Jenkins Nginx 友情链接 郑晓博客 佛布朗斯基 凉风有信 MarkHoo's Blog 冰洛博客 南实博客 Rui | 丁D Java研发工程师 生活可以用「没办法」三个字概括。但别人的没办法是「腿长,没办法」、「长得好看,没办法」、「有才华,没办法」。而你的没办法,是真的没办法。 请作者喝咖啡