Overview
Developed by Facebook. Hive is a SQL-like framework for data warehousing on top of MapReduce over HDFS; HiveQL is its query language. Hive converts a SQL query into a series of jobs for execution on a Hadoop cluster.
Organizes HDFS data into tables, attaching structure.
Schema on Read versus Schema on Write - Hive doesn't verify the data when it is loaded, but rather when a query is issued.
Full-table scans are the norm, and a table update is achieved by transforming the data into a new table.
HDFS does not provide in-place file updates - changes from CUD operations are stored in small delta files, periodically merged into base table files by background MapReduce jobs.
Hive indexes - speed up queries. 2 index types: compact and bitmap. Pluggable. Compact indexes store the HDFS block numbers of each value, rather than each file offset. Bitmap indexes use compressed bitsets to efficiently store the rows that a particular value appears in.
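A minimal sketch of creating each index type (table, column, and index names are hypothetical; note that indexing was removed in Hive 3.0):
CREATE INDEX idx_compact ON TABLE tableX (age)
AS 'COMPACT' WITH DEFERRED REBUILD;
CREATE INDEX idx_bitmap ON TABLE tableX (age)
AS 'BITMAP' WITH DEFERRED REBUILD;
ALTER INDEX idx_compact ON tableX REBUILD;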
Metadata (e.g. table schema, views) - stored in the metastore DB.
An Example
Create a table with 3 typed columns; each row in the data file is tab-delimited text.
CREATE TABLE tableX (age STRING, price INT, category INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
Populate Hive with the data.
LOAD DATA LOCAL INPATH 'path/to/blah.txt'
OVERWRITE INTO TABLE tableX;
OVERWRITE keyword in the LOAD DATA statement - deletes any existing files in the table's directory, instead of appending.
Now that the data is in Hive, we can run a query against it:
SELECT age, MAX(price) FROM tableX
WHERE price != 1000 AND category IN (0, 1, 4)
GROUP BY age;
The Hive Shell
Launch the Hive shell:
% hive
hive>
Commands are terminated with a semicolon:
SHOW TABLES;
Noninteractive mode: -f runs a script:
% hive -f script.q
-e option - specify commands inline:
% echo 'X' > /tmp/blah.txt
% hive -e "CREATE TABLE blah (value STRING); \
LOAD DATA LOCAL INPATH '/tmp/blah.txt' \
OVERWRITE INTO TABLE blah"
Suppress messages using the -S option:
% hive -S -e 'SELECT * FROM blah'
Run commands on the host OS using a ! prefix to the command. Access HDFS using the dfs command.
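For example (paths are illustrative):
hive> !ls /tmp;
hive> dfs -ls /user/hive/warehouse;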
Hive Data Types
Primitive: BOOLEAN TINYINT SMALLINT INT BIGINT FLOAT DOUBLE DECIMAL STRING VARCHAR CHAR BINARY TIMESTAMP DATE
Complex: ARRAY STRUCT MAP UNION
Complex types support nesting; angled bracket notation:
CREATE TABLE complex (
x1 ARRAY<INT>,
x2 MAP<STRING, INT>,
x3 STRUCT<j:STRING, k:INT, l:DOUBLE>,
x4 UNIONTYPE<STRING, INT>
);
hive> SELECT x1[0], x2['b'], x3.j, x4 FROM complex;
Operators and Functions
Retrieve a list of functions: SHOW FUNCTIONS;
Get brief usage instructions for a particular function: DESCRIBE FUNCTION <function_name>;
Conversions - perform explicit type conversion using CAST.
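A minimal sketch (a failed cast, e.g. CAST('X' AS INT), returns NULL):
hive> SELECT CAST('1' AS INT) FROM tableX;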
Tables
Composed of stored data (typically on HDFS; can be S3 or the local FS) + associated schema metadata (in the metastore RDBMS).
Multiple database/schema support - CREATE DATABASE dbname, USE dbname, DROP DATABASE dbname.
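A sketch of the database commands (the name analytics is hypothetical):
hive> CREATE DATABASE analytics;
hive> USE analytics;
hive> DROP DATABASE analytics;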
Creating Tables and Loading Data
CREATE TABLE tableX (blah STRING);
LOAD DATA INPATH '/user/joshr/data.txt' INTO TABLE tableX;
Tables are stored as directories under Hive's warehouse directory: hive.metastore.warehouse.dir defaults to hdfs:///user/hive/warehouse.
Loaded local files are placed in the warehouse directory. fs.defaultFS defaults to file:/// - the local filesystem.
The load is very fast but does not validate against the schema - mismatches are only checked at query time: a query returns NULL for a missing field.
Alternative: external tables, stored outside the warehouse directory:
CREATE EXTERNAL TABLE tableX (colX STRING)
LOCATION '/user/joshr/tableX';
LOAD DATA INPATH '/user/joshr/data.txt' INTO TABLE tableX;
Can create the data lazily after creating the table. DROP only deletes the metadata.
LOAD DATA - import data into a Hive table (or partition) by copying or moving files to the table's directory.
Populate a table with data from another Hive table using INSERT, or at creation time using CREATE TABLE...AS SELECT.
To import data from a relational database directly into Hive, use Sqoop.
Altering Tables
Hive's flexible schema-on-read approach permits a table's definition to change after the table has been created.
Rename a table using ALTER TABLE: this moves the underlying table directory so that it reflects the new name.
ALTER TABLE source RENAME TO target;
Can change the definition for columns, add new columns, or replace all existing columns:
ALTER TABLE target ADD COLUMNS (colA STRING);
The new column colA is added after the existing (nonpartition) columns. The datafiles are not updated, so queries will return null for all values of colA.
Hive does not permit updating existing data - the underlying files must be updated by another mechanism → more efficient to create a new table that defines the new columns and populates them using a SELECT statement.
Changing a column's metadata, such as its name or data type, is more straightforward, assuming that the old data type can be interpreted as the new data type.
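For example, a sketch renaming the colA column added above while keeping its STRING type (the new name colB is hypothetical):
ALTER TABLE target CHANGE colA colB STRING;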
LIKE - create a new, empty table with the same schema as another table:
CREATE TABLE tableY LIKE tableX;
Dropping Tables
DROP TABLE - deletes data + metadata. For external tables, only the metadata is deleted.
TRUNCATE TABLE - deletes all the data in a table but keeps the table definition (doesn't work for external tables - use dfs -rmr from the Hive shell):
TRUNCATE TABLE tableX;
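For an external table, remove the data manually, e.g. for the external tableX created earlier (on newer Hadoop the command is dfs -rm -r):
hive> dfs -rmr /user/joshr/tableX;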
Partitions and Buckets
Partitions subdivide a table by the values of partition columns (and can be further subdivided into buckets). A table may be partitioned in multiple dimensions - enables efficient queries that restrict to particular partition values. Partitions are defined at table creation time using the PARTITIONED BY clause:
CREATE TABLE tableX (timestamp BIGINT, line STRING)
PARTITIONED BY (datetime STRING, category STRING);
When loading data into a partitioned table, the partition values are specified explicitly:
LOAD DATA LOCAL INPATH 'path/to/file'
INTO TABLE tableX
PARTITION (datetime='2015-01-01', category='A');
At the filesystem level, partitions are nested subdirectories of the table directory:
/user/hive/warehouse/tableX
├── datetime=2015-01-01/
│   ├── category=A/
│   │   ├── file1
│   │   └── file2
│   └── category=B/
│       └── file3
└── datetime=2015-01-02/
    ├── category=A/
    │   └── file4
    └── category=B/
        ├── file5
        └── file6
SHOW PARTITIONS:
SHOW PARTITIONS tableX;
datetime=2015-01-01/category=A
datetime=2015-01-01/category=B
datetime=2015-01-02/category=A
datetime=2015-01-02/category=B
Use partition columns in SELECT statements - Hive performs input pruning to scan only the relevant partitions, enabling more efficient queries.
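For example, using the partitioned tableX above, Hive scans only the datetime=2015-01-01/category=A directory:
SELECT line FROM tableX
WHERE datetime='2015-01-01' AND category='A';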
Bucketing imposes extra structure on a table to support map-side joins and efficient sampling queries over a subset of the dataset. Specify the columns to bucket on and the number of buckets:
CREATE TABLE tableX (id INT, name STRING)
CLUSTERED BY (id) INTO 5 BUCKETS;
Physically, each bucket is just a file in the table (or partition) directory.
Map-side join: if two tables are bucketed in the same way, a join need only retrieve the relevant bucket. Sorting the data within a bucket makes the join an efficient merge sort.
CREATE TABLE tableX (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 5 BUCKETS;
Sample a table using the TABLESAMPLE clause:
SELECT * FROM tableX
TABLESAMPLE(BUCKET 1 OUT OF 5 ON id);
Storage Formats
File format - dictates the container format for fields in a row.
Row format - how rows, and the fields in a row, are stored, defined by a SerDe (Serializer-Deserializer).
When querying / inserting into a table, a SerDe will deserialize / serialize a row of data between file bytes and Hive internal objects.
The simplest format is a plain-text file; row-oriented and column-oriented binary formats have advantages over this.
Delimited plain-text files (default)
For a table created with no ROW FORMAT or STORED AS clauses, the default format is delimited text with one row per line.
Default delimiters:
- field - ^A (Control-A), not the tab char
- collection item - ^B - delimits items in an ARRAY or STRUCT, or key-value pairs in a MAP
- map key - ^C - delimits the key from the value within a MAP entry
- row - newline char
For an array of arrays, the delimiters for the outer array are ^B chars, but for the inner array they are ^C chars.
To check which delimiters are used for a particular nested structure:
CREATE TABLE tableX AS
SELECT array(array(1, 2), array(3, 4)) FROM tableY;
and then use hexdump on the output file.
Hive supports 8 levels of delimiters, corresponding to ASCII codes 1-8:
CREATE TABLE ...
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
The octal form of the delimiter characters can be used: \001 for ^A.
Internally, Hive uses a SerDe called LazySimpleSerDe for the delimited format, plus line-oriented MapReduce text input and output formats - not compact, but simple.
Binary storage formats
Set with the STORED AS clause in the CREATE TABLE statement - ROW FORMAT is not specified, since the format is controlled by the underlying binary file format. Native support for SEQUENCEFILE / AVRO / PARQUET / RCFILE / ORC.
Row-oriented formats (Sequence, Avro):
SET hive.exec.compress.output=true;
SET avro.output.codec=snappy;
CREATE TABLE tableX STORED AS AVRO;
Column-oriented formats (Parquet, RCFile, ORCFile):
CREATE TABLE tableX STORED AS PARQUET
AS SELECT * FROM tableY;
Storage handlers
Used for storage systems that Hive cannot access natively, such as HBase. Specified using a STORED BY clause, instead of the ROW FORMAT and STORED AS clauses.
Inserts
Add new rows in bulk to an existing table by using an INSERT statement to add new data files to a table:
INSERT OVERWRITE TABLE target
SELECT colA, colB FROM source;
For partitioned tables:
INSERT OVERWRITE TABLE target
PARTITION (datetime='2015-01-01')
SELECT colA, colB FROM source;
INSERT OVERWRITE replaces the existing contents; to append rows instead, use INSERT INTO TABLE.
Specify the partition dynamically: a dynamic partition insert.
INSERT OVERWRITE TABLE target
PARTITION (datetime)
SELECT colA, colB, datetime FROM source;
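Dynamic partition inserts may need to be enabled first; a sketch using the standard properties (defaults vary by Hive version):
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;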
Multitable insert - invert the INSERT statement with the FROM clause as the outer clause:
FROM source
INSERT OVERWRITE TABLE tableX
SELECT colA, colB
INSERT OVERWRITE TABLE tableY
SELECT colC, colD;
Possible to have multiple INSERT clauses in the same query - more efficient than multiple INSERT statements, because the source table needs to be scanned only once to produce multiple disjoint outputs.
CREATE TABLE...AS SELECT
Convenient for storing the output of a Hive query in a new table --> further processing steps.
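A minimal CTAS sketch (tableZ is a hypothetical name):
CREATE TABLE tableZ
AS
SELECT colA, colB FROM source;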
Sorting and Aggregating
ORDER BY - performs a parallel global sort
SORT BY - produces a sorted file per reducer
DISTRIBUTE BY - controls which reducer a particular row goes to, for some subsequent aggregation
CLUSTER BY - shorthand for specifying both, if the columns for SORT BY & DISTRIBUTE BY are the same:
FROM tableX SELECT age, price
DISTRIBUTE BY age SORT BY age ASC, price DESC;
MapReduce External Scripts
TRANSFORM, MAP, and REDUCE clauses - invoke an external script or program from Hive.
Use a script as follows:
ADD FILE /~/is_good_category.py;  -- register the script with Hive
FROM tableX
SELECT TRANSFORM(age, price, category)
USING 'is_good_category.py' AS age, price;
Registering the script with Hive --> ships the file to the Hadoop cluster (distributed cache).
Input fields are passed to the script as a tab-separated string.
MAP and REDUCE keywords - an alternate nested form for the query:
FROM (
FROM tableX
MAP age, price, category
USING 'is_good_category.py'
AS age, price) map_output
REDUCE age, price
USING 'max_price_reduce.py'
AS age, price;
Joins
SELECT tableX.*, tableY.* FROM tableX JOIN tableY ON (tableX.id = tableY.id);
Hive only supports equijoins - only equality can be used in the join predicate.
Join on multiple columns in the join predicate by specifying a series of expressions, separated by the AND keyword.
Alternative syntax: list the join tables in the FROM clause and specify the join condition in the WHERE clause:
SELECT tableX.*, tableY.* FROM tableX, tableY WHERE tableX.id = tableY.id;
Join multiple tables by supplying additional JOIN...ON... clauses. Query optimizer: a single join is implemented as a single MapReduce job, but multiple joins can be performed in fewer than one MapReduce job per join if the same column is used in the join condition. See how many MapReduce jobs Hive will use by prefixing the query with the EXPLAIN keyword: the query execution plan shows the AST + stage dependency graph. Verbose: EXPLAIN EXTENDED. The rule-based query optimizer has been replaced by a cost-based optimizer.
EXPLAIN SELECT tableX.*, tableY.* FROM tableX JOIN tableY ON (tableX.id = tableY.id);
Outer joins - find nonmatches, returned as NULLs: LEFT / RIGHT / FULL OUTER JOIN:
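e.g. a sketch reusing the earlier tables:
SELECT tableX.*, tableY.* FROM tableX LEFT OUTER JOIN tableY ON (tableX.id = tableY.id);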
Semi joins: alternative syntax for WHERE x IN (subquery). Note: the subquery's columns cannot appear in the parent query's SELECT.
SELECT * FROM tableY WHERE tableY.id IN (SELECT id from tableX);
-- alternative syntax:
SELECT * FROM tableY LEFT SEMI JOIN tableX ON (tableX.id = tableY.id);
Map joins - Hive can load small tables into memory - take advantage of bucketed tables:
SET hive.optimize.bucketmapjoin=true;
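A map join can also be requested with a hint; a sketch (on newer Hive versions, hive.auto.convert.join converts joins with small tables automatically):
SELECT /*+ MAPJOIN(tableY) */ tableX.*, tableY.*
FROM tableX JOIN tableY ON (tableX.id = tableY.id);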
User-Defined Functions
Plug in processing code and invoke it from a Hive query.
UDFs have to be written in Java. For other languages, use SELECT TRANSFORM to stream data through a user-defined MapReduce script.
3 types of UDF in Hive:
- UDF - operates on a single row → single output row
- UDAF - aggregate: operates on multiple input rows → single output row
- UDTF - table-generating: operates on a single row and produces multiple rows - a table:
CREATE TABLE tableX (colA ARRAY<INT>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002';
The explode UDTF emits a row for each entry in an array:
SELECT explode(colA) AS colB FROM tableX;
Configuration
hive-site.xml
Override the configuration directory with the --config option to the hive command:
% hive --config /Users/joshr/dev/hive-conf
Set properties on a per-session basis by passing the -hiveconf option to the hive command:
% hive -hiveconf fs.defaultFS=hdfs://localhost \
-hiveconf mapreduce.framework.name=yarn \
-hiveconf yarn.resourcemanager.address=localhost:8032
Change settings from within a session using the SET command:
hive> SET hive.enforce.bucketing=true;
SET -v - list all the properties in the system.
Need to explicitly enable transactions and table/partition-level locking.
Locks are managed transparently using ZooKeeper - SHOW LOCKS.
Execution engines - Hive was originally written to use MapReduce as its execution engine; it is also possible to run Hive using the Tez and Spark (DAG) engines, which avoid replication overhead by writing intermediate output to memory. The hive.execution.engine property defaults to mr (for MapReduce):
SET hive.execution.engine=spark;
Logging
% hive -hiveconf hive.log.dir='/tmp/${user.name}'
% hive -hiveconf hive.root.logger=DEBUG,console