加入收藏 | 设为首页 | 会员中心 | 我要投稿 三门峡站长网 (https://www.0398zz.com.cn/)- 云连接、设备管理、智能边缘云、云防火墙、数据加密!
当前位置: 首页 > 大数据 > 正文

硬核!一文学完Flink流计算普通算子

发布时间:2021-06-03 19:12:28 所属栏目:大数据 来源:互联网
导读:直入正题! Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。 所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。 DataSet 一、Source算子 1. fromCollection fromCollection:从
直入正题!
Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。
所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。
DataSet
一、Source算子
1. fromCollection
fromCollection:从本地集合读取数据
例:
val env = ExecutionEnvironment.getExecutionEnvironment 
val textDataSet: DataSet[String] = env.fromCollection( 
  List("1,张三", "2,李四", "3,王五", "4,赵六") 
2. readTextFile
readTextFile:从文件中读取:
val textDataSet: DataSet[String]  = env.readTextFile("/data/a.txt") 
3. readTextFile:遍历目录
readTextFile可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式:
val parameters = new Configuration 
// recursive.file.enumeration 开启递归 
parameters.setBoolean("recursive.file.enumeration", true) 
val file = env.readTextFile("/data").withParameters(parameters) 
4. readTextFile:读取压缩文件
对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
压缩方法 文件扩展名 是否可并行读取
DEFLATE .deflate no
GZip .gz .gzip no
Bzip2 .bz2 no
XZ .xz no
val file = env.readTextFile("/data/file.gz") 
二、Transform转换算子
因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:
val env = ExecutionEnvironment.getExecutionEnvironment 
val textDataSet: DataSet[String] = env.fromCollection( 
  List("张三,1", "李四,2", "王五,3", "张三,4") 

(编辑:三门峡站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读