Flink的Transformation转换转换主要包括四种类型:单数据流基本转换、基于键的分组转换、多数据流转换和数据重分布转换。读者可以用Flink Scala Shell或Intellij Idea进行练习:
Flink Scala Shell:通过使用交互式编程环境学习和调试Flink link 01 |十分钟构建第一个Flink应用程序和本地集群Flink运算符使用方法和示例演示:map、filter和flatMapFlink运算符使用方法和示例演示:keyBy,使用reduce和aggregationsFlink运算符的方法和示例演示union和connect
并行度
Flink使用并行性定义一个运算符分为多少个运算符子任务。我们编写的大多数转换操作都可以形成一个逻辑视图。在实际运行时,逻辑视图中的运算符将被并行划分为一个或多个运算符子任务,每个运算符子任务将处理一部分数据。如下图所示,每个操作符在多个子任务上并行执行。如果运算符的并行度是2,那么它有两个实例。
Flink并行执行示意图
并行度可以在Flink作业的执行环境级别统一设置,这会设置作业所有操作符的并行度,也可以单独设置一个操作符的并行度。在没有任何设置的情况下,默认情况下,作业的所有操作符的并行度将取决于作业的执行环境。如果作业在本地执行,并行度默认为本机CPU内核的数量。当我们向Flink集群提交作业时,我们需要使用提交作业的客户端并指定一系列参数,其中之一就是并行性。
下面的代码显示了如何获取执行环境的默认并行度,以及如何更改执行环境的并行度。
val senv : stream executionenvironment=stream executionenvironment . getexecutionenvironment
//获取当前执行环境的默认并行度
val DefaultParallelism=senv . GetParallelism
//将所有运算符的并行度设置为4,表示所有运算符并行执行的实例数为4。
设置并行性(4)您也可以为操作员设置并行性:
数据流. map(新的mymapper)。设置并行度(默认并行度* 2)
数据重分布
默认情况下,数据会自动分布到多个实例。有时,我们需要在多个实例上手动分发数据。例如,我们知道一个实例中有太多的数据,而其他实例中有稀疏的数据,这会导致数据不对称。此时,我们需要将数据平均分配给所有实例,以避免一些实例过载。数据倾斜的问题会导致整个作业的计算时间过长或内存不足等问题。
下面提到的每个数据再分配运算符的输入都是DataStream,输出也是DataStream。keyBy也有分组和重新分配数据的功能,但是KeyBy输出的是KeyedStream。
洗牌
Shuffle基于正态分布将数据随机分配给下游操作员。
DataStream.shuffle()重新平衡和重新缩放
重新平衡使用Round-ribon的思想将数据平均分配给每个实例。Round-ribon是一种均匀分布方法,常用于负载平衡领域。上游数据将以轮询方式分发到所有下游实例。如下图所示,上游操作员将依次向所有下游操作员实例发送数据。
重新平衡将数据轮询到下游实例。
datastream . rechare()reshare与rechare类似,它将数据平均分配给所有下游实例,但其传输成本较小,因为reshare不会以轮询方式将每个数据发送给每个下游实例,而是发送给附近的下游实例。
当有两个上游实例和四个下游实例时,将执行重新缩放。
如上图所示,当上游有两个实例时,上游的第一个实例向下游的第一个和第二个实例发送数据,上游的第二个实例向下游的第三个和第四个实例发送数据。与重新平衡向下游的每个实例发送数据相比,重新缩放的传输开销更小。下图显示,当上游有四个实例时,上游的前两个实例向下游的第一个实例发送数据,上游的后两个实例向下游的第二个实例发送数据。
6.toutiaoimg.com/origin/pgc-image/8d3c3aec2b874b03ab68e6f3dab2db85?from=pc”>
broadcast
英文单词”broadcast”翻译过来为广播,在Flink里,数据会被复制并广播发送给下游的所有实例上。
dataStream.broadcast()
global
global会所有数据发送给下游算子的第一个实例上,使用这个算子时要小心,以免造成严重的性能问题。
dataStream.global()
partitionCustom
我们也可以使用partitionCustom来自定义数据重分布逻辑。partitionCustom有两个参数:第一个参数是自定义的Partitioner,我们需要重写里面的partition函数;第二个参数是对数据流哪个字段使用partiton逻辑。partition函数的返回一个整数,表示该元素将被路由到下游第几个实例。
Partitioner[T]中泛型T为指定的字段类型,比如我们要对case class (id: Long, name: String, score: Double)这个数据结构按照id均匀分配到下游各实例,那么泛型T就为id的数据类型Long。同时,泛型T也是partition(key, numPartitions)函数的第一个参数的数据类型。在调用partitionCustom(partitioner, field)时,第一个参数是我们重写的Partitioner,第二个参数表示按照id字段进行处理。
下面的代码按照数据流中的第二个字段进行数据重分布,当该字段中包含数字时,将被路由到下游算子的前半部分,否则被路由到后半部分。如果设置并行度为4,表示所有算子的实例数为4,或者说有4个分区,那么如果字符串包含数字时,该元素将被分配到第0个和第1个实例上,否则被分配到第2个和第3个实例上。
package com.flink.tutorials.api.transformations
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._
object PartitionCustomExample {
/**
* Partitioner[T] 其中泛型T为指定的字段类型
* 重写partiton函数,并根据T字段对数据流中的所有元素进行数据重分配
* */
class MyPartitioner extends Partitioner[String] {
val rand = scala.util.Random
/**
* key 泛型T 即根据哪个字段进行数据重分配,本例中是(Int, String)中的String
* numPartitons 为当前有多少个并行实例
* 函数返回值是一个Int 为该元素将被发送给下游第几个实例
* */
override def partition(key: String, numPartitions: Int): Int = {
var randomNum = rand.nextInt(numPartitions / 2)
// 如果字符串中包含数字,该元素将被路由到前半部分,否则将被路由到后半部分。
if (key.exists(_.isDigit)) {
return randomNum
} else {
return randomNum + numPartitions / 2
}
}
}
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 获取当前执行环境的默认并行度
val defaultParalleism = senv.getParallelism
// 设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4
senv.setParallelism(4)
val dataStream: DataStream[(Int, String)] = senv.fromElements((1, “123”), (2, “abc”), (3, “256”), (4, “zyx”)
, (5, “bcd”), (6, “666”))
// 对(Int, String)中的第二个字段使用 MyPartitioner 中的重分布逻辑
val partitioned = dataStream.partitionCustom(new MyPartitioner, 1)
partitioned.print()
senv.execute(“partition custom transformation”)
}
}