Flink SQL之Over 聚合操作

应用场景:计算最近一段滑动窗口的聚合结果数据。
实际案例:查询每个产品最近一小时订单的金额总和:

SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders

Over 聚合的语法总结如下:

SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...

其中:

  • ORDER BY:必须是时间戳列(事件时间、处理时间)
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合​,第二种为按照时间区间聚合。
  • 如下案例所示:

    时间区间聚合

    按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

    CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '2',
    'fields.amount.min' = '1',
    'fields.amount.max' = '10',
    'fields.product.min' = '1',
    'fields.product.max' = '2'
    );
    CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
    ) WITH (
    'connector' = 'print'
    );
    INSERT INTO sink_table
    SELECT product, order_time, amount,
    SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 1 小时的数据
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
    FROM source_table

    结果如下:

    +I[2, 2021-12-24T22:08:26.583, 7, 73]
    +I[2, 2021-12-24T22:08:27.583, 7, 80]
    +I[2, 2021-12-24T22:08:28.583, 4, 84]
    +I[2, 2021-12-24T22:08:29.584, 7, 91]
    +I[2, 2021-12-24T22:08:30.583, 8, 99]
    +I[1, 2021-12-24T22:08:31.583, 9, 138]
    +I[2, 2021-12-24T22:08:32.584, 6, 105]
    +I[1, 2021-12-24T22:08:33.584, 7, 145]
    行数聚合

    按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。

    CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '2',
    'fields.amount.min' = '1',
    'fields.amount.max' = '2',
    'fields.product.min' = '1',
    'fields.product.max' = '2'
    );
    CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
    ) WITH (
    'connector' = 'print'
    );
    INSERT INTO sink_table
    SELECT product, order_time, amount,
    SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 5 行数据
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
    FROM source_table

    预跑结果如下:

    +I[2, 2021-12-24T22:18:19.147, 1, 9]
    +I[1, 2021-12-24T22:18:20.147, 2, 11]
    +I[1, 2021-12-24T22:18:21.147, 2, 12]
    +I[1, 2021-12-24T22:18:22.147, 2, 12]
    +I[1, 2021-12-24T22:18:23.148, 2, 12]
    +I[1, 2021-12-24T22:18:24.147, 1, 11]
    +I[1, 2021-12-24T22:18:25.146, 1, 10]
    +I[1, 2021-12-24T22:18:26.147, 1, 9]
    +I[2, 2021-12-24T22:18:27.145, 2, 11]
    +I[2, 2021-12-24T22:18:28.148, 1, 10]
    +I[2, 2021-12-24T22:18:29.145, 2, 10]

    当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

    SELECT order_id, order_time, amount,
    SUM(amount) OVER w AS sum_amount,
    AVG(amount) OVER w AS avg_amount
    FROM Orders
    -- 使用下面子句,定义 Over Window
    WINDOW w AS (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
    THE END
    分享
    二维码
    < <上一篇
    下一篇>>