本文共 10296 字,大约阅读时间需要 34 分钟。
PostgreSQL , Greenplum , merge insert , insert on conflict , 合并插入 , 有则更新 , 无则插入
PostgreSQL insert on conflict语法非常强大,支持合并写入(当违反某唯一约束时,冲突则更新,不冲突则写入),同时支持流式计算。
流计算例子链接:
PostgreSQL insert on conflict语法如下:
Command: INSERT Description: create new rows in a table Syntax: [ WITH [ RECURSIVE ] with_query [, ...] ] INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ] [ OVERRIDING { SYSTEM | USER} VALUE ] { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query } [ ON CONFLICT [ conflict_target ] conflict_action ] [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ] where conflict_target can be one of: ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ] ON CONSTRAINT constraint_name and conflict_action is one of: DO NOTHING DO UPDATE SET { column_name = { expression | DEFAULT } | ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) | ( column_name [, ...] ) = ( sub-SELECT ) } [, ...] [ WHERE condition ]
Greenplum的版本较低,还不支持insert on conflict的语法。
如果需要在Greenplum中实现类似的功能该如何操作?
ID为PK,以它为合并列,举例。
1、目标表,也就是需要合并写入的目标:
create table t( id int primary key, c1 int , c2 int, c3 int, c4 int, c5 int, crt_time timestamp);
2、中间表,也就是用户只管插入的表:
create table t_tmp(like t);
写入一些中间记录。
insert into t_tmp values(1,1,2,3,null,null,now()); insert into t_tmp values(1,1,2,4,null,null,now()); insert into t_tmp values(1,1,2,3,null,7,now()); insert into t_tmp values(1,1,null,3,5,6,now());
postgres=# select * from t_tmp; id | c1 | c2 | c3 | c4 | c5 | crt_time ----+----+----+----+----+----+---------------------------- 1 | 1 | 2 | 3 | | | 2017-12-13 17:03:16.28482 1 | 1 | 2 | 4 | | | 2017-12-13 17:03:16.286302 1 | 1 | 2 | 3 | | 7 | 2017-12-13 17:03:16.635121 1 | 1 | | 3 | 5 | 6 | 2017-12-13 17:03:25.434191 (4 rows)
3、窗口合并,按唯一值约束,仅提取一条(可能存在窗口内合并的需求,例如按时间取最新,比如以最后一条为准,又或者以有值,且最新的为准)。
以有值切最新为准例子:
select distinct on (id) id, first_value(c1) over (partition by id order by (case when c1 is null then null else crt_time end) desc nulls last) as c1, first_value(c2) over (partition by id order by (case when c2 is null then null else crt_time end) desc nulls last) as c2, first_value(c3) over (partition by id order by (case when c3 is null then null else crt_time end) desc nulls last) as c3, first_value(c4) over (partition by id order by (case when c4 is null then null else crt_time end) desc nulls last) as c4, first_value(c5) over (partition by id order by (case when c5 is null then null else crt_time end) desc nulls last) as c5, first_value(crt_time) over (partition by id order by crt_time desc) as crt_time from t_tmp ; id | c1 | c2 | c3 | c4 | c5 | crt_time ----+----+----+----+----+----+---------------------------- 1 | 1 | 2 | 3 | 5 | 6 | 2017-12-13 17:03:25.434191 (1 row)
存储中间结果:
create table t_tmp1 (like t) ; insert into t_tmp1 select distinct on (id) id, first_value(c1) over (partition by id order by (case when c1 is null then null else crt_time end) desc nulls last) as c1, first_value(c2) over (partition by id order by (case when c2 is null then null else crt_time end) desc nulls last) as c2, first_value(c3) over (partition by id order by (case when c3 is null then null else crt_time end) desc nulls last) as c3, first_value(c4) over (partition by id order by (case when c4 is null then null else crt_time end) desc nulls last) as c4, first_value(c5) over (partition by id order by (case when c5 is null then null else crt_time end) desc nulls last) as c5, first_value(crt_time) over (partition by id order by crt_time desc) as crt_time from t_tmp ;
4、合并写入:
将窗口提取的结果,合并写入目标表。
4.1、INNER JOIN,覆盖旧记录,同时补齐旧的字段(以NULL为判断条件。如果新的记录没有值,则取旧记录的值。)提取。
create table t_tmp2 (like t); insert into t_tmp2 select t_tmp.id, coalesce(t_tmp.c1, t.c1), coalesce(t_tmp.c2, t.c2), coalesce(t_tmp.c3, t.c3), coalesce(t_tmp.c4, t.c4), coalesce(t_tmp.c5, t.c5), coalesce(t_tmp.crt_time, t.crt_time) from t_tmp1 as t_tmp inner join t using (id);
4.2、DELETE USING,删除全量表的符合条件的记录。
delete from t using t_tmp2 where t.id=t_tmp2.id;
4.3、INSERT
insert into t select t_tmp1.* from t_tmp1 left join t_tmp2 using (id) where t_tmp2.* is null union all select * from t_tmp2;
硬件:使用一台64线程机器,单机启动48个segment。
1、全量数据20亿。
create table t(id int, c1 int , c2 int, c3 int, c4 int, c5 int, crt_time timestamp) with (APPENDONLY=true, ORIENTATION=column); insert into t select id, null,null,null,null,10000, now() from generate_series(1,2000000000) t(id);
2、增量数据1000万条,涉及500万个ID。
create table t_tmp(like t); insert into t_tmp select random()*1000000, random()*100,null,null,null,null, clock_timestamp() from generate_series(1,2000000) t(id); insert into t_tmp select random()*2000000, null,random()*100,null,null,null, clock_timestamp() from generate_series(1,2000000) t(id); insert into t_tmp select random()*3000000, null,null,random()*100,null,null, clock_timestamp() from generate_series(1,2000000) t(id); insert into t_tmp select random()*4000000, null,null,null,random()*100,null, clock_timestamp() from generate_series(1,2000000) t(id); insert into t_tmp select random()*5000000, null,null,null,null,random()*100, clock_timestamp() from generate_series(1,2000000) t(id); 总耗时4.5秒。
3、合并。
增量数据,窗口合并去重。
create table t_tmp1 (like t) ; insert into t_tmp1 select distinct on (id) id, first_value(c1) over (partition by id order by ((case when c1 is null then null else crt_time end) is null), (case when c1 is null then null else crt_time end) desc) as c1, first_value(c2) over (partition by id order by ((case when c2 is null then null else crt_time end) is null), (case when c2 is null then null else crt_time end) desc) as c2, first_value(c3) over (partition by id order by ((case when c3 is null then null else crt_time end) is null), (case when c3 is null then null else crt_time end) desc) as c3, first_value(c4) over (partition by id order by ((case when c4 is null then null else crt_time end) is null), (case when c4 is null then null else crt_time end) desc) as c4, first_value(c5) over (partition by id order by ((case when c5 is null then null else crt_time end) is null), (case when c5 is null then null else crt_time end) desc) as c5, first_value(crt_time) over (partition by id order by crt_time desc) as crt_time from t_tmp ; INSERT 0 3628283 Time: 5208.968 ms
使用增量数据,提取并合并旧数据。
create table t_tmp2 (like t); insert into t_tmp2 select t_tmp.id, coalesce(t_tmp.c1, t.c1), coalesce(t_tmp.c2, t.c2), coalesce(t_tmp.c3, t.c3), coalesce(t_tmp.c4, t.c4), coalesce(t_tmp.c5, t.c5), coalesce(t_tmp.crt_time, t.crt_time) from t_tmp1 as t_tmp inner join t using (id); INSERT 0 3628282 Time: 9504.092 ms
删除旧数据。
delete from t using t_tmp2 where t.id=t_tmp2.id; DELETE 3628282 Time: 15356.920 ms
插入新增、以及合并的增量数据。
insert into t select t_tmp1.* from t_tmp1 left join t_tmp2 using (id) where t_tmp2.* is null union all select * from t_tmp2; INSERT 0 3628283 Time: 778.014 ms
数据校验
-- 中间结果 postgres=# select * from t_tmp where id=9; id | c1 | c2 | c3 | c4 | c5 | crt_time ----+----+----+----+----+----+---------------------------- 9 | 99 | | | | | 2017-12-13 23:18:14.65243 9 | 7 | | | | | 2017-12-13 23:18:14.817107 9 | | 9 | | | | 2017-12-13 23:18:15.292311 9 | | 56 | | | | 2017-12-13 23:18:15.449415 (4 rows) -- 中间结果 postgres=# select * from t_tmp where id=446; id | c1 | c2 | c3 | c4 | c5 | crt_time -----+----+----+----+----+----+---------------------------- 446 | 43 | | | | | 2017-12-13 23:18:14.291335 446 | 16 | | | | | 2017-12-13 23:18:14.715026 446 | 45 | | | | | 2017-12-13 23:18:15.048879 446 | | 34 | | | | 2017-12-13 23:18:15.646904 446 | | 7 | | | | 2017-12-13 23:18:15.81838 446 | | | 12 | | | 2017-12-13 23:18:16.220083 446 | | | 22 | | | 2017-12-13 23:18:16.26496 446 | | | | 97 | | 2017-12-13 23:18:17.464355 446 | | | | | 56 | 2017-12-13 23:18:18.427068 (9 rows) -- 使用窗口合并后结果 postgres=# select * from t_tmp1 limit 10; id | c1 | c2 | c3 | c4 | c5 | crt_time -----+----+----+----+----+----+---------------------------- 9 | 7 | 56 | | | | 2017-12-13 23:18:15.449415 -- 验证 25 | 69 | | 1 | | | 2017-12-13 23:18:16.161339 169 | 74 | 33 | 3 | | | 2017-12-13 23:18:16.71554 185 | 22 | | | | | 2017-12-13 23:18:14.93206 217 | 11 | 20 | 26 | | 59 | 2017-12-13 23:18:17.911174 270 | 55 | | 42 | | | 2017-12-13 23:18:16.494782 286 | 65 | 77 | 17 | | 75 | 2017-12-13 23:18:17.895121 430 | 12 | | 56 | | | 2017-12-13 23:18:16.744847 446 | 45 | 7 | 22 | 97 | 56 | 2017-12-13 23:18:18.427068 -- 验证 478 | 23 | 56 | | 25 | 77 | 2017-12-13 23:18:18.293153 (10 rows) -- 合并到全量表后结果 postgres=# select * from t where id=9; id | c1 | c2 | c3 | c4 | c5 | crt_time ----+----+----+----+----+-------+---------------------------- 9 | 7 | 56 | | | 10000 | 2017-12-13 23:18:15.449415 (1 row) postgres=# select * from t where id=446; id | c1 | c2 | c3 | c4 | c5 | crt_time -----+----+----+----+----+----+---------------------------- 446 | 45 | 7 | 22 | 97 | 56 | 2017-12-13 23:18:18.427068 (1 row)
4、合并总耗时:
35秒
5、耗时分布
增量数据1000万条,涉及500万个ID。
4.5秒
增量数据,窗口合并去重。
5.2秒
使用增量数据,提取并合并旧数据。
9.5秒
删除旧数据。
15秒
插入新增、以及合并的增量数据。
0.7秒
比较复杂、而且不支持新值使用NULL值(要支持的话,得修改一下覆盖逻辑)。
转载地址:http://ybxpo.baihongyu.com/