正如在timestamp 和 watermark处理中所描述的一样,Flink 提供了抽象概念来允许程序员指定自己的timestamp和发射他们自己的watermark。更具体地说,用户可以根据自己的需要来实现AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks中的一个接口来指定自己的timestamp和发射自己的watermark。
为了更进一步的简化这些任务的编程工作,Flink还提供了一些预实现的timestamp分配器,这个章节提供了这些预实现timestamp分配器的列表。除了它们拆箱即用的功能外,它们的实现也可作为自定义实现的例子。
递增时间戳分配器
周期性水印生成的最简单的特殊例子是时间戳被给定的源任务按递增顺序产生,在这种情况下,当前的时间戳永远可以作为水印,因为没有更早的时间戳将到达。
注意:每个并行数据源任务中的timestamp是递增的,这是很必要的,例如:如果指定了一个Kafka分区被一个并行数据源实例读取,那么每个Kafka分区的timestamp是递增的,这是很有必要的。Flink的watermark合并机制将会在并行数据流shuffled、unioned、connected 或者 merged的时候产生正确的水印。
Java 代码:
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
Scala 代码:
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允许固定数量延迟的分配器
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
Scala代码:
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))