---- Flink四大基石 Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。 Checkpoint 这是Flink最重要的一个特性。 Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。 Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。 Time 除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。 Window 另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。
1.准备环境-env 2.准备数据-source 3.处理数据-transformation 4.输出结果-sink 5.触发执行-execute 其中创建环境可以使用如下3种方式: getExecutionEnvironment * 编码步骤 * 1.准备环境-env * 2.准备数据-source * 3.处理数据-transformation * 4.输出结果-sink * 5.触发执行-execute//如果有 ; /** * Author lanson * Desc * 需求:使用Flink完成WordCount-DataStream * 编码步骤 * 1.准备环境-env * 2.准备数据-source 表达式 * 编码步骤 * 1.准备环境-env * 2.准备数据-source * 3.处理数据-transformation * 4.输出结果-sink * 5.触发执行-execute 需求:使用Flink完成WordCount-DataStream--使用lambda表达式--修改代码使适合在Yarn上运行 * 编码步骤 * 1.准备环境-env * 2.准备数据-source
、SubTask、Parallelism 1.Dataflow:Flink程序在执行的时候会被映射成一个数据流模型 2.Operator:数据流模型中的每一个操作被称作Operator,Operator --类似于Spark中的窄依赖 2.Redistributing 模式: 这种模式会改变数据的分区数;每个一个operator subtask会根据选择transformation把数据发送到不同的目标 Flink执行图(ExecutionGraph) 由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。 为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。 原理介绍 Flink执行executor会自动根据程序代码生成DAG数据流图 Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph ->
---- Flink-Window操作 为什么需要Window 在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。 --用的较多 2.基于时间的滑动窗口sliding-time-window--用的较多 3.基于数量的滚动窗口tumbling-count-window--用的较少 4.基于数量的滑动窗口sliding-count-window --用的较少 注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算 Window的API window和windowAll window 中, Flink提供了很多各种场景用的WindowAssigner: 如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。 evictor--了解 evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行 用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor
在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下: -1.Yarn的资源可以按需使用,提高集群的资源利用率 -2.Yarn的任务有优先级,根据优先级运行作业 -3 在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job 2.同步 scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml run(提交任务) 1.在yarn上启动一个Flink会话,node1上执行以下命令 /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d 说明: 申请2个CPU、1600M内存 # -n 表示申请2个容器,这里指的就是多少个taskmanager # -tm 表示每个TaskManager的内存大小 # -s 表示每个TaskManager 的内存信息 # -ytm 1024 指定taskmanager的内存信息 2.查看UI界面 http://node1:8088/cluster 3.注意: 在之前版本中如果使用的是flink on
目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实现代码 (基于上面参考代码重新写一套) 实现效果 ---- Flink模拟双十一实时大屏统计 需求 在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。 今天我们就做一个最简单的模拟电商统计大屏的小例子, 需求如下: 1.实时计算出当天零点截止到当前时间的销售总额 2.计算出各个分类的销售top3 3.每秒钟更新一次统计结果 数据 首先我们通过自定义source 模拟订单的生成,生成了一个Tuple2,第一个元素是分类,第二个元素表示这个分类下产生的订单金额,金额我们通过随机生成. /** * 自定义数据源实时产生订单数据Tuple2<分类, 金额> */ Exception { Double totalAmount = 0d;//用来记录销售总额 //用大小顶堆来计算TopN //用大顶堆(大的数据在堆顶
---- Flink用武之地 http://www.liaojiayi.com/flink-IoT/ https://flink.apache.org/zh/usecases.html 从很多公司的应用案例发现 ,其实Flink主要用在如下三大场景: Event-driven Applications【事件驱动】 事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算 比如Flink应用凌晨从Recorded Events中读取昨天的数据,然后做周期查询运算,最后将数据写入Database或者HDFS,或者直接将数据生成报表供公司上层领导决策使用。 Periodic ETL:比如每天凌晨周期性的启动一个Flink ETL Job,读取传统数据库中的数据,然后做ETL,最后写入数据库和文件系统。 Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka
; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream 答案其实也很明显,(hello, 2)和(world, 2)。 为什么 Flink 知道之前已经处理过一次 hello world,这就是 state 发挥作用了,这里是被称为 keyed state 存储了之前需要统计的数据,所以 Flink 知道 hello 和 consumer 0 落后了 5 条,consumer 1 落后了 8 条,consumer 2 落后了 3 条,根据 Flink 的原理,此处需进行 Map 操作。 去重需要先了解哪些数据来过,哪些数据还没有来,也就是把所有的主键都记录下来,当一条数据到来后,能够看到在主键当中是否存在。 2.窗口计算:比如统计每分钟 Nginx 日志 API 被访问了多少次。
早期, Flink 是做 Batch 计算的,但是在 2014 年, StratoSphere 里面的核心成员孵化出 Flink,同年将 Flink 捐赠 Apache,并在后来成为 Apache 的顶级大数据项目 ,同时 Flink 计算的主流方向被定位为 Streaming, 即用流式计算来做所有大数据的计算,这就是 Flink 技术诞生的背景。 2014 年 Flink 作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。 这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持,这就是阿里选择 Flink 的背景和初衷。 、YARN)、云(GCE/EC2)、Kubenetes。
/dev/table/ Flink的Table模块包括 Table API 和 SQL: Table API 是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,非常直观和方便 SQL作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手 Flink Table API 和 SQL 的实现上有80%左右的代码是公用的。 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行; 2. 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划; 3. 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少; 5. 在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能 在Flink 1.9 之前,Flink API 层 一直分为DataStream API
2、Flink介绍 Some of you might have been already using Apache Spark in your day-to-day life and might have Runtime是Flink的核心数据处理引擎,通过JobGraph的形式通过API接收程序。 JobGraph是一个简单的并行数据流,包含一组产生和使用数据流的任务。 上图显示了程序如何转换为数据流。 Flink数据流默认是并行分布的。 对于并行数据处理,Flink分割运算符和流。 操作员分区被称为子任务。 流可以以一对一或重新分布的方式分发数据。 但是对于GroupBy操作,Flink可能需要通过keys重新分配数据才能获得正确的结果: ? Flink为批处理和流数据处理提供API。所以一旦你建立了Flink的环境,它可以容易地托管流和批处理应用程序。事实上,Flink的工作原理是流式处理,并将批处理视为流式处理的特例。
-1.6.1-bin-hadoop27-scala_2.11.tgz (2)cd flink-1.6.1 (3)启动:. /bin/stop-cluster.sh (5)访问web界面 http://hostname:8081 2、Flink StandAlone模式部署和解析 Ⅰ、依赖环境 jdk1.8及以上 taskmanager.numberOfTaskSlots:每台机器可用的cpu数量 parallelism.default:默认情况下任务的并行度 taskmanager.tmp.dirs:taskmanager的临时数据存储目录 /bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d] 附着到一个已存在的flink yarn session . /bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 .
Java的堆内存中/TaskManage节点的内存中 State可以被记录,在失败的情况下数据还可以恢复 Checkpoint: 某一时刻,Flink中所有的Operator的当前State的全局快照, 一般存在磁盘上 表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有Operator的状态 可以理解为Checkpoint是把State数据定时持久化存储了 比如KafkaConsumer 算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取 注意: Flink中的Checkpoint底层使用了Chandy-Lamport algorithm分布式快照算法可以保证数据的在分布式环境下的一致性 )中写入快照数据的时候是异步的(为了提高效率) 2.分布式快照执行时的数据一致性由Chandy-Lamport algorithm分布式快照算法保证! ; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.filesystem.FsStateBackend
---- Flink安装部署 Flink支持多种安装模式 - Local—本地单机模式,学习测试时使用 - Standalone—独立集群模式,Flink自带集群,开发测试环境使用 - StandaloneHA 作业执行完成后,结果将发送回客户端(JobClient) 操作 1.下载安装包 https://archive.apache.org/dist/flink/ 2.上传flink-1.12.0-bin-scala root /export/server/flink-1.12.0 5.改名或创建软链接 mv flink-1.12.0 flink ln -s /export/server/flink-1.12.0 me hello 2.启动Flink本地“集群” /export/server/flink/bin/start-cluster.sh 3.使用jps可以查看到下面两个进程 TaskManagerRunner /flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output
Savepoint VS Checkpoint Savepoint演示 # 启动yarn session /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d # 运行job-会自动执行Checkpoint /export/server/flink/bin/flink run --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar # 手动创建savepoint--相当于手动做了一次Checkpoint /export/server/flink/bin/flink savepoint 702b872ef80f08854c946a544f2ee1a5 hdfs://node1:8020/flink-checkpoint/savepoint/ # 停止job /export/server/flink/bin/flink cancel 702b872ef80f08854c946a544f2ee1a5 # 重新启动job,手动加载savepoint数据 /export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint
参数总结 [root@node1 bin]# /export/server/flink/bin/flink --help . /flink <ACTION> [OPTIONS] [ARGUMENTS] The following actions are available: Action "run" compiles /tmp/myresource.zip,hdfs:///$na menode_address/myresource2. /flink- docs-stable/ops/config.html -m,--jobmanager <arg> -j,--jarfile <jarfile> Flink program JAR file.
写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。 搭建高可用 high-availability: zookeeper # 存储 JobManager 的元数据到 HDFS,用来恢复 JobManager 所需的所有元数据 high-availability.storageDir 一旦将 Flink 部署到 YARN 群集 中,它就会显示 Job Manager 的连接详细信息),其中 2 个 Container 启动 TaskManager (-n 2),每个 TaskManager ■ 使用 flink 直接提交任务 bin/flink run -m yarn-cluster -yn 2 . 下一篇博客,我们将学习Flink 运行架构,敬请期待|ू・ω・` ) 如果以上过程中出现了任何的纰漏错误,烦请大佬们指正? 受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?
---- Flink实现订单自动好评 需求 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,我们今天主要使用Flink的定时器来简单实现这一功能 数据 自定义source模拟生成一些订单数据. ; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; //把订单数据保存到状态中 mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 || /注册一个定时器在value.f2 + interval时检查是否需要默认好评 ctx.timerService().registerProcessingTimeTimer(value.f2
作者:腾讯云流计算 Oceanus 团队 流计算 Oceanus 简介 流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时 本文将为您详细介绍如何实时获取 CKafka 中的 JSON 格式数据,经过数据抽取、平铺转换后存入 MySQL 中。 ; 2. ' = '2s' -- 批量输出的间隔); 3. Map 中成员采用 ['属性名'] 的方式companyInfo['address'] AS company_addressFROM `kafka_json_source_table`; 新版 Flink
Broadcast State 是 Flink 1.5 引入的新特性。 下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。 开始构建Stream,当调用keyBy 时所依赖的Key 的类型;上面泛型中的各个参数的含义,说明如下: l IN1:表示非Broadcast 的Data Stream 中的数据记录的类型; l IN2 source -1.构建实时数据事件流-自定义随机 <userID, eventTime, eventType, productID> -2.构建配置流-从MySQL <用户id,<姓名,年龄>> 3. ; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import ; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import