flink窗口函数增量聚合与全量聚合

文章资讯 2020-08-07 21:30:27

flink窗口函数增量聚合与全量聚合

本次时间窗口聚合结果+ 历史聚合结果 聚合后生成新的历史聚合结果
public class InterctiveReduceFunction implements ReduceFunction> {
private static final Logger logger = Logger.getLogger(InterctiveReduceFunction.class);
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
value1.setFields(value1.f0, value1.f1 + value2.f1);
return value1;
}
}
public class InterctiveWindowFunction implements WindowFunction, Tuple3, String, TimeWindow> {
private static final Logger logger = Logger.getLogger(InterctiveWindowFunction.class);
public void apply(String key, TimeWindow timeWindow, Iterable> iterable, Collector> collector) throws Exception {
long count = 0;
for (Tuple2 value : iterable) {
count += value.f1;
}
collector.collect(Tuple3.of(DealMidInteractive.BhvType, key, count));
}
}
第一个函数是 增量聚合,第二个是全量聚合。