Hive - How to Further Optimize a Hiveql Query

Hive - Is there a way to further optimize a HiveQL query?

Filter by airport(inner join) and do aggregation before UNION ALL to reduce dataset passed to the final aggregation reducer. UNION ALL subqueries with joins should run in parallel and faster than join with bigger dataset after UNION ALL.

SELECT f.airport, SUM(cnt) AS Total_Flights
FROM (
SELECT a.airport, COUNT(*) as cnt
FROM flights_stats f
INNER JOIN airports a ON f.Origin=a.iata AND a.country='USA'
WHERE Cancelled = 0 AND Month IN (3,4)
GROUP BY a.airport
UNION ALL
SELECT a.airport, COUNT(*) as cnt
FROM flights_stats f
INNER JOIN airports a ON f.Dest=a.iata AND a.country='USA'
WHERE Cancelled = 0 AND Month IN (3,4)
GROUP BY a.airport
) f
GROUP BY f.airport
ORDER BY Total_Flights DESC
LIMIT 10
;

Tune mapjoins and enable parallel execution:

set hive.exec.parallel=true;
set hive.auto.convert.join=true; --this enables map-join
set hive.mapjoin.smalltable.filesize=25000000; --size of table to fit in memory

Use Tez and vectorizing, tune mappers and reducers parallelism: https://stackoverflow.com/a/48487306/2700344

Hive query optimization

you can use group by -> having count() for this requirement.

select distinct b.id
, b.name,
, b.email
, b.type
from table1 b
where id in
(select distinct id from table1 group by email, id having count(email) > 1)
and b.type=1
order by b.id

Optimizing Hive GROUP BY when rows are sorted

Create a bucketed sorted table. The optimizer will know it sorted from metadata.
See example here (official docs): https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables

Count only interaction = 1: count(case when interaction=1 then 1 end) as clicks - case will mark all rows with 1 or null and count only 1s.

Hive Query logic and Optimization

Just use conditional aggregation:

select id,
max(case when rank = 1 then col1 end) as col1,
max(case when rank = 2 then col1 end) as col2,
max(case when rank = 3 then col1 end) as col3
from t
where t1.rank in (1, 2, 3)
group by id;

The alternative is a multi-way join:

select t1.id, t1.col1, t2.col1 as col2, t3.col1 as col3
from t t1 left join
t t2
on t1.rank = 1 and t2.rank = 2 and t1.id = t2.id left join
t t3
on t1.id = t3.id and t3.rank = 3;

You may need to try both to see which runs faster. It could vary based on your data.

using cluster by to improve hive query performance

CLUSTER BY is basically a shortcut for DISTRIBUTE BY x SORT BY x, so it usually it does not send records to the same mapper as you say, but rather, on the same reducer. Also, usually, it does not speed up the query itself, but it is used to speed up the queries on the tables produced with it.

DISTRIBUTE BY/CLUSTER BY are used to as logical partitioning. While a traditional partition is saved in one directory, you can also partition by file, so for instance, when you DISTRIBUTE/CLUSTER BY in 256 buckets, it will distribute your records by a hash key in 256 files. Of course, this is useful only if your data has a cardinality much bigger than 256 and the key you use to cluster the data is more or less uniformely distributed, so the buckets are roughly the same size. Otherwise bucketing may do more harm than good.

Going back to your question, when is it useful? Similarly as for partitioning, when querying a table created with CLUSTERED BY x, when encountering a query that includes a clause WHERE x = 'myvalue' , the optimizer will recognize that the clause is on the bucketing key, and of the 256 buckets of the previous example, will only open the one where 'myvalue' is, dramatically reducing the amount of I/O (1/256 in this case).

Another case is joins, if you're joining two tables that are bucketed on the same field, the optimizer can do a 'bucket to bucket' join using o(n) mergesort, since the data is already sorted, instead of doing a shuffle join on the two tables, which is generally o(nlogn). On a very large dataset, that may mean hours instead of days.

Hive how to improve my query performance?

set hive.auto.convert.join=true; --this enables map-join
set hive.mapjoin.smalltable.filesize=25000000;
set hive.execution.engine=tez;

insert overwrite table zhihu.answer partition(ym)
select col1, col2 ... coln, ym, --list all columns
from
(
select col1, col2 ... coln, ym, --list all columns
row_number() over(partition by ym, answer_id, insert_time order by new_flag desc) rn
from
(
select col1, col2 ... coln, ym, --list all columns
0 as new_flag
from zhihu.answer t
where t.ym in (select distinct ym from zhihu.answer_increment)

UNION ALL

select col1, col2 ... coln, ym, --list all columns
1 as new_flag
from zhihu.answer_increment t
)s
)s
where s.rn=1;

Indexes were removed in Hive 3.0, more details in this Jira: HIVE-18448

See also this answer: https://stackoverflow.com/a/37744071/2700344

Also tune parallelism for better performance: https://stackoverflow.com/a/48487306/2700344

UPDATE: I studied plans provided by @DennisLi. Some observations:

  1. Join of big table with the whole increment one is performed as a map-join. In this case FULL join approach can be better than UNION ALL+row_number.

  2. join with partition list is already transformed by optimizer to LEFT SEMI JOIN (works as map join also), after filtering there are 4K rows out of total 70M. I recommend to calculate min and max increment ym partition separately and pass them as a parameters using WHERE ym>= ${min_increment_ym} and ym<=${max_increment_ym}
    In this case partition pruning will filter data efficiently without join. But it can be applied only if applicable to the increment dataset (increment contains single small range of partitions and we can use min and max efficiently) Implementing this will give you the maximum benefit

  3. Intermediate compression is not enabled. Enabling compression may give you a little, but it worth trying

Recommended approach:

set hive.auto.convert.join=true; --this enables map-join
set hive.mapjoin.smalltable.filesize=25000000;

--check compression influence separately.
--it may give some improvement depending on your data entropy
set hive.exec.compress.intermediate=true;
set mapred.output.compress=true;
set hive.exec.compress.output=true;

insert overwrite table zhihu.answer partition(ym)
select --select increment if exists, old if not exists
case when i.answer_id is not null then i.col1 else t.col1 end as col1,
... --for all columns
case when i.answer_id is not null then i.coln else t.coln end as coln,
--partition is the last one
case when i.answer_id is not null then i.ym else t.ym end as ym
from zhihu.answer t
full join zhihu.answer_increment i
on t.answer_id = i.answer_id
and t.insert_time = i.insert_time
and t.ym=i.ym --check this condition
where t.ym in (select distinct ym from zhihu.answer_increment) --try to implement min and max parameters instead of this if possible (see 2)
--alternatively if you do not want to employ shell, check if you can
--remove the WHERE condition providing ym in the join condition,
--this will allow to get rid of the second join in the plan,
--though partition pruning will work with parameters better
;

Finally the plan will be the most optimal one.

And you still may need to tune parallelism on mappers and reducers based on your execution logs, see previous answer recommendation



Related Topics



Leave a reply



Submit