Windowing Function in Hive

The RANK() analytic function assigns a rank to each row within each partition of the dataset.

The PARTITION BY clause determines how rows are distributed (between reducers, in the case of Hive).

The ORDER BY clause determines how rows are sorted within each partition.

In the first phase (distribute by), all rows in the dataset are distributed into partitions. In map-reduce, each mapper groups rows according to the PARTITION BY and produces files for each partition. The mapper also does an initial sort of its partition parts according to the ORDER BY.

In the second phase, all rows are sorted inside each partition.
In map-reduce, each reducer gets the partition files (parts of partitions) produced by the mappers and sorts the rows of the whole partition (a sort of the partial results) according to the ORDER BY.

In the third phase, the rank function assigns a rank to each row in a partition. The rank function is initialized separately for each partition.

The first row in the partition gets rank 1. Each subsequent row gets a rank equal to its position in the sorted partition, unless it has the same ORDER BY values as the previous row, in which case it gets the same rank as that row. As a result, when two rows share a rank, the rank of the next row is not consecutive.

Different partitions can be processed in parallel on different reducers, and several small partitions can be processed on the same reducer. The rank function re-initializes when it crosses a partition boundary and starts with rank=1 for each partition.
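The three phases can be sketched in plain Python, as an illustration only. This is not how Hive implements them; the function name rank_over and the sample rows (columns a, b, c, with b as the PARTITION BY column and c as the ORDER BY column) are assumptions made for the sketch.

```python
from collections import defaultdict

# Sample rows (a, b, c), deliberately unsorted.
ROWS = [(6, 2, 5), (2, 1, 1), (4, 2, 3), (3, 1, 2), (1, 1, 1), (5, 2, 4)]


def rank_over(rows):
    """Emulate RANK() OVER (PARTITION BY b ORDER BY c); returns {a: rank}."""
    # Phase 1: distribute rows into partitions by b.
    partitions = defaultdict(list)
    for a, b, c in rows:
        partitions[b].append((a, b, c))

    result = {}
    for part in partitions.values():
        # Phase 2: sort the rows inside the partition by c.
        part.sort(key=lambda r: r[2])
        # Phase 3: assign ranks; the counter restarts for every partition.
        rank, prev_c = 0, None
        for position, (a, _, c) in enumerate(part, start=1):
            if position == 1 or c != prev_c:
                rank = position  # jumps past any tied rows before this one
                prev_c = c
            result[a] = rank
    return result


ranks = rank_over(ROWS)
print(ranks)
```

Rows a=1 and a=2 tie on c and share rank 1, so row a=3 gets rank 3, and the second partition starts again at 1.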

Example (rows are already partitioned and sorted inside partitions):

SELECT a, RANK() OVER(partition by b order by c) as d from xyz; 

a, b, c, d(rank)
----------------
1 1 1 1 --starts with 1
2 1 1 1 --the same c value, the same rank=1
3 1 2 3 --rank 2 is skipped because second row shares the same rank as first

4 2 3 1 --New partition starts with 1
5 2 4 2
6 2 5 3

If you need consecutive ranks, use the dense_rank function. dense_rank will produce rank=2 for the third row in the above dataset.
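A minimal Python sketch of the difference, assuming the ORDER BY values of one partition are already sorted (the helper names rank and dense_rank are hypothetical and only mirror the SQL functions):

```python
def rank(sorted_values):
    """RANK(): tied values share a rank and the next rank skips ahead."""
    out = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            out.append(out[-1])   # same value as previous row: same rank
        else:
            out.append(i + 1)     # position in the sorted partition
    return out


def dense_rank(sorted_values):
    """DENSE_RANK(): tied values share a rank and no rank is skipped."""
    out = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            out.append(out[-1])
        else:
            out.append(out[-1] + 1 if out else 1)
    return out


c_values = [1, 1, 2]  # column c of the first partition in the example
print(rank(c_values), dense_rank(c_values))
```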

The row_number function assigns a position number to each row in the partition, starting with 1. Rows with equal values receive different consecutive numbers.

SELECT a, ROW_NUMBER() OVER(partition by b order by c) as d from xyz; 

a, b, c, d(row_number)
----------------
1 1 1 1 --starts with 1
2 1 1 2 --the same c value, row number=2
3 1 2 3 --row position=3

4 2 3 1 --New partition starts with 1
5 2 4 2
6 2 5 3

Important note: for rows with the same ORDER BY values, row_number and other such analytic functions may behave non-deterministically and produce different numbers from run to run. The first row in the above dataset may receive number 2 and the second row number 1, or vice versa, because their relative order is undefined unless you add one more column, a, to the ORDER BY clause. In that case every row gets the same row_number on every run, because no two rows have the same ORDER BY values.
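The effect of a tie-breaker can be illustrated with a small Python sketch. It assumes that rows of one partition may reach the sort in a different order on each run (modelled here by two differently ordered input lists); the row_number helper is hypothetical.

```python
def row_number(rows, key):
    """Emulate ROW_NUMBER() over one partition; returns {a: number}."""
    ordered = sorted(rows, key=key)  # ties keep their arrival order
    return {row[0]: n for n, row in enumerate(ordered, start=1)}


# The same rows (a, b, c) arriving in a different order on two runs.
run1 = [(1, 1, 1), (2, 1, 1), (3, 1, 2)]
run2 = [(2, 1, 1), (1, 1, 1), (3, 1, 2)]

# ORDER BY c: rows a=1 and a=2 tie, so the numbers depend on arrival order.
by_c = (row_number(run1, key=lambda r: r[2]),
        row_number(run2, key=lambda r: r[2]))

# ORDER BY c, a: the tie is broken, so both runs agree.
by_c_a = (row_number(run1, key=lambda r: (r[2], r[0])),
          row_number(run2, key=lambda r: (r[2], r[0])))

print(by_c[0] == by_c[1], by_c_a[0] == by_c_a[1])
```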

SQL (Hive): using window functions while aggregating with GROUP BY

SELECT DISTINCT
id,
first_value(a.location) OVER (PARTITION BY id ORDER BY updated_at DESC) AS latest_location,
first_value(a.direction) OVER (PARTITION BY id ORDER BY updated_at DESC) AS latest_direction,
count(*) OVER (PARTITION BY id) as total
FROM tmp a

In your original query, max() was effectively a dummy aggregate, because all the rows in a group have the same value, and GROUP BY was essentially doing what DISTINCT does here.
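To see why DISTINCT is enough, here is a rough Python emulation of the query, assuming tmp has the columns id, location, direction and updated_at (the sample values and the latest_per_id helper are made up for illustration). Every row of one id carries the same window results, so collapsing duplicates leaves one row per id.

```python
def latest_per_id(rows):
    """rows: (id, location, direction, updated_at) tuples."""
    by_id = {}
    for row in rows:
        by_id.setdefault(row[0], []).append(row)

    result = set()  # the set plays the role of SELECT DISTINCT
    for id_, part in by_id.items():
        # first_value(...) OVER (PARTITION BY id ORDER BY updated_at DESC)
        newest = max(part, key=lambda r: r[3])
        for _ in part:
            # Each row of the partition yields the same output tuple.
            result.add((id_, newest[1], newest[2], len(part)))
    return sorted(result)


rows = [
    (1, "NY", "N", "2020-01-01"),
    (1, "LA", "W", "2020-03-01"),
    (2, "SF", "S", "2020-02-01"),
]
summary = latest_per_id(rows)
print(summary)
```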

In a Hive window, what happens if the value of CURRENT ROW is smaller than that of UNBOUNDED PRECEDING?

It works as designed and according to the standard; other databases behave the same way.

It is easier to find the specification for Hive and for other databases such as Oracle than to find a free copy of the standard document. For example, see "Windowing Specifications in HQL" and Oracle's "Window Function Frame Specification".

First the partition is ordered, then the bounds are calculated, and the frame between the bounds is used. The frame is taken according to the ORDER BY direction, so it is not always value >= bound1 and value <= bound2.

For ORDER BY ... DESC, Bound1 >= Row.value >= Bound2. The frame includes rows from the partition start through the current row, including all peers of the current row (rows equal to the current row according to the ORDER BY clause).

For ORDER BY ... ASC, Bound1 <= Row.value <= Bound2.

UNBOUNDED PRECEDING:

The bound (bound1) is the first row of the partition (according to the order).

CURRENT ROW:

For ROWS, the bound (bound2) is the current row. For RANGE, the bound also covers the peers of the current row (rows with the same ORDER BY value as the current row).

Also read this excellent explanation from Sybase:

The sort order of the ORDER BY values is a critical part of the test
for qualifying rows in a value-based frame; the numeric values alone
do not determine exclusion or inclusion
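The frame rules can be sketched in Python for a running SUM over a single partition with the frame BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. This is only an emulation of the semantics described above, and the running_sum helper and its parameters are assumptions of the sketch.

```python
def running_sum(values, descending=False, mode="RANGE"):
    """Emulate SUM(v) OVER (ORDER BY v [DESC]
    <mode> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""
    ordered = sorted(values, reverse=descending)  # order the partition first
    out = []
    for i, v in enumerate(ordered):
        end = i  # ROWS: the frame ends at the current row itself
        if mode == "RANGE":
            # RANGE: the frame also covers the peers of the current row.
            while end + 1 < len(ordered) and ordered[end + 1] == v:
                end += 1
        out.append((v, sum(ordered[: end + 1])))
    return out


values = [1, 2, 2, 3]
desc_range = running_sum(values, descending=True, mode="RANGE")
desc_rows = running_sum(values, descending=True, mode="ROWS")
asc_range = running_sum(values, descending=False, mode="RANGE")
print(desc_range, desc_rows, asc_range, sep="\n")
```

With DESC ordering the frame starts at the largest value (3) and runs down to the current row, so the current row's value is smaller than the value at UNBOUNDED PRECEDING and the frame is still well defined.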

Use of window function and subquery in Hive

Calculate second_order_jan flag for cust_id and use it for filtering:

select order_date
      ,cust_id
      ,nth_booking
      ,total_bookings
from
( --calculate second_order_jan flag for the cust_id
  select cust_id,
         order_date,
         order_id,
         nth_booking,
         total_bookings,
         max(case when month(order_date) = 1 and nth_booking = 2 then 1 end) over (partition by cust_id) second_order_jan_flag
  from
  (
    SELECT cust_id
          ,order_date
          ,order_id
          ,COUNT(*) OVER (PARTITION BY cust_id ORDER BY order_date) AS nth_booking
          ,COUNT(*) OVER (PARTITION BY cust_id) AS total_bookings
    FROM my.orders
    WHERE order_date BETWEEN '2017-01-01' AND '2017-01-31'
  ) t1
) t2
where second_order_jan_flag = 1
  and nth_booking >= 2 --Keep only the second and later orders.

HIVE - compute statistics over partitions with window based on date

If an approximate interval is acceptable (1 month = 30 days), you can use a range window (an improvement on GMB's answer):

with t as (
    select ID, Date,
           sum(target) target,
           count(target) c_target
    from table
    group by ID, Date
)
select ID, Date,
       sum(target) over(
           partition by ID
           order by unix_timestamp(Date, 'yyyy-MM-dd')
           range 7776000 preceding   -- 90 days in seconds (60 * 60 * 24 * 90)
       ) sum_3,
       sum(c_target) over(
           partition by ID
           order by unix_timestamp(Date, 'yyyy-MM-dd')
           range 7776000 preceding   -- 90 days in seconds
       ) count_3,
       sum(target) over(
           partition by ID
           order by unix_timestamp(Date, 'yyyy-MM-dd')
           range 31104000 preceding  -- 360 days in seconds (60 * 60 * 24 * 360)
       ) sum_12,
       sum(c_target) over(
           partition by ID
           order by unix_timestamp(Date, 'yyyy-MM-dd')
           range 31104000 preceding  -- 360 days in seconds
       ) count_12
from t
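What the value-based range frame computes can be sketched in Python. The sketch assumes one pre-aggregated row per (ID, Date), as produced by the CTE above, and converts dates to epoch seconds in UTC (Hive's unix_timestamp uses the session time zone, so this is a simplification). The sample data and the helper names to_epoch and range_sum are hypothetical.

```python
from datetime import datetime, timezone

DAY = 60 * 60 * 24  # seconds per day


def to_epoch(date_str):
    """Parse 'yyyy-MM-dd' into epoch seconds (UTC assumed)."""
    parsed = datetime.strptime(date_str, "%Y-%m-%d")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


def range_sum(rows, window_seconds):
    """Emulate SUM(target) OVER (PARTITION BY id ORDER BY ts
    RANGE window_seconds PRECEDING) for rows of (id, date, target)."""
    out = []
    for id_, date, _ in rows:
        ts = to_epoch(date)
        total = sum(
            target
            for other_id, other_date, target in rows
            if other_id == id_ and ts - window_seconds <= to_epoch(other_date) <= ts
        )
        out.append((id_, date, total))
    return out


rows = [
    ("x", "2020-01-01", 10),
    ("x", "2020-02-15", 20),
    ("x", "2020-05-01", 30),
    ("y", "2020-05-01", 5),
]
sum_3 = range_sum(rows, 90 * DAY)
print(sum_3)
```

For the row dated 2020-05-01, the row from 2020-02-15 is 76 days earlier and falls inside the 90-day frame, while the row from 2020-01-01 is 121 days earlier and does not.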

Or, if you need exact intervals, you can use self joins (but they are expensive):

with t as (
    select ID, Date,
           sum(target) target,
           count(target) c_target
    from table
    group by ID, Date
)
select
    t_3month.ID,
    t_3month.Date,
    t_3month.sum_3,
    t_3month.count_3,
    sum(t3.target) sum_12,
    sum(t3.c_target) count_12
from (
    select
        t1.ID,
        t1.Date,
        sum(t2.target) sum_3,
        sum(t2.c_target) count_3
    from t t1
    left join t t2
        on t2.Date > t1.Date - interval 3 month and
           t2.Date <= t1.Date and
           t1.ID = t2.ID
    group by t1.ID, t1.Date
) t_3month
left join t t3
    on t3.Date > t_3month.Date - interval 12 month and
       t3.Date <= t_3month.Date and
       t_3month.ID = t3.ID
group by t_3month.ID, t_3month.Date, t_3month.sum_3, t_3month.count_3
order by ID, Date;

Customised ordering in Hive's row_number() over partition by order by window function

Specify the order explicitly using a CASE expression. You can also use other scalar functions in the ORDER BY:

SELECT id, column_b,
       row_number() over(partition by id
                         order by case column_b
                                    when 'A' then '1'
                                    when 'C' then '2'
                                    when 'D' then '3'
                                    when 'B' then '4'
                                    --add more cases
                                    --for example other values sort
                                    --in natural order
                                    else column_b
                                    --or use constant
                                    --to make sure
                                    --everything else is greater than 4
                                    --like this else concat('5',column_b)
                                  end
                        ) as row_id
FROM some_table

You can also calculate the order column in a subquery and use it in the window; it works the same way:

SELECT id, column_b,
       row_number() over(partition by id order by orderby) as row_id
FROM (select t.*,
             case column_b
               when 'A' then '1'
               when 'C' then '2'
               when 'D' then '3'
               when 'B' then '4'
               else concat('5',column_b)
             end orderby
      from some_table t
     ) s
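The same custom ordering can be sketched in Python with a sort key that mirrors the CASE expression. The sample rows and the helper names sort_key and row_ids are made up for the illustration.

```python
# Mirrors: case column_b when 'A' then '1' ... else concat('5', column_b) end
CUSTOM_ORDER = {"A": "1", "C": "2", "D": "3", "B": "4"}


def sort_key(column_b):
    """Listed values sort first in the chosen order, the rest after them."""
    return CUSTOM_ORDER.get(column_b, "5" + column_b)


def row_ids(rows):
    """Emulate row_number() over(partition by id order by <custom key>)
    for rows of (id, column_b); returns {(id, column_b): row_id}."""
    partitions = {}
    for id_, column_b in rows:
        partitions.setdefault(id_, []).append(column_b)
    return {
        (id_, column_b): n
        for id_, values in partitions.items()
        for n, column_b in enumerate(sorted(values, key=sort_key), start=1)
    }


rows = [(1, "B"), (1, "A"), (1, "Z"), (1, "D"), (1, "C"), (2, "E"), (2, "B")]
ids = row_ids(rows)
print(ids)
```

The keys are compared as strings, as in the SQL, so the '5' prefix places every unlisted value after 'B'.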

