-- Per-minute GMV by item category.
-- Reads user actions from Kafka, enriches each action with the item's
-- category via a lookup join against Redis, sums `price` per one-minute
-- tumbling window and category, and writes the result to an RDS table.

-- Source: user actions consumed from Kafka as JSON.
CREATE TEMPORARY TABLE user_action_source (
    `timestamp` BIGINT,
    `user_id` BIGINT,
    `item_id` BIGINT,
    `price` DOUBLE,
    -- Event-time attribute derived from the raw epoch value; TUMBLE needs a
    -- time attribute and cannot window on a plain BIGINT.
    -- NOTE(review): assumes `timestamp` is epoch milliseconds (precision 3);
    -- use precision 0 if the producer sends epoch seconds -- confirm.
    `event_time` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
    -- Tolerate events arriving up to 5 seconds out of order; tune as needed.
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '<your_topic>',
    'properties.bootstrap.servers' = 'your_kafka_server:9092',
    'properties.group.id' = '<your_consumer_group>',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Dimension: item id -> category, looked up in Redis.
CREATE TEMPORARY TABLE item_detail_dim (
    id STRING,
    catagory STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<your_redis_host>',
    'port' = '<your_redis_port>',
    'password' = '<your_redis_password>',
    'dbNum' = '<your_db_num>'
);

-- Sink: one row per (minute, category), keyed so that writes are upserts.
CREATE TEMPORARY TABLE gmv_output (
    time_minute STRING,
    catagory STRING,
    gmv DOUBLE,
    PRIMARY KEY (time_minute, catagory) NOT ENFORCED
) WITH (
    'connector' = 'rds',
    'url' = '<your_jdbc_mysql_url_with_database>',
    'tableName' = '<your_table>',
    'userName' = '<your_mysql_database_username>',
    'password' = '<your_mysql_database_password>'
);

INSERT INTO gmv_output
SELECT
    -- The sink column is STRING, so the window start is cast explicitly.
    CAST(TUMBLE_START(s.event_time, INTERVAL '1' MINUTE) AS STRING) AS time_minute,
    d.catagory,
    -- `price` is a column of the action stream, not of the dimension table.
    SUM(s.price) AS gmv
FROM user_action_source AS s
INNER JOIN item_detail_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d
    -- Cast on the stream side so the lookup key stays a bare dimension
    -- column (s.item_id is BIGINT, d.id is STRING).
    ON CAST(s.item_id AS STRING) = d.id
GROUP BY
    TUMBLE(s.event_time, INTERVAL '1' MINUTE),
    d.catagory;