希望长大对我而言,是可以做更多想做的事,而不是被迫做更多不想做的事...... 首页 Flink之基础概念 丁D 学无止境 2023-05-05 18:30 22273已阅读 并行度 slot Flink 摘要本文介绍一下Flink一些基本概念并行度、slot及对应的组件 ### 依赖 开发flink应用我们需要引入对应的maven依赖 flink-java、flink-streaming-java,以及 flink-clients(客户端,也可以省略) ``` org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} ``` 在属性中,我们定义了,这指代的是所依赖的 Scala 版本。这有一点 奇怪:Flink 底层是 Java,而且我们也只用 Java API,为什么还会依赖 Scala 呢?这是因为 Flink 的架构中使用了 Akka 来实现底层的分布式通信,而 Akka 是用 Scala 开发的。 ### 编程模型 执行环境 -> source -> 算子逻辑 -> sink 1、创建执行环境 `ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();` flink在1.12版本之前的流处理和批处理提供了两套api,从1.12官方推荐使用DataStream API 然后在提交任务 指定是流处理还是批处理 ``` $ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar 3> (world,1) 2> (hello,1) 4> (flink,1) 2> (hello,2) 2> (hello,3) 1> (java,1) ``` 前面的数据是指本地执行的不同线程,所以是乱序的,代表1~4代表了并行线程是4,并行度4,本地环境默认并行度是运行电脑的cpu个数 ![](/upload/jobmanager_comment.png) Flink组件 client(客户端) jobManager(作业管理器,相当master) taskManager(任务管理器,工作者,相当于worker) jobmanager包含3三个组件 1、jobMaster:处理单独的job,和具体的job一一对应 2、resourceManager注意:这是Flink内置的资源管理器要跟跟其他平台的区分开 3、分发器:提供一个rest接口用来提交应用,并为每个新提交的作业启动一个新的jobmaster “资源”,主要是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。 slot是最小的调度单位,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 的数量限制了 TaskManager 能够并行处理的任务数量。 ![](/upload/job_submit.png) 作业提交流程步骤: 1、客户端将程序通过分发器提供的rest接口,提交到jobmanager 2、分发器启动jobmaster,并将作业提交给jobmaster 3、jobmaster将jobGraph解析成可执行的executionGraph,得到所需的资源数量即slot的个数,然后向资源管理器请求资源 4、资源管理器判断当前是否有足够的资源,没有就启动新的taskManager 5、taskManager启动后向资源管理器注册自己的任务槽 6、资源管理器通知taskManager为新的作业提供slots 7、TaskManager 连接到对应的 JobMaster,提供 slots。 8、JobMaster 将需要执行的任务分发给 TaskManager。 9、TaskManager 执行任务,互相之间可以交换数据。 **算子任务** source就是一个算子任务,sink也是,sum,map等都是 **算子子任务** 在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask), 这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。 同一个算子子任务只能在不同的slot执行,不同算子的任务可以共享任务槽 所以我们要算这个作业需要多少slot,只需要找到算子任务最大的并行度,即算子子任务的个数 **算子链** 一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通 (forwarding)模式入map、filter、flatMap 等算子都是这种 one-to-one,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。 并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task) 可以合并起来形成算子链一起共享一个slot 为什么这样设计?可以减少线程之间的切换,和基于缓存器的数据交换 ,减少延时,提高吞吐量 **槽位slot** 任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。 slot是最小的调度单位,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 的数量限制了 TaskManager 能够并行处理的任务数量。 设置一个taskManager的slot数量 : taskmanager.numberOfTaskSlots: 8Slot 和并行度确实都跟程序的并行执行有关,但两者是完全不同的概念。 简单来说,taskslot 是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力 , 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置; 而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。 换句话说,并行度如果小于等于集群中可用 slot 的总数,程序是可以正常执行的,因为 slot 不一定要全部占用,有十分力气可以只用八分; 而如果并行度大于可用 slot 总数,导致超出了并行能力上限,那么心有余力不足,程序就只好等待资源管理器分配更多的资源了。 **并行度** 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。 1、代码中设置,算子后面跟上并行度设置,优先级最高 stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); 2、代码中设置,执行环境设置,这样所有的算子并行度都一样,优先级中 env.setParallelism(2); 3、如果代码中没设置,可以在提交作业的时候使用“-p”参数来设置,优先级低于代码设置,高于配置文件 3、配置文件设置,优先级最低 parallelism.default: 2 ### 统计单词demo批处理 ``` package _1wordcount; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * 批处理 单词统计 */ public class WordCount { public static void main(String[] args) throws Exception { //创建执行环境 这里是批处理的执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //流处理执行环境 //StreamExecutionEnvironment env = StreamExecutionEnvironment // .getExecutionEnvironment(); DataSource dataSource = env .readTextFile("D:\\java\\code\\flink\\flinkdemo\\src\\main\\resources\\_1wordcount.txt"); //map 做数据转换,输入1个返回1个,就是做类型转换 //flatMap 打散,平坦,输入1个,可以返回0个、1个、N个,(如下面按空格分隔,返回多个单词) //keyby用于流处理,groupBy用在批处理 //这里返回的是一个元祖是因为groupBy只能返回元祖,不然会报错 DataSet> dataSet = dataSource .flatMap(new WordCountFlatMap()).groupBy(0).sum(1); dataSet.print(); //输出结果 /*(flink,1) (world,1) (hello,3) (java,1)*/ } public static class WordCountFlatMap implements FlatMapFunction> { //input是输入的字符串 //collector 用来输出 public void flatMap(String input, Collector> collector) throws Exception { if (StringUtils.isNotBlank(input)) { for (String word : input.split(" ")) { if (word.length() < 1) { continue; } collector.collect(new Tuple2<>(word, 1L)); } } } } } ``` ### 统计单词demo流处理 ``` package _1wordcount; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; /** * 流处理单词统计 */ public class WordCountStream { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); //使用nc测试 DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 10008); streamSource.setParallelism(1);//设置并行度 /*DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作。 先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)*/ DataStream dataStream = streamSource .flatMap(new FlatMapFunction() { @Override public void flatMap(String input, Collector collector) throws Exception { for (String word : input.split(" ")) { if (word.length() < 1) { continue; } WordWithCount wordWithCount = new WordWithCount(); wordWithCount.setCount(1L); wordWithCount.setWord(word); collector.collect(wordWithCount); } } }).keyBy("word").window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count"); dataStream.print(); //DataStream流式应用需要显示指定execute()方法运行程序,如果不调用则Flink流式程序不会执行 //对于DataSet API输出算子中已经包含了对execute()方法的调用,不需要显式调用execute()方法,否则程序会出异常。 env.execute("streaming word count"); /*3> WordWithCount{word='java', count=1} 5> WordWithCount{word='hello', count=1} 13> WordWithCount{word='flink', count=1} 5> WordWithCount{word='hello', count=1} 5> WordWithCount{word='hello', count=1} 13> WordWithCount{word='flink', count=1} 5> WordWithCount{word='hello', count=2} 13> WordWithCount{word='flink', count=2} 5> WordWithCount{word='hello', count=1} 13> WordWithCount{word='flink', count=1} 3> WordWithCount{word='java', count=1} 4> WordWithCount{word='f', count=1} 5> WordWithCount{word='hello', count=2}*/ } public static class WordWithCount { private String word; private Long count; @Override public String toString() { return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public Long getCount() { return count; } public void setCount(Long count) { this.count = count; } } } ``` 很赞哦! (0) 上一篇:Flink之状态编程 下一篇:Flink之水位线 目录 点击排行 Elasticsearch6.3.2之x-pack redis哨兵 2019-07-09 22:05 Redis+Twemproxy+HAProxy+Keepalived 2019-07-12 17:20 GC优化策略和相关实践案例 2019-10-10 10:54 JVM垃圾回收器 2019-10-10 10:23 标签云 Java Spring MVC Mybatis Ansible Elasticsearch Redis Hive Docker Kubernetes RocketMQ Jenkins Nginx 友情链接 郑晓博客 佛布朗斯基 凉风有信 MarkHoo's Blog 冰洛博客 南实博客 Rui | 丁D Java研发工程师 生活可以用「没办法」三个字概括。但别人的没办法是「腿长,没办法」、「长得好看,没办法」、「有才华,没办法」。而你的没办法,是真的没办法。 请作者喝咖啡