希望长大对我而言,是可以做更多想做的事,而不是被迫做更多不想做的事...... 首页 Flink之状态编程 丁D 学无止境 2023-06-20 17:45 20226已阅读 摘要本文将从状态的概念入手,详细介绍 Flink 中的状态分类、状态的使用、持久化及状态后端的配置。 ### 一、Flink状态概念 Flink的处理机制核心:有状态的流式计算,那么什么是有状态,什么是无状态呢? 在流式处理中,数据是连续不断的到来和处理的,每个任务在计算的时候,可以基于当前数据直接转换就能得到结果如map,filter(无状态), 也可以是依赖上一个数据才能得到结果,这个时候我们就需要将上一个结果记录下来如sum(有状态)。 下面的几个场景都需要使用流处理的状态功能: 1、数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。 2、检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。 3、对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。 ![](/upload/flink_state.png) 有状态的算子处理流程如下: 1、接收到上游数据 2、通过上下文获取当前状态 3、根据业务逻辑计算,更新状态 4、将处理结果输出给下游 Flink的算子任务,可以设置并行度,从而在不同的slot运行多个实例,我们把这个实例叫成“并行子任务”或者“算子子任务”。 ### 二、状态分类 1、**托管状态(推荐):**由flink统一管理 存储、故障恢复、重组等 2、**原始状态:** 需要我们自定义,一般不用除非托管搞不定 重点介绍托管状态 我们知道 Flink一个算子任务,可以分为多个并行子任务,分配在不同的任务槽(task slot)中运行,而这些slot的计算资源是物理隔离的, 所以flink管理的的状态是在不同的并行子任务是无法共享的,基于这个想法我们可以将状态分为 算子状态和按键状态 **算子状态:**状态的作用在一个并行子任务,也就是一个算子子任务,所有这个子任务处理的数据共享一个状态 **按键状态:**我们的流可以根据keyby进行分组成keyedStream,这个时候同一个key共享一个状态 值得注意的是无论是keyed state还是operator state,他们都是在本地实例上进行维护的,也就是说每一个并行子任务维护着对应的状态 算子子任务之间的状态并不能共享。 算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。 ### 三、状态数据结构 按键状态数据结构分为5种: **1、值状态(ValueState)** **2、列表状态(ListState)** **3、映射状态(MapState)** **4、归约状态(ReducingState)** **5、聚合状态(AggregatingState)** 算子状态数据结构分为3种 **1、列表状态(ListState)** **2、联合列表状态(UnionListState)** **3、广播状态(BroadcastState):** 有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。 状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。 ``` public void open(Configuration parameters) throws Exception { lastTemperatureValueState = getRuntimeContext() .getState(new ValueStateDescriptor("last-temp", Double.class)); } ``` 一般来说我们在生命周期方法.open()中获取状态对象。但这个变量不应该在 open 中声明——应该在外面直接把它定义为类的属性, 这样就可以在不同的方法中通用了。而在外部又不能直接获取状态,因为编译时是无法拿到运行时上下文的。 所以最终的解决方案就变成了:在外部声明状态对象,在 open 生命周期方法中通过运行时上下文获取状态。 ### 四、状态具体使用demo ``` import dto.SensorReadingDTO; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import util.DateUtil; //如果传感器的温度差大于10度就预警 //使用状态记录上一次的状态 public class Status_1_KeyedState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); env.setParallelism(1); DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 10008); DataStream dataStream = streamSource .map(new MapFunction() { @Override public SensorReadingDTO map(String input) throws Exception { if (StringUtils.isNotBlank(input)) { String[] infoArray = input.split(","); SensorReadingDTO sensorReadingDTO = new SensorReadingDTO(); sensorReadingDTO.setId(infoArray[0]); sensorReadingDTO.setTimestamp(Long.valueOf(infoArray[1]) * 1000); sensorReadingDTO.setTemperature(Double.valueOf(infoArray[2])); sensorReadingDTO.setTimestampStr(DateUtil.format(sensorReadingDTO.getTimestamp())); return sensorReadingDTO; } return null; } }); //使用flatMap 可以输出0,1个或多个,没有超过10度的,就不要输出 //但是使用map 只能输入1个,必须输出一个,所以不符合 DataStream> checkDataStream = dataStream.keyBy(SensorReadingDTO::getId) .flatMap(new MyMapper(Double.valueOf(10))); checkDataStream.print(); env.execute(); } public static class MyMapper extends RichFlatMapFunction> { private ValueState lastTemperatureValueState; private final Double threshold; public MyMapper(Double threshold) { this.threshold = threshold; } @Override public void open(Configuration parameters) throws Exception { lastTemperatureValueState = getRuntimeContext() .getState(new ValueStateDescriptor("last-temp", Double.class)); } @Override public void close() throws Exception { //释放 lastTemperatureValueState.clear(); } @Override public void flatMap(SensorReadingDTO sensorReadingDTO, Collector> out) throws Exception { //第一次 为空记录当前温度 Double lastTemp = lastTemperatureValueState.value(); Double curTemp = sensorReadingDTO.getTemperature(); // 如果不为空,判断是否温差超过阈值,超过则报警 if (lastTemp != null) { if (Math.abs(curTemp - lastTemp) >= threshold) { out.collect(new Tuple3<>(sensorReadingDTO.getId(), lastTemp, curTemp)); } } // 更新保存的"上一次温度" lastTemperatureValueState.update(curTemp); } } } ``` ### 五、状态后端 **1、MemoryStateBackend** 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,而将checkpoint存储在JobManager的内存中 特点:快速、低延迟,但不稳定 **2、FsStateBackend**(默认) 将checkpoint存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上 同时拥有内存级的本地访问速度,和更好的容错保证 **3、RocksDBStateBackend** 将所有状态序列化后,存入本地的RocksDB中存储 很赞哦! (0) 上一篇:Flink之处理函数 下一篇:Flink之基础概念 目录 点击排行 Elasticsearch6.3.2之x-pack redis哨兵 2019-07-09 22:05 Redis+Twemproxy+HAProxy+Keepalived 2019-07-12 17:20 GC优化策略和相关实践案例 2019-10-10 10:54 JVM垃圾回收器 2019-10-10 10:23 标签云 Java Spring MVC Mybatis Ansible Elasticsearch Redis Hive Docker Kubernetes RocketMQ Jenkins Nginx 友情链接 郑晓博客 佛布朗斯基 凉风有信 MarkHoo's Blog 冰洛博客 南实博客 Rui | 丁D Java研发工程师 生活可以用「没办法」三个字概括。但别人的没办法是「腿长,没办法」、「长得好看,没办法」、「有才华,没办法」。而你的没办法,是真的没办法。 请作者喝咖啡