hive
CREATE TABLE x (a INT);
SELECT * FROM x;
DROP TABLE x;
Location to store table data
file:///user/hive/warehouse, for local mode,
and hdfs://namenode_server/user/hive/warehouse for the other modes
hive-default.xml.template
hive-site.xml
set hive.metastore.warehouse.dir=/user/myname/hive/warehouse;
$HOME/.hiverc file, which will be processed when Hive starts.
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://db1.mydomain.pvt/hive_db?createDatabaseIfNotExist=true</value>
The driver can be placed in the Hive library path,
$HIVE_HOME/lib. Some teams put all such support libraries in their Hadoop lib
directory.
--define key=value --hivevar key=value
Both let you define on the command line custom variables that you can reference
in Hive scripts to customize execution
Hive puts the key-value pair in the hivevar “namespace” to
distinguish these definitions from three other built-in namespaces, hiveconf, system,
and env.
set env:HOME;
hive --define foo=bar
set hivevar:foo=bar2;
create table toss1(i int, ${hivevar:foo} string);
describe toss1;
SELECT * FROM whatsit WHERE i = ${hiveconf:y};
YEAR=2012 hive -e "SELECT * FROM mytable WHERE year = ${env:YEAR}";
hive -e "SELECT * FROM mytable LIMIT 3";
hive -S -e "set" | grep warehouse
hive -f /path/to/file/withqueries.hql
If you are already inside the Hive shell you can use the SOURCE command to execute a
script file.
SELECT xpath(\'<a><b id="foo">b1</b><b id="bar">b2</b></a>\',\'//@id\') FROM src LIMIT 1;
hive -e "LOAD DATA LOCAL INPATH '/tmp/myfile' INTO TABLE src;
.hiverc
ADD JAR /path/to/custom_hive_extensions.jar;
set hive.cli.print.current.db=true;
set hive.exec.mode.local.auto=true;
Shell Execution
Simply
type ! followed by the command and terminate the line with a semicolon (;):
hive> !pwd
Hadoop dfs Commands from Inside Hive
hive> dfs -ls / ;
-- This is the best Hive script evar!!
set hive.cli.print.header=true;
Hadoop and Hive emphasize optimizing disk reading and writing performance
cast(s AS INT)
Collection Data Types: structs, maps, and arrays
struct('John', 'Doe') map('first', 'John','last', 'Doe') array('John', 'Doe')
STRUCTs can mix different types
in Big Data systems, a benefit of sacrificing normal form is higher processing
throughput. Scanning data off hard disks with minimal “head seeks” is essential when
processing terabytes to petabytes of data. Embedding collections in records makes retrieval
faster with minimal seeks. Navigating each foreign key relationship requires
seeking across the disk, with significant performance overhead.
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>);
Text File Encoding of Data Values
^A (“control” A) Separates all fields (columns). Written using the octal code \001 when explicitly
specified in CREATE TABLE statements.
^B Separate the elements in an ARRAY or STRUCT, or the key-value pairs in a MAP.
Written using the octal code \002 when explicitly specified in CREATE TABLE
statements.
^C Separate the key from the corresponding value in MAP key-value pairs. Written using
the octal code \003 when explicitly specified in CREATE TABLE statements.
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Schema on Read
If you don’t specify a database, the default database is used.
CREATE DATABASE financials;
CREATE DATABASE IF NOT EXISTS financials;
CREATE DATABASE financials LOCATION '/my/preferred/directory' COMMENT 'Holds all financial tables';
CREATE DATABASE financials
WITH DBPROPERTIES ('creator' = 'Mark Moneybags', 'date' = '2012-01-02');
DESCRIBE DATABASE financials;
SHOW DATABASES;
SHOW DATABASES LIKE 'h.*';
Hive will create a directory for each database. Tables in that database will be stored in
subdirectories of the database directory. The exception is tables in the default database,
which doesn’t have its own directory.
For script portability, it’s typical to omit the authority, only specifying it when referring
to another distributed filesystem instance
USE financials;
SHOW TABLES;
set hive.cli.print.current.db=true;
DROP DATABASE IF EXISTS financials;
DROP DATABASE IF EXISTS financials CASCADE;
Using the RESTRICT keyword instead of CASCADE is equivalent to the default behavior,
where existing tables must be dropped before dropping the database.
ALTER DATABASE financials SET DBPROPERTIES ('edited-by' = 'Joe Dba');
CREATE TABLE IF NOT EXISTS mydb.employees (
name STRING COMMENT 'Employee name',
salary FLOAT COMMENT 'Employee salary',
subordinates ARRAY<STRING> COMMENT 'Names of subordinates',
deductions MAP<STRING, FLOAT>
COMMENT 'Keys are deductions names, values are percentages',
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
COMMENT 'Home address')
COMMENT 'Description of the table'
TBLPROPERTIES ('creator'='me', 'created_at'='2012-01-02 10:00:00', ...)
LOCATION '/user/hive/warehouse/mydb.db/employees';
CREATE TABLE IF NOT EXISTS mydb.employees2
LIKE mydb.employees;
SHOW TABLES IN mydb;
SHOW TABLES 'empl.*';
DESCRIBE EXTENDED mydb.employees;
DESCRIBE FORMATTED mydb.employees;
If you only want to see the schema for a particular column, append the column to the
table name.
DESCRIBE mydb.employees.salary;
Managed Tables
External Tables
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';
Partitioned, Managed Tables
it’s used for distributing load horizontally, moving data physically closer to its most
frequent users, and other purposes.
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING);
.../employees/country=CA/state=AB
SELECT * FROM employees
WHERE country = 'US' AND state = 'IL';
faster queries: partitioning can dramatically improve
query performance, but only if the partitioning scheme reflects common range
filtering (e.g., by locations, timestamp ranges).
set hive.mapred.mode=strict;
set hive.mapred.mode=nonstrict;
SHOW PARTITIONS employees;
SHOW PARTITIONS employees PARTITION(country='US');
SHOW PARTITIONS employees PARTITION(country='US', state='AK');
DESCRIBE EXTENDED employees;
You must specify a value for each partition column.
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
External Partitioned Tables
CREATE EXTERNAL TABLE IF NOT EXISTS log_messages (
hms INT,
severity STRING,
server STRING,
process_id INT,
message STRING)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
ALTER TABLE log_messages ADD PARTITION(year = 2012, month = 1, day = 2)
LOCATION 'hdfs://master_server/data/log_messages/2012/01/02';
hadoop distcp /data/log_messages/2011/12/02 s3n://ourbucket/logs/2011/12/02
ALTER TABLE log_messages PARTITION(year = 2011, month = 12, day = 2)
SET LOCATION 's3n://ourbucket/logs/2011/01/02';
hadoop fs -rmr /data/log_messages/2011/01/02
DESCRIBE EXTENDED log_messages PARTITION (year=2012, month=1, day=2);
Customizing Table Storage Formats
STORED AS TEXTFILE
SEQUENCEFILE and RCFILE, both of which optimize disk space usage and I/O
bandwidth performance using binary encoding and optional compression.
The record encoding is handled by an input format object: TextInputFormat
The record parsing is handled by a serializer/deserializer: hive.serde2.lazy.LazySimpleSerDe
there is also an output format that Hive uses for writing the
output of queries to files and to the console: hive.ql.io.HiveIgnoreKeyTextOutputFormat
CREATE TABLE kst
PARTITIONED BY (ds string)
ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
WITH SERDEPROPERTIES ('schema.url'='http://schema_provider/kst.avsc')
STORED AS
INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
CLUSTERED BY (exchange, symbol)
SORTED BY (ymd ASC)
INTO 96 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';
ALTER TABLE log_messages RENAME TO logmsgs;
ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year = 2011, month = 1, day = 1) LOCATION '/logs/2011/01/01'
PARTITION (year = 2011, month = 1, day = 2) LOCATION '/logs/2011/01/02'
PARTITION (year = 2011, month = 1, day = 3) LOCATION '/logs/2011/01/03'
ALTER TABLE log_messages PARTITION(year = 2011, month = 12, day = 2)
SET LOCATION 's3n://ourbucket/logs/2011/01/02';
This command does not move the data from the old location, nor does it delete the old
data.
ALTER TABLE log_messages DROP IF EXISTS PARTITION(year = 2011, month = 12, day = 2);
ALTER TABLE log_messages
CHANGE COLUMN hms hours_minutes_seconds INT
COMMENT 'The hours, minutes, and seconds part of the timestamp'
AFTER severity;
we move the column after the severity column. If you want to move the column
to the first position, use FIRST instead of AFTER other_column
ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1)
SET FILEFORMAT SEQUENCEFILE;
Loading Data into Managed Tables
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
If the LOCAL keyword is used, the path is assumed to be in the local filesystem. The data
is copied into the final location. If LOCAL is omitted, the path is assumed to be in the
distributed filesystem. In this case, the data is moved from the path to the final location.
Hive does not verify that the data you are loading matches the schema for the table.
However, it will verify that the file format matches the table definition.
Inserting Data into Tables from Queries
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';
FROM staged_employees se
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'CA')
SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'IL')
SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';
Dynamic Partition Inserts
Hive also supports a dynamic partition feature,
where it can infer the partitions to create based on query parameters.
INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cnty, se.st
FROM staged_employees se;
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state)
SELECT ..., se.cnty, se.st
FROM staged_employees se
WHERE se.cnty = 'US';
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions.pernode=1000;
Creating Tables and Loading Them in One Query
CREATE TABLE ca_employees
AS SELECT name, salary, address
FROM employees
WHERE se.state = 'CA';
Exporting Data
hadoop fs -cp source_path target_path
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name, salary, address
FROM employees
WHERE se.state = 'CA';
hive> ! cat /tmp/payroll/000000_0
FROM staged_employees se
INSERT OVERWRITE DIRECTORY '/tmp/or_employees'
SELECT * WHERE se.cty = 'US' and se.st = 'OR'
INSERT OVERWRITE DIRECTORY '/tmp/ca_employees'
SELECT * WHERE se.cty = 'US' and se.st = 'CA'
INSERT OVERWRITE DIRECTORY '/tmp/il_employees'
SELECT * WHERE se.cty = 'US' and se.st = 'IL'
HiveQL
SELECT name, subordinates[0] FROM employees;
SELECT name, deductions["State Taxes"] FROM employees;
SELECT name, address.city FROM employees;
Specify Columns with Regular Expressions
SELECT symbol, `price.*` FROM stocks;
Computing with Column Values
SELECT upper(name), salary, deductions["Federal Taxes"], round(salary * (1 - deductions["Federal Taxes"])) FROM employees;
Using Functions
Aggregate functions
SELECT count(*), avg(salary) FROM employees;
count(*),
count(expr) Return the number of rows for which the supplied
expression is not NULL.
count(DISTINCT expr[, expr_.])
sum(DISTINCT col)
SET hive.map.aggr=true;
This setting will attempt to do “top-level” aggregation in the map phase, as in this
example.
Table generating functions
take
single columns and expand them to multiple columns or rows.
SELECT explode(subordinates) AS sub FROM employees;
When using table generating
functions, column aliases are required by Hive.
SELECT parse_url_tuple(url, 'HOST', 'PATH', 'QUERY') as (host, path, query)
FROM url_table;
length,reverse, concat, test in(val1, val2, …), concat_ws, substr, upper, lower, trim, regexp_replace(s, regex,
replacement), regexp_extract(subject,
regex_pattern, index), parse_url(url, partname, key), size, cast(<expr> as <type>), from_unixtime, to_date, in_file(s, filename)
LIMIT Clause: LIMIT 2;
Column Aliases
SELECT upper(name), salary, deductions["Federal Taxes"] as fed_taxes,
round(salary * (1 - deductions["Federal Taxes"])) as salary_minus_fed_taxes
FROM employees LIMIT 2;
Nested SELECT Statements
FROM (
> SELECT upper(name), salary, deductions["Federal Taxes"] as fed_taxes,
> round(salary * (1 - deductions["Federal Taxes"])) as salary_minus_fed_taxes
> FROM employees
> ) e
> SELECT e.name, e.salary_minus_fed_taxes
> WHERE e.salary_minus_fed_taxes > 70000;
CASE … WHEN … THEN Statements
SELECT name, salary,
> CASE
> WHEN salary < 50000.0 THEN 'low'
WHEN salary >= 50000.0 AND salary < 70000.0 THEN 'middle'
> WHEN salary >= 70000.0 AND salary < 100000.0 THEN 'high'
> ELSE 'very high'
> END AS bracket FROM employees;
When Hive Can Avoid MapReduce
This even works for WHERE clauses that only filter on partition keys
set hive.exec.mode.local.auto=true;
A LIKE B
A RLIKE B, A REGEXP B
GROUP BY Clauses
The GROUP BY statement is often used in conjunction with aggregate functions to
group the result set by one or more columns and then perform an aggregation over each
group.
SELECT year(ymd), avg(price_close) FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd);
HAVING Clauses
The HAVING clause lets you constrain the groups produced by GROUP BY in a way that
could be expressed with a subquery, using a syntax that’s easier to express.
SELECT year(ymd), avg(price_close) FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd)
> HAVING avg(price_close) > 50.0;
Without the HAVING clause, this query would require a nested SELECT statement:
hive> SELECT s2.year, s2.avg FROM
> (SELECT year(ymd) AS year, avg(price_close) AS avg FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd)) s2
> WHERE s2.avg > 50.0;
Inner JOIN
In an inner JOIN, records are discarded unless join criteria finds matching records in
every table being joined.
SELECT a.ymd, a.price_close, b.price_close
> FROM stocks a JOIN stocks b ON a.ymd = b.ymd
> WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM';
SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL';
SELECT a.ymd, a.price_close, b.price_close , c.price_close
> FROM stocks a JOIN stocks b ON a.ymd = b.ymd
> JOIN stocks c ON a.ymd = c.ymd
> WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM' AND c.symbol = 'GE';
Join Optimizations
When joining three or more tables, if every ON clause uses the same join
key, a single MapReduce job will be used.
Hive also assumes that the last table in the query is the largest. It attempts to buffer the
other tables and then stream the last table through, while performing joins on individual
records. Therefore, you should structure your join queries so the largest table is last.
Hive also provides
a “hint” mechanism to tell the query optimizer which table should be streamed:
SELECT /*+ STREAMTABLE(s) */ s.ymd, s.symbol, s.price_close, d.dividend
FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
WHERE s.symbol = 'AAPL';
LEFT OUTER JOIN
all the records from the lefthand table that match the WHERE clause are
returned. If the righthand table doesn’t have a record that matches the ON criteria,
NULL is used for each column selected from the righthand table.
OUTER JOIN Gotcha
SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s LEFT OUTER JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL'
> AND s.exchange = 'NASDAQ' AND d.exchange = 'NASDAQ';
It
occurs because the JOIN clause is evaluated first, then the results are passed through
the WHERE clause. By the time the WHERE clause is reached, d.exchange is NULL most of the
time, so the “optimization” actually filters out all records except those on the day of
dividend payments.
use nested SELECT statements:
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend FROM
> (SELECT * FROM stocks WHERE symbol = 'AAPL' AND exchange = 'NASDAQ') s
> LEFT OUTER JOIN
> (SELECT * FROM dividends WHERE symbol = 'AAPL' AND exchange = 'NASDAQ') d
> ON s.ymd = d.ymd;
The nested SELECT statement performs the required “push down” to apply the partition
filters before data is joined.
WHERE clauses are evaluated after joins are performed, so WHERE clauses
should use predicates that only filter on column values that won’t be
NULL. Also, contrary to Hive documentation, partition filters don’t work
in ON clauses for OUTER JOINS, although they do work for INNER JOINS!
RIGHT OUTER JOIN
Right-outer joins return all records in the righthand table that match the WHERE clause.
NULL is used for fields of missing records in the lefthand table.
FULL OUTER JOIN
Finally, a full-outer join returns all records from all tables that match the WHERE clause.
NULL is used for fields in missing records in either table.
LEFT SEMI-JOIN
A left semi-join returns records from the lefthand table if records are found in the righthand
table that satisfy the ON predicates. It’s a special, optimized case of the more general
inner join. Most SQL dialects support an IN ... EXISTS construct to do the same thing.
SELECT s.ymd, s.symbol, s.price_close
> FROM stocks s LEFT SEMI JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol;
Note that the SELECT and WHERE clauses can’t reference columns from the righthand
table.
For a given record in the lefthand table, Hive can stop looking for matching records in
the righthand table as soon as any match is found. At that point, the selected columns
from the lefthand table record can be projected.
Cartesian Product JOINs
A Cartesian product is a join where all the tuples in the left side of the join are paired
with all the tuples of the right table.
SELECTS * FROM stocks JOIN dividends;
When the property hive.mapred.mode is
set to strict, Hive prevents users from inadvertently issuing a Cartesian product query
Map-side Joins
set hive.auto.convert.join=true;
SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL';
ORDER BY and SORT BY
The ORDER BY clause is familiar from other SQL dialects. It performs a total ordering of
the query result set. This means that all the data is passed through a single reducer,
which may take an unacceptably long time to execute for larger data sets.
Hive adds an alternative, SORT BY, that orders the data only within each reducer, thereby
performing a local ordering, where each reducer’s output will be sorted. Better performance
is traded for total ordering.
ORDER BY s.ymd ASC, s.symbol DESC;
SORT BY s.ymd ASC, s.symbol DESC;
CREATE TABLE x (a INT);
SELECT * FROM x;
DROP TABLE x;
Location to store table data
file:///user/hive/warehouse, for local mode,
and hdfs://namenode_server/user/hive/warehouse for the other modes
hive-default.xml.template
hive-site.xml
set hive.metastore.warehouse.dir=/user/myname/hive/warehouse;
$HOME/.hiverc file, which will be processed when Hive starts.
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://db1.mydomain.pvt/hive_db?createDatabaseIfNotExist=true</value>
The driver can be placed in the Hive library path,
$HIVE_HOME/lib. Some teams put all such support libraries in their Hadoop lib
directory.
--define key=value --hivevar key=value
Both let you define on the command line custom variables that you can reference
in Hive scripts to customize execution
Hive puts the key-value pair in the hivevar “namespace” to
distinguish these definitions from three other built-in namespaces, hiveconf, system,
and env.
set env:HOME;
hive --define foo=bar
set hivevar:foo=bar2;
create table toss1(i int, ${hivevar:foo} string);
describe toss1;
SELECT * FROM whatsit WHERE i = ${hiveconf:y};
YEAR=2012 hive -e "SELECT * FROM mytable WHERE year = ${env:YEAR}";
hive -e "SELECT * FROM mytable LIMIT 3";
hive -S -e "set" | grep warehouse
hive -f /path/to/file/withqueries.hql
If you are already inside the Hive shell you can use the SOURCE command to execute a
script file.
SELECT xpath(\'<a><b id="foo">b1</b><b id="bar">b2</b></a>\',\'//@id\') FROM src LIMIT 1;
hive -e "LOAD DATA LOCAL INPATH '/tmp/myfile' INTO TABLE src;
.hiverc
ADD JAR /path/to/custom_hive_extensions.jar;
set hive.cli.print.current.db=true;
set hive.exec.mode.local.auto=true;
Shell Execution
Simply
type ! followed by the command and terminate the line with a semicolon (;):
hive> !pwd
Hadoop dfs Commands from Inside Hive
hive> dfs -ls / ;
-- This is the best Hive script evar!!
set hive.cli.print.header=true;
Hadoop and Hive emphasize optimizing disk reading and writing performance
cast(s AS INT)
Collection Data Types: structs, maps, and arrays
struct('John', 'Doe') map('first', 'John','last', 'Doe') array('John', 'Doe')
STRUCTs can mix different types
in Big Data systems, a benefit of sacrificing normal form is higher processing
throughput. Scanning data off hard disks with minimal “head seeks” is essential when
processing terabytes to petabytes of data. Embedding collections in records makes retrieval
faster with minimal seeks. Navigating each foreign key relationship requires
seeking across the disk, with significant performance overhead.
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>);
Text File Encoding of Data Values
^A (“control” A) Separates all fields (columns). Written using the octal code \001 when explicitly
specified in CREATE TABLE statements.
^B Separate the elements in an ARRAY or STRUCT, or the key-value pairs in a MAP.
Written using the octal code \002 when explicitly specified in CREATE TABLE
statements.
^C Separate the key from the corresponding value in MAP key-value pairs. Written using
the octal code \003 when explicitly specified in CREATE TABLE statements.
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Schema on Read
If you don’t specify a database, the default database is used.
CREATE DATABASE financials;
CREATE DATABASE IF NOT EXISTS financials;
CREATE DATABASE financials LOCATION '/my/preferred/directory' COMMENT 'Holds all financial tables';
CREATE DATABASE financials
WITH DBPROPERTIES ('creator' = 'Mark Moneybags', 'date' = '2012-01-02');
DESCRIBE DATABASE financials;
SHOW DATABASES;
SHOW DATABASES LIKE 'h.*';
Hive will create a directory for each database. Tables in that database will be stored in
subdirectories of the database directory. The exception is tables in the default database,
which doesn’t have its own directory.
For script portability, it’s typical to omit the authority, only specifying it when referring
to another distributed filesystem instance
USE financials;
SHOW TABLES;
set hive.cli.print.current.db=true;
DROP DATABASE IF EXISTS financials;
DROP DATABASE IF EXISTS financials CASCADE;
Using the RESTRICT keyword instead of CASCADE is equivalent to the default behavior,
where existing tables must be dropped before dropping the database.
ALTER DATABASE financials SET DBPROPERTIES ('edited-by' = 'Joe Dba');
CREATE TABLE IF NOT EXISTS mydb.employees (
name STRING COMMENT 'Employee name',
salary FLOAT COMMENT 'Employee salary',
subordinates ARRAY<STRING> COMMENT 'Names of subordinates',
deductions MAP<STRING, FLOAT>
COMMENT 'Keys are deductions names, values are percentages',
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
COMMENT 'Home address')
COMMENT 'Description of the table'
TBLPROPERTIES ('creator'='me', 'created_at'='2012-01-02 10:00:00', ...)
LOCATION '/user/hive/warehouse/mydb.db/employees';
CREATE TABLE IF NOT EXISTS mydb.employees2
LIKE mydb.employees;
SHOW TABLES IN mydb;
SHOW TABLES 'empl.*';
DESCRIBE EXTENDED mydb.employees;
DESCRIBE FORMATTED mydb.employees;
If you only want to see the schema for a particular column, append the column to the
table name.
DESCRIBE mydb.employees.salary;
Managed Tables
External Tables
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';
Partitioned, Managed Tables
it’s used for distributing load horizontally, moving data physically closer to its most
frequent users, and other purposes.
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING);
.../employees/country=CA/state=AB
SELECT * FROM employees
WHERE country = 'US' AND state = 'IL';
faster queries: partitioning can dramatically improve
query performance, but only if the partitioning scheme reflects common range
filtering (e.g., by locations, timestamp ranges).
set hive.mapred.mode=strict;
set hive.mapred.mode=nonstrict;
SHOW PARTITIONS employees;
SHOW PARTITIONS employees PARTITION(country='US');
SHOW PARTITIONS employees PARTITION(country='US', state='AK');
DESCRIBE EXTENDED employees;
You must specify a value for each partition column.
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
External Partitioned Tables
CREATE EXTERNAL TABLE IF NOT EXISTS log_messages (
hms INT,
severity STRING,
server STRING,
process_id INT,
message STRING)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
ALTER TABLE log_messages ADD PARTITION(year = 2012, month = 1, day = 2)
LOCATION 'hdfs://master_server/data/log_messages/2012/01/02';
hadoop distcp /data/log_messages/2011/12/02 s3n://ourbucket/logs/2011/12/02
ALTER TABLE log_messages PARTITION(year = 2011, month = 12, day = 2)
SET LOCATION 's3n://ourbucket/logs/2011/01/02';
hadoop fs -rmr /data/log_messages/2011/01/02
DESCRIBE EXTENDED log_messages PARTITION (year=2012, month=1, day=2);
Customizing Table Storage Formats
STORED AS TEXTFILE
SEQUENCEFILE and RCFILE, both of which optimize disk space usage and I/O
bandwidth performance using binary encoding and optional compression.
The record encoding is handled by an input format object: TextInputFormat
The record parsing is handled by a serializer/deserializer: hive.serde2.lazy.LazySimpleSerDe
there is also an output format that Hive uses for writing the
output of queries to files and to the console: hive.ql.io.HiveIgnoreKeyTextOutputFormat
CREATE TABLE kst
PARTITIONED BY (ds string)
ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
WITH SERDEPROPERTIES ('schema.url'='http://schema_provider/kst.avsc')
STORED AS
INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
CLUSTERED BY (exchange, symbol)
SORTED BY (ymd ASC)
INTO 96 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';
ALTER TABLE log_messages RENAME TO logmsgs;
ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year = 2011, month = 1, day = 1) LOCATION '/logs/2011/01/01'
PARTITION (year = 2011, month = 1, day = 2) LOCATION '/logs/2011/01/02'
PARTITION (year = 2011, month = 1, day = 3) LOCATION '/logs/2011/01/03'
ALTER TABLE log_messages PARTITION(year = 2011, month = 12, day = 2)
SET LOCATION 's3n://ourbucket/logs/2011/01/02';
This command does not move the data from the old location, nor does it delete the old
data.
ALTER TABLE log_messages DROP IF EXISTS PARTITION(year = 2011, month = 12, day = 2);
ALTER TABLE log_messages
CHANGE COLUMN hms hours_minutes_seconds INT
COMMENT 'The hours, minutes, and seconds part of the timestamp'
AFTER severity;
we move the column after the severity column. If you want to move the column
to the first position, use FIRST instead of AFTER other_column
ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1)
SET FILEFORMAT SEQUENCEFILE;
Loading Data into Managed Tables
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
If the LOCAL keyword is used, the path is assumed to be in the local filesystem. The data
is copied into the final location. If LOCAL is omitted, the path is assumed to be in the
distributed filesystem. In this case, the data is moved from the path to the final location.
Hive does not verify that the data you are loading matches the schema for the table.
However, it will verify that the file format matches the table definition.
Inserting Data into Tables from Queries
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';
FROM staged_employees se
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'CA')
SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'IL')
SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';
Dynamic Partition Inserts
Hive also supports a dynamic partition feature,
where it can infer the partitions to create based on query parameters.
INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cnty, se.st
FROM staged_employees se;
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state)
SELECT ..., se.cnty, se.st
FROM staged_employees se
WHERE se.cnty = 'US';
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions.pernode=1000;
Creating Tables and Loading Them in One Query
CREATE TABLE ca_employees
AS SELECT name, salary, address
FROM employees
WHERE se.state = 'CA';
Exporting Data
hadoop fs -cp source_path target_path
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name, salary, address
FROM employees
WHERE se.state = 'CA';
hive> ! cat /tmp/payroll/000000_0
FROM staged_employees se
INSERT OVERWRITE DIRECTORY '/tmp/or_employees'
SELECT * WHERE se.cty = 'US' and se.st = 'OR'
INSERT OVERWRITE DIRECTORY '/tmp/ca_employees'
SELECT * WHERE se.cty = 'US' and se.st = 'CA'
INSERT OVERWRITE DIRECTORY '/tmp/il_employees'
SELECT * WHERE se.cty = 'US' and se.st = 'IL'
HiveQL
SELECT name, subordinates[0] FROM employees;
SELECT name, deductions["State Taxes"] FROM employees;
SELECT name, address.city FROM employees;
Specify Columns with Regular Expressions
SELECT symbol, `price.*` FROM stocks;
Computing with Column Values
SELECT upper(name), salary, deductions["Federal Taxes"], round(salary * (1 - deductions["Federal Taxes"])) FROM employees;
Using Functions
Aggregate functions
SELECT count(*), avg(salary) FROM employees;
count(*),
count(expr) Return the number of rows for which the supplied
expression is not NULL.
count(DISTINCT expr[, expr_.])
sum(DISTINCT col)
SET hive.map.aggr=true;
This setting will attempt to do “top-level” aggregation in the map phase, as in this
example.
Table generating functions
take
single columns and expand them to multiple columns or rows.
SELECT explode(subordinates) AS sub FROM employees;
When using table generating
functions, column aliases are required by Hive.
SELECT parse_url_tuple(url, 'HOST', 'PATH', 'QUERY') as (host, path, query)
FROM url_table;
length,reverse, concat, test in(val1, val2, …), concat_ws, substr, upper, lower, trim, regexp_replace(s, regex,
replacement), regexp_extract(subject,
regex_pattern, index), parse_url(url, partname, key), size, cast(<expr> as <type>), from_unixtime, to_date, in_file(s, filename)
LIMIT Clause: LIMIT 2;
Column Aliases
SELECT upper(name), salary, deductions["Federal Taxes"] as fed_taxes,
round(salary * (1 - deductions["Federal Taxes"])) as salary_minus_fed_taxes
FROM employees LIMIT 2;
Nested SELECT Statements
FROM (
> SELECT upper(name), salary, deductions["Federal Taxes"] as fed_taxes,
> round(salary * (1 - deductions["Federal Taxes"])) as salary_minus_fed_taxes
> FROM employees
> ) e
> SELECT e.name, e.salary_minus_fed_taxes
> WHERE e.salary_minus_fed_taxes > 70000;
CASE … WHEN … THEN Statements
SELECT name, salary,
> CASE
> WHEN salary < 50000.0 THEN 'low'
WHEN salary >= 50000.0 AND salary < 70000.0 THEN 'middle'
> WHEN salary >= 70000.0 AND salary < 100000.0 THEN 'high'
> ELSE 'very high'
> END AS bracket FROM employees;
When Hive Can Avoid MapReduce
This even works for WHERE clauses that only filter on partition keys
set hive.exec.mode.local.auto=true;
A LIKE B
A RLIKE B, A REGEXP B
GROUP BY Clauses
The GROUP BY statement is often used in conjunction with aggregate functions to
group the result set by one or more columns and then perform an aggregation over each
group.
SELECT year(ymd), avg(price_close) FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd);
HAVING Clauses
The HAVING clause lets you constrain the groups produced by GROUP BY in a way that
could be expressed with a subquery, using a syntax that’s easier to express.
SELECT year(ymd), avg(price_close) FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd)
> HAVING avg(price_close) > 50.0;
Without the HAVING clause, this query would require a nested SELECT statement:
hive> SELECT s2.year, s2.avg FROM
> (SELECT year(ymd) AS year, avg(price_close) AS avg FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd)) s2
> WHERE s2.avg > 50.0;
Inner JOIN
In an inner JOIN, records are discarded unless join criteria finds matching records in
every table being joined.
SELECT a.ymd, a.price_close, b.price_close
> FROM stocks a JOIN stocks b ON a.ymd = b.ymd
> WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM';
SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL';
SELECT a.ymd, a.price_close, b.price_close , c.price_close
> FROM stocks a JOIN stocks b ON a.ymd = b.ymd
> JOIN stocks c ON a.ymd = c.ymd
> WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM' AND c.symbol = 'GE';
Join Optimizations
When joining three or more tables, if every ON clause uses the same join
key, a single MapReduce job will be used.
Hive also assumes that the last table in the query is the largest. It attempts to buffer the
other tables and then stream the last table through, while performing joins on individual
records. Therefore, you should structure your join queries so the largest table is last.
Hive also provides
a “hint” mechanism to tell the query optimizer which table should be streamed:
SELECT /*+ STREAMTABLE(s) */ s.ymd, s.symbol, s.price_close, d.dividend
FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
WHERE s.symbol = 'AAPL';
LEFT OUTER JOIN
all the records from the lefthand table that match the WHERE clause are
returned. If the righthand table doesn’t have a record that matches the ON criteria,
NULL is used for each column selected from the righthand table.
OUTER JOIN Gotcha
SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s LEFT OUTER JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL'
> AND s.exchange = 'NASDAQ' AND d.exchange = 'NASDAQ';
It
occurs because the JOIN clause is evaluated first, then the results are passed through
the WHERE clause. By the time the WHERE clause is reached, d.exchange is NULL most of the
time, so the “optimization” actually filters out all records except those on the day of
dividend payments.
use nested SELECT statements:
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend FROM
> (SELECT * FROM stocks WHERE symbol = 'AAPL' AND exchange = 'NASDAQ') s
> LEFT OUTER JOIN
> (SELECT * FROM dividends WHERE symbol = 'AAPL' AND exchange = 'NASDAQ') d
> ON s.ymd = d.ymd;
The nested SELECT statement performs the required “push down” to apply the partition
filters before data is joined.
WHERE clauses are evaluated after joins are performed, so WHERE clauses
should use predicates that only filter on column values that won’t be
NULL. Also, contrary to Hive documentation, partition filters don’t work
in ON clauses for OUTER JOINS, although they do work for INNER JOINS!
RIGHT OUTER JOIN
Right-outer joins return all records in the righthand table that match the WHERE clause.
NULL is used for fields of missing records in the lefthand table.
FULL OUTER JOIN
Finally, a full-outer join returns all records from all tables that match the WHERE clause.
NULL is used for fields in missing records in either table.
LEFT SEMI-JOIN
A left semi-join returns records from the lefthand table if records are found in the righthand
table that satisfy the ON predicates. It’s a special, optimized case of the more general
inner join. Most SQL dialects support an IN ... EXISTS construct to do the same thing.
SELECT s.ymd, s.symbol, s.price_close
> FROM stocks s LEFT SEMI JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol;
Note that the SELECT and WHERE clauses can’t reference columns from the righthand
table.
For a given record in the lefthand table, Hive can stop looking for matching records in
the righthand table as soon as any match is found. At that point, the selected columns
from the lefthand table record can be projected.
Cartesian Product JOINs
A Cartesian product is a join where all the tuples in the left side of the join are paired
with all the tuples of the right table.
SELECTS * FROM stocks JOIN dividends;
When the property hive.mapred.mode is
set to strict, Hive prevents users from inadvertently issuing a Cartesian product query
Map-side Joins
set hive.auto.convert.join=true;
SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL';
ORDER BY and SORT BY
The ORDER BY clause is familiar from other SQL dialects. It performs a total ordering of
the query result set. This means that all the data is passed through a single reducer,
which may take an unacceptably long time to execute for larger data sets.
Hive adds an alternative, SORT BY, that orders the data only within each reducer, thereby
performing a local ordering, where each reducer’s output will be sorted. Better performance
is traded for total ordering.
ORDER BY s.ymd ASC, s.symbol DESC;
SORT BY s.ymd ASC, s.symbol DESC;
No comments:
Post a Comment