希望长大对我而言,是可以做更多想做的事,而不是被迫做更多不想做的事...... 首页 Flink多流转换 丁D 学无止境 2024-09-02 10:48 11687已阅读 Flink 摘要之前我们学习的都是针对一条流的数据进行操作的。但是在实际应用中,可能需要将不同来源(数据源不同)的数据连接在一起进行处理。 也可能将一条流的数据拆成多条流进行处理。所以简单划分多流转换,可以分为 “分流”和“合流”。 # 多流转换 之前我们学习的都是针对一条流的数据进行操作的。但是在实际应用中,可能需要将不同来源(数据源不同)的数据连接在一起进行处理。也可能将一条流的数据拆成多条流进行处理。所以简单划分多流转换,可以分为 “分流”和“合流”。 分流一般通过侧输出流来实现合流就比较多了 - 联合union - 连接connect - 联结join ### 分流(侧输出流) 分流其实最简单的是filter,但是filter代码显得有些冗余,需要拆出几条流就要重复写几次,所以一般不用这种,用侧输出流 ``` // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp) private static OutputTag> MaryTag = new OutputTag>("Mary-pv"){}; //开始分流到标签 stream.process(new ProcessFunction() { @Override public void processElement(Event value, Context ctx, Collector out) throws Exception { if (value.user.equals("Mary")){ ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp)); } else { out.collect(value); } } }); //获取侧输出流 processedStream.getSideOutput(MaryTag).print("Mary pv"); ``` ### 合流操作 #### 联合union 最简单的合流操作,就是直接将多条流合并在一起,使用union这种操作就叫联合stream1.union(stream2,stream3,...)合并之后 水位线拿最小的。 *注意 union联合要求两个流的数据类型要一样* #### 连接connect 流的联合虽然简单但是要求数据类型要一样,所以在实际场景比较少用,所以flink提供了比较灵活的connect来连接两条流dataStream要求数据类型是一样的唯一的,所以使用connect连接的两条流的数据类型不再是dataStream而是ConnectedStreamsConnectedStreams connectedStreams = stream1.connect(stream2) 连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。 *ConnectedStreams调用map和flatmap入参不再是MapFunction了而是CoMapFunction* CoMapFunction三个参数依次表示第一条流、第二条流,以及合并后的流中的数据类型.map1()就是对第一条流中数据的 map 操作,.map2()则是针对第二条流。 ``` public interface CoMapFunction extends Function, Serializable { OUT map1(IN1 var1) throws Exception; OUT map2(IN2 var1) throws Exception; } ``` 同理CoFlatMapFunction 一样的道理 ``` public interface CoFlatMapFunction extends Function, Serializable { void flatMap1(IN1 var1, Collector var2) throws Exception; void flatMap2(IN2 var1, Collector var2) throws Exception; } ``` connectedStreams.keyBy(keySelector1, keySelector2);两个参数 keySelector1 和 keySelector2,是两条流中各自的键选择器;ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起,然后针对来源的流再做各自处理 处理函数CoProcessFunction。它需要实现的就是 processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。CoProcessFunction 同样可以通过上下文 ctx 来访问 timestamp、水位线,并通过 TimerService 注册定时器; 下面是 CoProcessFunction 的一个具体示例:我们可以实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息。 ``` public class BillCheckExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 来自 app 的支付日志 SingleOutputStreamOperator> appStream = env.fromElements( Tuple3.of("order-1", "app", 1000L), Tuple3.of("order-2", "app", 2000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple3 element, long recordTimestamp) { return element.f2; } }) ); // 来自第三方支付平台的支付日志 SingleOutputStreamOperator> thirdpartStream = env.fromElements( Tuple4.of("order-1", "third-party", "success", 3000L), Tuple4.of("order-3", "third-party", "success", 4000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple4 element, long recordTimestamp) { return element.f3; } }) ); // 检测同一支付单在两条流中是否匹配,不匹配就报警 appStream.connect(thirdpartStream).keyBy(data -> data.f0, data -> data.f0).process(new OrderMatchResult()).print(); env.execute(); } // 自定义实现 CoProcessFunction public static class OrderMatchResult extends CoProcessFunction, Tuple4, String>{ // 定义状态变量,用来保存已经到达的事件 private ValueState> appEventState; private ValueState> thirdPartyEventState; @Override public void open(Configuration parameters) throws Exception { appEventState = getRuntimeContext().getState(new ValueStateDescriptor>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))); thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))); } @Override public void processElement1(Tuple3 value, Context ctx, Collector out) throws Exception { // 看另一条流中事件是否来过 if (thirdPartyEventState.value() != null){ out.collect(" 对 账 成 功 : " + value + " " + thirdPartyEventState.value()); // 清空状态 thirdPartyEventState.clear(); } else { // 更新状态 appEventState.update(value); // 注册一个 5 秒后的定时器,开始等待另一条流的事件 ctx.timerService().registerEventTimeTimer(value.f2 + 5000L); } } @Override public void processElement2(Tuple4 value, Context ctx, Collector out) throws Exception { if (appEventState.value() != null){ out.collect("对账成功:" + appEventState.value() + " " + value); // 清空状态 appEventState.clear(); } else { // 更新状态 thirdPartyEventState.update(value); // 注册一个 5 秒后的定时器,开始等待另一条流的事件 ctx.timerService().registerEventTimeTimer(value.f3 + 5000L); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { // 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来 if (appEventState.value() != null) { out.collect("对账失败:" + appEventState.value() + " " + "第三方支付平台信息未到"); } if (thirdPartyEventState.value() != null) { out.collect("对账失败:" + thirdPartyEventState.value() + " " + "app 信息未到"); } appEventState.clear(); thirdPartyEventState.clear(); } }} } ``` ### 基于时间的合流——双流联结(Join) #### 窗口联结(window join) 基于时间的合流当然最基本的就是时间窗口了,两条流的数据进行合并、且同样针对某段时间进行处理和统计flink专门设计了窗口联结?(window join)。并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理 `.where()`表示stream1的键选择器`.equalTo()`表表示stream2的键选择器 如果相同的key在同一个窗口就会变匹配起来,并可以通过JoinFunction来进行处理.window()可以是滚动窗口、滑动窗口、会话窗口 ``` stream1.join(stream2) .where() .equalTo() .window() .apply() ``` ``` public interface JoinFunction extends Function, Serializable { OUT join(IN1 first, IN2 second) throws Exception; } ``` JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。 join 的具体处理流程: 两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入 JoinFunction 的.join()方法进行计算处理,得到的结果直接输出如图 8-8 所示。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果。 ***所以join是在窗口结束的时候调用一次*** 在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流,按照用户 ID 进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如1 小时)来统计的,那我们就可以使用窗口 join 来实现这样的需求。 ``` //省略 数据源时间事件、水印的配置 //r -> r.f0为用户id stream1.join(stream2).where(r -> r.f0).equalTo(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction, Tuple2, String>() { @Override public String join(Tuple2 left, Tuple2 right) throws Exception { return left + "=>" + right; } }) .print(); ``` #### 间隔联结(Interval join) 在一些场景下,我们需要处理的时间间隔可能不是固定的,比如在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。 间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。 注意: 1、做间隔联结的两条流 A 和 B,也必须基于相同的 key 2、间隔联结目前只支持事件时间语义 3、下界 lowerBound应该小于等于上界 upperBound,两者都可正可负 ``` orderStream.keyBy(data -> data.f0) .intervalJoin(clickStream.keyBy(data -> data.user)) .between(Time.seconds(-5), Time.seconds(10)) .process(new ProcessJoinFunction, Event, String>() { @Override public void processElement(Tuple3 left, Event right, Context ctx, Collector out) throws Exception { out.collect(right + " => " + left); } }).print(); }} ``` ### 窗口同组联结(Window CoGroup) 窗口同组联结跟window join 非常类似也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了 ``` stream1.coGroup(stream2) .where() .equalTo() .window(TumblingEventTimeWindows.of(Time.hours(1))) .apply() ``` CoGroupFunction是直接把收集到的所有数据一次性传入coGroup入参是迭代器,至于要怎样配对完全是自定义的, 不是像Window CoGroup做笛卡尔积。 ``` stream1 .coGroup(stream2) .where(r -> r.f0) .equalTo(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new CoGroupFunction, Tuple2, String>() { @Override public void coGroup(Iterable> iter1, Iterable> iter2, Collector collector) throws Exception { collector.collect(iter1 + "=>" + iter2); } }).print() env.execute(); } } 输出结果 [(a,1000), (a,2000)]=>[(a,3000), (a,4000)] [(b,1000), (b,2000)]=>[(b,3000), (b,4000)] ``` 很赞哦! (0) 上一篇:Flink之水位线 目录 点击排行 Elasticsearch6.3.2之x-pack redis哨兵 2019-07-09 22:05 Redis+Twemproxy+HAProxy+Keepalived 2019-07-12 17:20 GC优化策略和相关实践案例 2019-10-10 10:54 JVM垃圾回收器 2019-10-10 10:23 标签云 Java Spring MVC Mybatis Ansible Elasticsearch Redis Hive Docker Kubernetes RocketMQ Jenkins Nginx 友情链接 郑晓博客 佛布朗斯基 凉风有信 南实博客 Rui | 丁D Java研发工程师 生活可以用「没办法」三个字概括。但别人的没办法是「腿长,没办法」、「长得好看,没办法」、「有才华,没办法」。而你的没办法,是真的没办法。 请作者喝咖啡