Flink 学习文档
Flink 文档
CheckPoint
概述:
Checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
开启CheckPoint
默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment
的 enableCheckpointing(n)
来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // long interval : 保存时间(1000L :每隔一秒进行一次保存)
或者
env.getCheckpointConfig().setCheckpointInterval(10000);
// 源码中启用检查点后,默认保存间隔为500ms,不过该方法已经过时,不建议使用,要启用checkpoint请自己设置时间间隔
@Deprecated
@PublicEvolving
public StreamExecutionEnvironment enableCheckpointing() {
checkpointCfg.setCheckpointInterval(500);
return this;
}
// 默认的检查点启用时间间隔为-1,即不启用CheckPoint
/** Periodic checkpoint triggering interval. */
private long checkpointInterval = -1; // disabled
配置CheckPoint
设置CheckpointingMode
精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode)
方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
或者
env.getCheckpointConfig().setCheckpointingMode(CheckpointConfig.DEFAULT_MODE);
// 默认的CheckPoint模式是精确一次
/** The default checkpoint mode: exactly once. */
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
设置Checkpointing超时时间
checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。
env.getCheckpointConfig().setCheckpointTimeout(10000);
// 默认的保存超时时间为10分钟
/** The default timeout of a checkpoint attempt: 10 minutes. */
public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
设置Checkpoints之间的最小间隔
checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。
注意这个值也意味着并发 checkpoint 的数目是一。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
// 默认两个CheckPoint之间的时间间隔为0,即上一个CheckPoint一结束就开始下一个
/** The default minimum pause to be made between checkpoints: none. */
public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
设置checkpoint 可容忍连续失败次数
checkpoint 可容忍连续失败次数:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
设置并发 checkpoint 的数目
并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。
该选项不能和 “checkpoints 间的最小时间"同时使用。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 默认同时进行CheckPoint的数量为1个
/** The default limit of concurrently happening checkpoints: one. */
public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
设置checkpoints的保留
externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
ExternalizedCheckpointCleanup
配置项定义了当作业取消时,对作业 checkpoint 的操作:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
设置当存在较新的savepoint时,作业恢复是否应回退到checkpoint。
env.getCheckpointConfig().setPreferCheckpointForRecovery(false);
// 默认是false,即启用savepoint
/** Determines if a job will fallback to checkpoint when there is a more recent savepoint. **/
private boolean preferCheckpointForRecovery = false;
设置CheckPoint存储位置
env.getCheckpointConfig().setCheckpointStorage(...);
相关的配置选项
更多的属性与默认值能在 conf/flink-conf.yaml
中设置(完整教程请阅读 配置)。
Key | Default | Type | Description |
---|---|---|---|
state.backend.incremental | false | Boolean | Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option. |
state.backend.local-recovery | false | Boolean | This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends. Currently, the MemoryStateBackend does not support local recovery and ignores this option. |
state.checkpoint-storage | (none) | String | The checkpoint storage implementation to be used to checkpoint state. The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory . If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called. Recognized shortcut names are 'jobmanager' and 'filesystem'. |
state.checkpoints.dir | (none) | String | The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). |
state.checkpoints.num-retained | 1 | Integer | The maximum number of completed checkpoints to retain. |
state.savepoints.dir | (none) | String | The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). |
state.storage.fs.memory-threshold | 20 kb | MemorySize | The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB. |
state.storage.fs.write-buffer-size | 4096 | Integer | The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'state.storage.fs.memory-threshold'. |
taskmanager.state.local.root-dirs | (none) | String | The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignores this option. If not configured it will default to <WORKING_DIR>/localState. The <WORKING_DIR> can be configured via process.taskmanager.working-dir |
确保精确一次(exactly once)
当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:
- Flink 不会从快照中进行恢复(at most once)
- 没有任何丢失,但是你可能会得到重复冗余的结果(at least once)
- 没有丢失或冗余重复(exactly once)
Flink 通过回退和重新发送 source 数据流从故障中恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着 每一个事件都会影响 Flink 管理的状态精确一次。
Barrier 只有在需要提供精确一次的语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置 CheckpointingMode.AT_LEAST_ONCE
关闭 Barrier 对齐来提高性能。
端到端精确一次
为了实现端到端的精确一次,以便 sources 中的每个事件都仅精确一次对 sinks 生效,必须满足以下条件:
- 你的 sources 必须是可重放的,并且
- 你的 sinks 必须是事务性的(或幂等的)
State Backend
Flink 提供了多种 state backends,它用于指定状态的存储方式和位置。
状态可以位于 Java 的堆或堆外内存。取决于你的 state backend,Flink 也可以自己管理应用程序的状态。 为了让应用程序可以维护非常大的状态,Flink 可以自己管理内存(如果有必要可以溢写到磁盘)。 默认情况下,所有 Flink Job 会使用配置文件 flink-conf.yaml 中指定的 state backend。
可用的 State Backends
1.13之前可用的 State Backends
但是,配置文件中指定的默认 state backend 会被 Job 中指定的 state backend 覆盖,如下所示。(1.13已过时)
Flink 内置了以下这些开箱即用的 state backends :
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果不设置,默认使用 MemoryStateBackend。
名称 | Working State | 状态备份 | 快照 |
---|---|---|---|
RocksDBStateBackend | 本地磁盘(tmp dir) | 分布式文件系统 | 全量 / 增量 |
支持大于内存大小的状态经验法则:比基于堆的后端慢10倍 | |||
FsStateBackend | JVM Heap | 分布式文件系统 | 全量 |
快速,需要大的堆内存受限制于 GC | |||
MemoryStateBackend | JVM Heap | JobManager JVM Heap | 全量 |
适用于小状态(本地)的测试和实验 |
1.13之后可用的 State Backends
Flink 内置了以下这些开箱即用的 state backends :
- HashMapStateBackend
- EmbeddedRocksDBStateBackend
如果不设置,默认使用 HashMapStateBackend。
如果没有明确指定,将使用 jobmanager 做为默认的 state backend。你能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。 每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置,如下所示:
设置每个 Job 的 State Backend
1.13之前配置 State Backends
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend(""));
env.setStateBackend(new RocksDBStateBackend(""));
MemoryStateBackend 构造器参数解析
名称 | 解释 | 默认值 |
---|---|---|
checkpointPath | The path to write checkpoint metadata to. If null, the value from the runtime configuration will be used | null |
savepointPath | The path to write savepoints to. If null, the value from the runtime configuration will be used | null |
maxStateSize | The maximal size of the serialized state | 5MB,但是都不能大于 akka frame 大小(10MB) |
asynchronousSnapshots | This parameter is only there for API compatibility. Checkpoints are always asynchronous now | UNDEFINED(表示未配置,使用配置文件中的默认值),此参数可以传入true or false,表示是否开启异步快照 |
FsStateBackend 构造器参数解析
名称 | 解释 | 默认值 |
---|---|---|
checkpointDirectory | The path to write checkpoint metadata to | 不能为null,自己输入,没有默认值 |
defaultSavepointDirectory | The path to write savepoints to. If null, the value from the runtime configuration will be used, or savepoint target locations need to be passed when triggering a savepoint | null |
fileStateSizeThreshold | State below this size will be stored as part of the metadata, rather than in files. If -1, the value configured in the runtime configuration will be used, or the default value (1KB) if nothing is configured | 1KB |
writeBufferSize | Write buffer size used to serialize state. If -1, the value configured in the runtime configuration will be used, or the default value (4KB) if nothing is configured | 4KB |
asynchronousSnapshots | This parameter is only there for API compatibility. Checkpoints are always asynchronous now | UNDEFINED(表示未配置,使用配置文件中的默认值),此参数可以传入true or false,表示是否开启异步快照 |
RocksDBStateBackend 构造器参数解析
名称 | 解释 | 默认值 |
---|---|---|
checkpointDataUri | The URI describing the filesystem and path to the checkpoint data directory | 不能为null,无默认值 |
enableIncrementalCheckpointing | True if incremental checkpointing is enabled | UNDEFINED |
1.13之后配置 State Backends
StreamExecutionEnvironment
可以对每个 Job 的 State Backend 进行设置,如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
//或者
env.setStateBackend(new EmbeddedRocksDBStateBackend());
如果你想在 IDE 中使用 EmbeddedRocksDBStateBackend
,或者需要在作业中通过编程方式动态配置它,必须添加以下依赖到 Flink 项目中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
自旧版本迁移
从 Flink 1.13 版本开始,社区改进了 state backend 的公开类,进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。 这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制,仅仅是为了更好地传达设计意图。 用户可以将现有作业迁移到新的 API,同时不会损失原有 state。
MemoryStateBackend
旧版本的 MemoryStateBackend
等价于使用 HashMapStateBackend 和 JobManagerCheckpointStorage。
flink-conf.yaml
配置
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
FsStateBackend
旧版本的 FsStateBackend
等价于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。
flink-conf.yaml
配置
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
RocksDBStateBackend
旧版本的 RocksDBStateBackend
等价于使用 EmbeddedRocksDBStateBackendb和 FileSystemCheckpointStorage.
flink-conf.yaml
配置
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
State
Keyed State
ValueState<T>
: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过update(T)
进行更新,通过T value()
进行检索。ListState<T>
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)
或者addAll(List<T>)
进行添加元素,通过Iterable<T> get()
获得整个列表。还可以通过update(List<T>)
覆盖当前的列表。ReducingState<T>
: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState
类似,但使用add(T)
增加元素,会使用提供的ReduceFunction
进行聚合。AggregatingState<IN, OUT>
: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState
相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState
类似,但使用add(IN)
添加的元素会用指定的AggregateFunction
进行聚合。MapState<UK, UV>
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)
或者putAll(Map<UK,UV>)
添加映射。 使用get(UK)
检索特定 key。 使用entries()
,keys()
和values()
分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()
来判断是否包含任何键值对。
状态通过 RuntimeContext
进行访问,因此只能在 rich functions 中使用。RichFunction中
RuntimeContext 提供如下方法:
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
Keyed State 代码中使用
// 实现自定义的FlatMapFunction,用于Keyed State测试
public static class MyFlatMap extends RichFlatMapFunction<Event, String>{
// 定义状态
ValueState<Event> myValueState;
ListState<Event> myListState;
MapState<String, Long> myMapState;
ReducingState<Event> myReducingState;
AggregatingState<Event, String> myAggregatingState;
// 增加一个本地变量进行对比
Long count = 0L;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Event> valueStateDescriptor = new ValueStateDescriptor<>("my-state", Event.class);
myValueState = getRuntimeContext().getState(valueStateDescriptor);
myListState = getRuntimeContext().getListState(new ListStateDescriptor<Event>("my-list", Event.class));
myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("my-map", String.class, Long.class));
myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Event>("my-reduce",
new ReduceFunction<Event>() {
@Override
public Event reduce(Event value1, Event value2) throws Exception {
return new Event(value1.user, value1.url, value2.timestamp);
}
}
, Event.class));
myAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Long, String>("my-agg",
new AggregateFunction<Event, Long, String>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accmulator) {
return accmulator + 1;
}
@Override
public String getResult(Long accumulator) {
return "count: " + accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
, Long.class));
// 配置状态的TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
}
@Override
public void flatMap(Event value, Collector<String> out) throws Exception {
// 访问和更新状态
System.out.println(myValueState.value());
myValueState.update(value);
System.out.println( "my value: " + myValueState.value() );
myListState.add(value);
myMapState.put(value.user, myMapState.get(value.user) == null? 1: myMapState.get(value.user) + 1);
System.out.println( "my map value: " + myMapState.get(value.user) );
myReducingState.add(value);
System.out.println( "my reducing value: " + myReducingState.get() );
myAggregatingState.add(value);
System.out.println( "my agg value: " + myAggregatingState.get() );
count ++;
System.out.println("count: " + count);
}
}
Operator State
operator state 大体分为 ListState 、UnionListState。
用户可以通过实现 CheckpointedFunction
接口来使用 operator state。
CheckpointedFunction
CheckpointedFunction
接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
进行 checkpoint 时会调用 snapshotState()
。 用户自定义函数初始化时会调用 initializeState()
,初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState()
不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。
当前 operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List
,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
- Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素
element1
和element2
,当并发读增加为 2 时,element1
会被分到并发 0 上,element2
则会被分到并发 1 上。 - Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.
下面的例子中的 SinkFunction
在 CheckpointedFunction
中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() >= threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
initializeState
方法接收一个 FunctionInitializationContext
参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState
用于在 checkpoint 时保存 non-keyed state 对象。
注意这些状态是如何初始化的,和 keyed state 类似,StateDescriptor
会包括状态名字、以及状态类型相关信息。
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如
getUnionListState(descriptor)
会使用 union redistribution 算法, 而getListState(descriptor)
则简单的使用 even-split redistribution 算法。
当初始化好状态对象后,我们通过 isRestored()
方法判断是否从之前的故障中恢复回来,如果该方法返回 true
则表示从故障中进行恢复,会执行接下来的恢复逻辑。
正如代码所示,BufferingSink
中初始化时,恢复回来的 ListState
的所有元素会添加到一个局部变量中,供下次 snapshotState()
时使用。 然后清空 ListState
,再把当前局部变量中的所有元素写入到 checkpoint 中。
另外,我们同样可以在 initializeState()
方法中使用 FunctionInitializationContext
初始化 keyed state。
带状态的 Source Function
带状态的数据源比其他的算子需要注意更多东西。为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。
Java
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements CheckpointedFunction {
/** current offset for exactly once semantics */
private Long offset = 0L;
/** flag for job cancellation */
private volatile boolean isRunning = true;
/** 存储 state 的变量. */
private ListState<Long> state;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
"state",
LongSerializer.INSTANCE));
// 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法
for (Long l : state.get()) {
offset = l;
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.add(offset);
}
}
希望订阅 checkpoint 成功消息的算子,可以参考 org.apache.flink.api.common.state.CheckpointListener
接口。
Broadcast State
广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流,这个例子自然而然地运用了广播状态。 考虑到上述这类使用情形,广播状态和其他算子状态的不同之处在于:
- 它具有 map 格式,
- 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,
- 这类算子可以拥有不同命名的多个广播状态 。
注意: Python DataStream API 仍无法支持广播状态。
广播流代码的声明
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// 广播流,广播规则并且创建 broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
广播流的使用
为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream
),我们可以调用非广播流的方法 connect()
,并将 BroadcastStream
当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream
,具有类型方法 process()
,传入一个特殊的 CoProcessFunction
来书写我们的模式识别逻辑。 具体传入 process()
的是哪个类型取决于非广播流的类型:
- 如果流是一个 keyed 流,那就是
KeyedBroadcastProcessFunction
类型; - 如果流是一个 non-keyed 流,那就是
BroadcastProcessFunction
类型。
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
在传入的 BroadcastProcessFunction
或 KeyedBroadcastProcessFunction
中,我们需要实现两个方法。processBroadcastElement()
方法负责处理广播流中的元素,processElement()
负责处理非广播流中的元素。 两个子类型定义如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
需要注意的是 processBroadcastElement()
负责处理广播流的元素,而 processElement()
负责处理另一个流的元素。两个方法的第二个参数(Context)不同,均有以下方法:
- 得到广播流的存储状态:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 查询元素的时间戳:
ctx.timestamp()
- 查询目前的Watermark:
ctx.currentWatermark()
- 目前的处理时间(processing time):
ctx.currentProcessingTime()
- 产生旁路输出:
ctx.output(OutputTag<X> outputTag, X value)
在 getBroadcastState()
方法中传入的 stateDescriptor
应该与调用 .broadcast(ruleStateDescriptor)
的参数相同。
这两个方法的区别在于对 broadcast state 的访问权限不同。在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的。 这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。
processBroadcastElement()
的实现必须在所有的并发实例中具有确定性的结果。
同时,KeyedBroadcastProcessFunction
在 Keyed Stream 上工作,所以它提供了一些 BroadcastProcessFunction
没有的功能:
processElement()
的参数ReadOnlyContext
提供了方法能够访问 Flink 的定时器服务,可以注册事件定时器(event-time timer)或者处理时间的定时器(processing-time timer)。当定时器触发时,会调用onTimer()
方法, 提供了OnTimerContext
,它具有ReadOnlyContext
的全部功能,并且提供:
- 查询当前触发的是一个事件还是处理时间的定时器
- 查询定时器关联的key
processBroadcastElement()
方法中的参数Context
会提供方法applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)
。 这个方法使用一个KeyedStateFunction
能够对stateDescriptor
对应的 state 中所有 key 的存储状态进行某些操作。
注册一个定时器只能在
KeyedBroadcastProcessFunction
的processElement()
方法中进行。 在processBroadcastElement()
方法中不能注册定时器,因为广播的元素中并没有关联的 key。
重要注意事项
这里有一些 broadcast state 的重要注意事项,在使用它时需要时刻清楚:
- 没有跨 task 通讯:如上所述,这就是为什么只有在
(Keyed)-BroadcastProcessFunction
中处理广播流元素的方法里可以更改 broadcast state 的内容。 同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。 - **broadcast state 在不同的 task 的事件顺序可能是不同的:**虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。 所以 broadcast state 的更新不能依赖于流中元素到达的顺序。
- 所有的 task 均会对 broadcast state 进行 checkpoint:*虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态/改变并发的时候数据*没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(
p_new
-p_old
)会使用轮询调度算法读取之前 task 的 state。 - 不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State
状态有效期 (TTL)
任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,这会在后面详述。
所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。
在使用状态 TTL 前,需要先构建一个StateTtlConfig
配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项: newBuilder
的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite
):
-
StateTtlConfig.UpdateType.OnCreateAndWrite
- 仅在创建和写入时更新 -
StateTtlConfig.UpdateType.OnReadAndWrite
- 读取时也更新(注意: 如果你同时将状态的可见性配置为
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
, 那么在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失)
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired
):
-
StateTtlConfig.StateVisibility.NeverReturnExpired
- 不返回过期数据(注意: 在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失)
-
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
- 会返回过期但未清理的数据
NeverReturnExpired
情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp
在数据被物理删除前都会返回。
注意:
- 状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。 Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
- 暂时只支持基于 processing time 的 TTL。
- 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
- TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
- 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用
NullableSerializer
包装一层。 - 启用 TTL 配置后,
StateDescriptor
中的defaultValue
(已被标记deprecated
)将会失效。这个设计的目的是为了确保语义更加清晰,在此基础上,用户需要手动管理那些实际值为 null 或已过期的状态默认值。
Time
WaterMark
1. 关于 Flink 的 Watermark 1.12版本及之后默认时间语义为Event time
2. 时间戳和水印都被指定为毫秒,自 Java 时代 1970-01-01T00:00:00Z 以来。
1.10及之前WaterMark定义
使用事件语义之前,先定义整个flink使用的是事件语义而不是处理时间语义
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
定义在源函数中的时间戳和水印
数据源可以直接为其生成的元素分配时间戳,还可以发出水印。完成此操作后,不需要时间戳分配器。请注意,如果使用时间戳分配器,则源提供的任何时间戳和水印都将被覆盖。
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
方法 | 解释 | 参数 |
---|---|---|
collectWithTimestamp(T element, long timestamp) | 从源发出一个元素,并附加给定的时间戳。 | @param element The element to emit @param timestamp The timestamp in milliseconds since the Epoch |
emitWatermark(Watermark mark) | 发出给定的水印。值为t的Watermark声明时间戳为t' <= t的元素将不再出现。如果要释放更多这样的元素,那么这些元素会被认为是晚的。 | @param mark – The Watermark to emit |
定义在数据流中的时间戳和水印
时间戳分配者获取流并生成带有时间戳元素和水印的新流。如果原始流已经具有时间戳和/或水印,则时间戳分配者将覆盖它们。
时间戳分配者通常在数据源之后立即指定,但并非严格要求这样做。例如,一种常见的模式是在时间戳分配器之前解析(MapFunction)和过滤器(FilterFunction)。在任何情况下,都需要在对事件时间执行第一个操作(如第一个窗口操作)之前指定时间戳分配器。作为一个特殊情况,当使用Kafka作为流作业的源时,Flink允许在源(或消费者)本身内部指定时间戳分配器/水印发射器。
周期式的水印
AssignerWithPeriodicWatermarks
分配时间戳并定期生成水印(可能取决于流元素,或纯粹基于处理时间)。
生成水印的间隔(每 n 毫秒,默认200ms)通过 定义(env.getConfig().setAutoWatermarkInterval(...))。每次都会调用分配者的方法(getCurrentWatermark()),如果返回的水印不为空且大于以前的水印,则会发出新的水印。
1. AscendingTimestampExtractor (无乱序数据时使用)
source.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<T>() {
@Override
public long extractAscendingTimestamp(T element) {
return 0;
}
});
2. BoundedOutOfOrdernessTimestampExtractor (有界乱序数据时使用)
Time.seconds(3) : 延迟时间
泛型T :数据流中每条数据的类型
source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<T>(Time.seconds(3)) {
@Override
public long extractTimestamp(T element) {
return 0;
}
});
3. 自定义周期生成WaterMark
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<T>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return null;
}
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
return 0;
}
});
getCurrentWatermark() : 这个方法默认每200ms会自动调用一次。
extractTimestamp() :这个方法每条数据都调用一次。
断点式的水印
自定义断点式生成WaterMark
source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Event>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
return null;
}
@Override
public long extractTimestamp(Event element, long previousElementTimestamp) {
return 0;
}
});
checkAndGetNextWatermark : 这个方法每条数据都调用一次。
extractTimestamp : 这个方法每条数据都调用一次。
1.10之后WaterMark定义
为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner
API 从元素中的某个字段去访问/提取时间戳。
时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator
来配置 watermark 的生成方式。
使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy 。WatermarkStrategy
工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。
周期式的水印
1. forBoundedOutOfOrderness (有界乱序时使用)
泛型T :每条数据的数据类型。
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为5s
WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<T>() {
// 抽取时间戳的逻辑
@Override
public long extractTimestamp(T element, long recordTimestamp) {
return element.timestamp;
}
})
)
2. forBoundedOutOfOrderness (无乱序数据时使用)
.assignTimestampsAndWatermarks(WatermarkStrategy.<T>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<T>() {
@Override
public long extractTimestamp(T element, long recordTimestamp) {
return element.timestamp;
}
})
)
**3. 自定义周期性 Watermark 生成器 **
周期性生成器会观察流事件数据并定期生成 watermark(其生成可能取决于流数据,或者完全基于处理时间)。
生成 watermark 的时间间隔(每 n 毫秒)可以通过 ExecutionConfig.setAutoWatermarkInterval(...)
指定。每次都会调用生成器的 onPeriodicEmit()
方法,如果返回的 watermark 非空且值大于前一个 watermark,则将发出新的 watermark。
如下是两个使用周期性 watermark 生成器的简单示例。注意:Flink 已经附带了 BoundedOutOfOrdernessWatermarks
,它实现了 WatermarkGenerator
,其工作原理与下面的 BoundedOutOfOrdernessGenerator
相似。
/**
* 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
* 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 秒
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发出的 watermark = 当前最大时间戳 - 最大乱序时间
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
/**
* 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private final long maxTimeLag = 5000; // 5 秒
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
// 处理时间场景下不需要实现
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
}
}
断点式的水印
自定义标记 Watermark 生成器
标记 watermark 生成器观察流事件数据并在获取到带有 watermark 信息的特殊事件元素时发出 watermark。
如下是实现标记生成器的方法,当事件带有某个指定标记时,该生成器就会发出 watermark:
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// onEvent 中已经实现
}
}
注意: 可以针对每个事件去生成 watermark。但是由于每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程序性能
Window
概述
窗口
窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。
下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。 我们可以看到,这两者唯一的区别仅在于:keyed streams 要调用 keyBy(...)
后再调用 window(...)
, 而 non-keyed streams 只用直接调用 windowAll(...)
。留意这个区别,它能帮我们更好地理解后面的内容。
Keyed Windows
stream
.keyBy(...) <- 仅 keyed 窗口需要
.window(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
[.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0)
[.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
Non-Keyed Windows
stream
.windowAll(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (else default trigger)
[.evictor(...)] <- 可选项:"evictor" (else no evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (else zero)
[.sideOutputLateData(...)] <- 可选项:"output tag" (else no side output for late data)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
上面方括号([…])中的命令是可选的。也就是说,Flink 允许你自定义多样化的窗口操作来满足你的需求。
窗口的生命周期
简单来说,一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness
时被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00
至 12:05
这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06
时,这个窗口将被摧毁。
另外,每个窗口会设置自己的 Trigger
和 function (ProcessWindowFunction
、ReduceFunction
、或 AggregateFunction
)。该 function 决定如何计算窗口中的内容, 而 Trigger
决定何时窗口中的数据可以被 function 计算。 Trigger 的触发(fire)条件可能是“当窗口中有多于 4 条数据”或“当 watermark 越过窗口的结束时间”等。 Trigger 还可以在 window 被创建后、删除前的这段时间内定义何时清理(purge)窗口中的数据。 这里的数据仅指窗口内的元素,不包括窗口的 meta data。也就是说,窗口在 purge 后仍然可以加入新的数据。
除此之外,你也可以指定一个 Evictor
,在 trigger 触发之后,Evictor 可以在窗口函数的前后删除数据。
Keyed 和 Non-Keyed Windows
首先必须要在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。 keyBy(...)
会将你的无界 stream 分割为逻辑上的 keyed stream。 如果 keyBy(...)
没有被调用,你的 stream 就不是 keyed。
对于 keyed stream,其中数据的任何属性都可以作为 key 。 使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。
对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream, 所以所有的窗口计算会被同一个 task 完成,也就是 parallelism 为 1。
Window Assigners
指定了你的 stream 是否为 keyed 之后,下一步就是定义 window assigner。
Window assigner 定义了 stream 中的元素如何被分发到各个窗口。 你可以在 window(...)
(用于 keyed streams)或 windowAll(...)
(用于 non-keyed streams)中指定一个 WindowAssigner
。 WindowAssigner
负责将 stream 中的每个数据分发到一个或多个窗口中。 Flink 为最常用的情况提供了一些定义好的 window assigner,也就是 tumbling windows、 sliding windows、 session windows 和 global windows。 你也可以继承 WindowAssigner
类来实现自定义的 window assigner。 所有内置的 window assigner(除了 global window)都是基于时间分发数据的,processing time 或 event time 均可。
基于时间的窗口用 start timestamp(包含)和 end timestamp(不包含)描述窗口的大小。 在代码中,Flink 处理基于时间的窗口使用的是 TimeWindow
, 它有查询开始和结束 timestamp 以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()
。
// 窗口开始时间计算方式
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
接下来我们会说明 Flink 内置的 window assigner 如何工作,以及他们如何用在 DataStream 程序中。 下面的图片展示了每种 assigner 如何工作。 紫色的圆圈代表 stream 中按 key 划分的元素(本例中是按 user 1、user 2 和 user 3 划分)。 x 轴表示时间的进展。
滚动窗口(Tumbling Windows)
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
下面的代码展示了如何使用滚动窗口。
DataStream<T> input = ...;
// 滚动 event-time 窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滚动 processing-time 窗口
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
时间间隔可以用 Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
等来指定。
如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset
参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999
、2:00:00.000 - 2:59:59.999
等。 如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset, 你会得到 1:15:00.000 - 2:14:59.999
、2:15:00.000 - 3:14:59.999
等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)
。
滑动窗口(Sliding Windows)
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
下面的代码展示了如何使用滑动窗口。
DataStream<T> input = ...;
// 滑动 event-time 窗口
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滑动 processing-time 窗口
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滑动 processing-time 窗口,偏移量为 -8 小时
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
时间间隔可以使用 Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
等来指定。
如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset
参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时、滑动距离为 30 分钟的滑动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999
, 1:30:00.000 - 2:29:59.999
等。 如果你想改变对齐方式,你可以设置一个 offset。 如果设置了 15 分钟的 offset,你会得到 1:15:00.000 - 2:14:59.999
、1:45:00.000 - 2:44:59.999
等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)
。
会话窗口(Session Windows)
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
下面的代码展示了如何使用会话窗口。
DataStream<T> input = ...;
// 设置了固定间隔的 event-time 会话窗口
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// 设置了动态间隔的 event-time 会话窗口
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// 决定并返回会话间隔
}))
.<windowed transformation>(<window function>);
// 设置了固定间隔的 processing-time session 窗口
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// 设置了动态间隔的 processing-time 会话窗口
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// 决定并返回会话间隔
}))
.<windowed transformation>(<window function>);
固定间隔可以使用 Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
等来设置。
动态间隔可以通过实现 SessionWindowTimeGapExtractor
接口来指定。
会话窗口并没有固定的开始或结束时间,所以它的计算方法与滑动窗口和滚动窗口不同。在 Flink 内部,会话窗口的算子会为每一条数据创建一个窗口, 然后将距离不超过预设间隔的窗口合并。 想要让窗口可以被合并,会话窗口需要拥有支持合并的 Trigger 和 Window Function, 比如说
ReduceFunction
、AggregateFunction
或ProcessWindowFunction
。
全局窗口(Global Windows)
全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger
时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。
下面的代码展示了如何使用全局窗口。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
窗口函数(Window Functions)
定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了。关于窗口如何触发,详见 triggers。
窗口函数有三种:ReduceFunction
、AggregateFunction
或 ProcessWindowFunction
。 前两者执行起来更高效,因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。 而 ProcessWindowFunction
会得到能够遍历当前窗口内所有数据的 Iterable
,以及关于这个窗口的 meta-information。
使用 ProcessWindowFunction
的窗口转换操作没有其他两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据。 ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
合并来提高效率。 这样做既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction
接收窗口的 metadata。 我们接下来看看每种函数的例子。
ReduceFunction
ReduceFunction
指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction
对窗口中的数据进行增量聚合。
ReduceFunction
可以像下面这样定义:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
上面的例子是对窗口内元组的第二个属性求和。
AggregateFunction
ReduceFunction
是 AggregateFunction
的特殊情况。 AggregateFunction
接收三个类型:输入数据的类型(IN
)、累加器的类型(ACC
)和输出数据的类型(OUT
)。 输入数据的类型是输入流的元素类型,AggregateFunction
接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT
类型)。我们通过下例说明。
与 ReduceFunction
相同,Flink 会在输入数据到达窗口时直接进行增量聚合。
AggregateFunction
可以像下面这样定义:
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
上例计算了窗口内所有元素第二个属性的平均值。
ProcessWindowFunction
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。
ProcessWindowFunction
的签名如下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
key
参数由 keyBy()
中指定的 KeySelector
选出。 如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple
, 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。
ProcessWindowFunction
可以像下面这样定义:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
上例使用 ProcessWindowFunction
对窗口中的元素计数,并且将窗口本身的信息一同输出。
注意,使用
ProcessWindowFunction
完成简单的聚合任务是非常低效的。 下一章会说明如何将ReduceFunction
或AggregateFunction
与ProcessWindowFunction
组合成既能 增量聚合又能获得窗口额外信息的窗口函数。
增量聚合的 ProcessWindowFunction
ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction
将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction
中获得窗口的元数据。
你也可以对过时的 WindowFunction
使用增量聚合。
使用 ReduceFunction 增量聚合
下例展示了如何将 ReduceFunction
与 ProcessWindowFunction
组合,返回窗口中的最小元素和窗口的开始时间。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
使用 AggregateFunction 增量聚合
下例展示了如何将 AggregateFunction
与 ProcessWindowFunction
组合,计算平均值并与窗口对应的 key 一同输出。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
在 ProcessWindowFunction 中使用 per-window state
除了访问 keyed state (任何富函数都可以),ProcessWindowFunction
还可以使用作用域仅为 “当前正在处理的窗口”的 keyed state。在这种情况下,理解 per-window 中的 window 指的是什么非常重要。 总共有以下几种窗口的理解:
- 在窗口操作中定义的窗口:比如定义了长一小时的滚动窗口或长两小时、滑动一小时的滑动窗口。
- 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口。 具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。
Per-window state 作用于后者。也就是说,如果我们处理有 1000 种不同 key 的事件, 并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么我们将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。
process()
接收到的 Context
对象中有两个方法允许我们访问以下两种 state:
-
globalState()
,访问全局的 keyed state -
windowState()
, 访问作用域仅限于当前窗口的 keyed state
KeyedStateStore keyedStateStore = context.windowState();
ValueState<String> valueState = keyedStateStore.getState(new ValueStateDescriptor<String>("test", TypeInformation.of(new TypeHint<String>() {})));
KeyedStateStore keyedStateStore1 = context.globalState();
如果你可能将一个 window 触发多次(比如当你的迟到数据会再次触发窗口计算, 或你自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用。 这时你可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。
当使用窗口状态时,一定记得在删除窗口时清除这些状态。他们应该定义在 clear()
方法中。
WindowFunction(已过时)
在某些可以使用 ProcessWindowFunction
的地方,你也可以使用 WindowFunction
。 它是旧版的 ProcessWindowFunction
,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。 这个接口会在未来被弃用。
WindowFunction
的签名如下:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
它可以像下例这样使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
Triggers
Trigger
决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner
都有一个默认的 Trigger
。 如果默认 trigger 无法满足你的需要,你可以在 trigger(...)
调用中指定自定义的 trigger。
Trigger 接口提供了五个方法来响应不同的事件:
onElement()
方法在每个元素被加入窗口时调用。onEventTime()
方法在注册的 event-time timer 触发时调用。onProcessingTime()
方法在注册的 processing-time timer 触发时调用。onMerge()
方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。- 最后,
clear()
方法处理在对应窗口被移除时所需的逻辑。
有两点需要注意:
- 前三个方法通过返回
TriggerResult
来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:
CONTINUE
: 什么也不做FIRE
: 触发计算PURGE
: 清空窗口内的元素FIRE_AND_PURGE
: 触发计算,计算结束后清空窗口内的元素
- 上面的任意方法都可以用来注册 processing-time 或 event-time timer。
触发(Fire)与清除(Purge)
当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE
或 FIRE_AND_PURGE
。 这是让窗口算子发送当前窗口计算结果的信号。 如果一个窗口指定了 ProcessWindowFunction
,所有的元素都会传给 ProcessWindowFunction
。 如果是 ReduceFunction
或 AggregateFunction
,则直接发送聚合的结果。
当 trigger 触发时,它可以返回 FIRE
或 FIRE_AND_PURGE
。 FIRE
会保留被触发的窗口中的内容,而 FIRE_AND_PURGE
会删除这些内容。 Flink 内置的 trigger 默认使用 FIRE
,不会清除窗口的状态。
Purge 只会移除窗口的内容, 不会移除关于窗口的 meta-information 和 trigger 的状态。
WindowAssigner 默认的 Triggers
WindowAssigner
默认的 Trigger
足以应付诸多情况。 比如说,所有的 event-time window assigner 都默认使用 EventTimeTrigger
。 这个 trigger 会在 watermark 越过窗口结束时间后直接触发。
GlobalWindow
的默认 trigger 是永远不会触发的 NeverTrigger
。因此,使用 GlobalWindow
时,你必须自己定义一个 trigger。
当你在
trigger()
中指定了一个 trigger 时, 你实际上覆盖了当前WindowAssigner
默认的 trigger。 比如说,如果你指定了一个CountTrigger
给TumblingEventTimeWindows
,你的窗口将不再根据时间触发, 而是根据元素数量触发。如果你希望即响应时间,又响应数量,就需要自定义 trigger 了。
内置 Triggers 和自定义 Triggers
Flink 包含一些内置 trigger。
- 之前提到过的
EventTimeTrigger
根据 watermark 测量的 event time 触发。 ProcessingTimeTrigger
根据 processing time 触发。CountTrigger
在窗口中的元素超过预设的限制时触发。PurgingTrigger
接收另一个 trigger 并将它转换成一个会清理数据的 trigger。
如果你需要实现自定义的 trigger,你应该看看这个抽象类 Trigger 。 请注意,这个 API 仍在发展,所以在之后的 Flink 版本中可能会发生变化。
Evictors
Flink 的窗口模型允许在 WindowAssigner
和 Trigger
之外指定可选的 Evictor
。 如本文开篇的代码中所示,通过 evictor(...)
方法传入 Evictor
。 Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。 Evictor
接口提供了两个方法实现此功能:
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evictBefore()
包含在调用窗口函数前的逻辑,而 evictAfter()
包含在窗口函数调用之后的逻辑。 在调用窗口函数之前被移除的元素不会被窗口函数计算。
Flink 内置有三个 evictor:
CountEvictor
: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除DeltaEvictor
: 接收DeltaFunction
和threshold
参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于threshold
的元素。TimeEvictor
: 接收interval
参数,以毫秒表示。 它会找到窗口中元素的最大 timestampmax_ts
并移除比max_ts - interval
小的所有元素。
默认情况下,所有内置的 evictor 逻辑都在调用窗口函数前执行。
指定一个 evictor 可以避免预聚合,因为窗口中的所有元素在计算前都必须经过 evictor。
Flink 不对窗口中元素的顺序做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。
Allowed Lateness
在使用 event-time 窗口时,数据可能会迟到,即 Flink 用来追踪 event-time 进展的 watermark 已经 越过了窗口结束的 timestamp 后,数据才到达。
默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。 但是 Flink 允许指定窗口算子最大的 allowed lateness。 Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。 在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素, 依旧会被加入窗口。取决于窗口的 trigger,一个迟到但没有被丢弃的元素可能会再次触发窗口,比如 EventTimeTrigger
。
为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness 超时才会将窗口及其状态删除。
默认情况下,allowed lateness 被设为 0
。即 watermark 之后到达的元素会被丢弃。
你可以像下面这样指定 allowed lateness:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
使用
GlobalWindows
时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是Long.MAX_VALUE
。
从旁路输出(side output)获取迟到数据
通过 Flink 的 旁路输出 功能,你可以获得迟到数据的数据流。
可以通过以下方法将数据发送到旁路输出:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// 发送数据到主要的输出
out.collect(value);
// 发送数据到旁路输出
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
首先,你需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag)
表明你需要获取迟到数据。 然后,你就可以从窗口操作的结果中获取旁路输出流了。
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
迟到数据的一些考虑
当指定了大于 0 的 allowed lateness 时,窗口本身以及其中的内容仍会在 watermark 越过窗口末端后保留。 这时,如果一个迟到但未被丢弃的数据到达,它可能会再次触发这个窗口。 这种触发被称作 late firing
,与表示第一次触发窗口的 main firing
相区别。 如果是使用会话窗口的情况,late firing 可能会进一步合并已有的窗口,因为他们可能会连接现有的、未被合并的窗口。
你应该注意:late firing 发出的元素应该被视作对之前计算结果的更新,即你的数据流中会包含一个相同计算任务的多个结果。你的应用需要考虑到这些重复的结果,或去除重复的部分。
Working with window results
窗口操作的结果会变回 DataStream
,并且窗口操作的信息不会保存在输出的元素中。 所以如果你想要保留窗口的 meta-information,你需要在 ProcessWindowFunction
里手动将他们放入输出的元素中。 输出元素中保留的唯一相关的信息是元素的 timestamp。 它被设置为窗口能允许的最大 timestamp,也就是 end timestamp - 1,因为窗口末端的 timestamp 是排他的。 这个情况同时适用于 event-time 窗口和 processing-time 窗口。 也就是说,在窗口操作之后,元素总是会携带一个 event-time 或 processing-time timestamp。 对 Processing-time 窗口来说,这并不意味着什么。 而对于 event-time 窗口来说,“输出携带 timestamp” 以及 “watermark 与窗口的相互作用” 这两者使建立窗口大小相同的连续窗口操作(consecutive windowed operations) 变为可能。我们先看看 watermark 与窗口的相互作用,然后再来讨论它。
Interaction of watermarks and windows
继续阅读这个章节之前,你可能想要先了解一下我们介绍 event time 和 watermarks 的内容。
当 watermark 到达窗口算子时,它触发了两件事:
- 这个 watermark 触发了所有最大 timestamp(即 end-timestamp - 1)小于它的窗口
- 这个 watermark 被原封不动地转发给下游的任务。
通俗来讲,watermark 将当前算子中那些“一旦这个 watermark 被下游任务接收就肯定会就超时”的窗口全部冲走。
Consecutive windowed operations
如之前提到的,窗口结果的 timestamp 如何计算以及 watermark 如何与窗口相互作用使串联多个窗口操作成为可能。 这提供了一种便利的方法,让你能够有两个连续的窗口,他们即能使用不同的 key, 又能让上游操作中某个窗口的数据出现在下游操作的相同窗口。参考下例:
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
这个例子中,第一个操作中时间窗口[0, 5)
的结果会出现在下一个窗口操作的 [0, 5)
窗口中。 这就可以让我们先在一个窗口内按 key 求和,再在下一个操作中找出这个窗口中 top-k 的元素。