How to handle incremental load for a Hive table

table_1 contains:

customer_id | items | price | updated_date
------------+-------+-------+-------------
10          | watch | 1000  | 20170626
11          | bat   | 400   | 20170625

table_2 contains:

customer_id | items    | price | updated_date
------------+----------+-------+-------------
10          | computer | 20000 | 20170624

I want to update records in table_2 when the customer_id already exists in it; otherwise the row should be appended to table_2.

As Hive 0.13 does not support UPDATE, I tried using a join, but it failed.

There are two ways to handle this:

1st way: using row_number()

2nd way: using FULL OUTER JOIN (since Hive 0.13 does not support UPDATE, the join result is written back with INSERT OVERWRITE instead)

By using row_number():

insert overwrite table table_1
select customer_id, items, price, updated_date
from
(
    select customer_id, items, price, updated_date,
           row_number() over (partition by customer_id order by new_flag desc) rn
    from
    (
        select customer_id, items, price, updated_date, 0 as new_flag
          from table_1
        union all
        select customer_id, items, price, updated_date, 1 as new_flag
          from table_2
    ) all_data
) s
where rn = 1;

If you want to replace all columns with new data, you can apply this UNION ALL + row_number() solution; it works faster than a full join.
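Not Hive code, but a quick way to sanity-check the UNION ALL + row_number() logic is to run the same query against SQLite from the Python standard library (window functions require SQLite 3.25+). The table names and sample rows below come from the post; the query follows the post's version literally, treating table_2 as the increment:

```python
# Simulate the UNION ALL + row_number() merge with SQLite (stdlib only).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE table_1 (customer_id INT, items TEXT, price INT, updated_date INT);
CREATE TABLE table_2 (customer_id INT, items TEXT, price INT, updated_date INT);
INSERT INTO table_1 VALUES (10, 'watch', 1000, 20170626), (11, 'bat', 400, 20170625);
INSERT INTO table_2 VALUES (10, 'computer', 20000, 20170624);
""")

# new_flag = 1 marks increment rows; row_number() ranks the increment row
# first whenever the same customer_id appears in both tables.
merged = conn.execute("""
SELECT customer_id, items, price, updated_date
FROM (
    SELECT customer_id, items, price, updated_date,
           ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY new_flag DESC) AS rn
    FROM (
        SELECT customer_id, items, price, updated_date, 0 AS new_flag FROM table_1
        UNION ALL
        SELECT customer_id, items, price, updated_date, 1 AS new_flag FROM table_2
    ) all_data
) s
WHERE rn = 1
ORDER BY customer_id
""").fetchall()

print(merged)
# [(10, 'computer', 20000, 20170624), (11, 'bat', 400, 20170625)]
# customer 10 is taken from table_2 (the increment); customer 11 survives from table_1
```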

2nd way: using FULL OUTER JOIN

If you cannot UPDATE or MERGE in ACID mode, it is still possible to update using a FULL OUTER JOIN. To find all entries that need updating, join the increment data with the old data:

insert overwrite table target_data [partition() if applicable]
select
    -- select new value if it exists, old value if not
    case when i.PK is not null then i.PK    else t.PK    end as PK,
    case when i.PK is not null then i.COL1  else t.COL1  end as COL1,
    ...
    case when i.PK is not null then i.COL_n else t.COL_n end as COL_n
from
    target_data t -- restrict partitions if applicable
    full join increment_data i on (t.PK = i.PK);

It is possible to optimize this by restricting the target_data partitions that are overwritten and joined, for example with WHERE partition_col in (select distinct partition_col from increment_data), or by passing the partition list as a parameter and using it in the WHERE clause; that runs even faster.
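The per-column CASE WHEN pattern above reduces to "take the increment row if its PK exists, otherwise keep the old row". A minimal Python sketch of that selection logic (dicts keyed by PK stand in for the tables; the names and rows are illustrative, reusing the post's sample data with table_1 as the increment this time):

```python
# target holds the old data, increment holds the new data, both keyed by PK.
target = {
    10: {"items": "computer", "price": 20000, "updated_date": 20170624},
}
increment = {
    10: {"items": "watch", "price": 1000, "updated_date": 20170626},
    11: {"items": "bat", "price": 400, "updated_date": 20170625},
}

# Equivalent of "case when i.PK is not null then i.COLx else t.COLx end"
# applied over the FULL OUTER JOIN: every PK from either side appears once.
result = {pk: (increment[pk] if pk in increment else target[pk])
          for pk in set(target) | set(increment)}

print(sorted(result))           # [10, 11]
print(result[10]["items"])      # 'watch' -- replaced by the increment row
```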
