Hadoop Definitive Guide - Relearn[9] Pig & Hive

Datetime: 2016-08-22 21:52:16          Topic: Hive

CH 16 Pig

Pig is made up of two parts:

  • Pig Latin, the language;
  • execution environment

Pig is a scripting language for exploring large datasets. One criticism of MapReduce is that its development cycle is very long.

It was created at Yahoo! to make it easier for researchers and engineers to mine the huge datasets there.

In fact, MapReduce development is not so quick.

The MapReduce development cycle consists of:

  • writing the mappers and reducers,
  • compiling and packaging the code,
  • submitting the job(s),
  • and retrieving the results.

All of this takes time, even with Streaming; Pig's much shorter scripting cycle avoids it.

local mode

pig -x local

MapReduce mode

In MapReduce mode, Pig translates queries into MapReduce jobs and runs them on a Hadoop cluster.

To use MapReduce mode, you first need to check that the version of Pig you downloaded is compatible with the version of Hadoop you are using.

Pig honors the HADOOP_HOME environment variable for finding which Hadoop client to run.

Next, you need to point Pig at the cluster's namenode and resource manager.

Alternatively, you can set the properties in the pig.properties file in Pig's conf directory.
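As a minimal sketch, the relevant entries in pig.properties might look like this (hostnames and ports are placeholders for your own cluster):

fs.defaultFS=hdfs://localhost/
mapreduce.framework.name=yarn
yarn.resourcemanager.address=localhost:8032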

Running Pig Programs

This section mainly covers script mode.

Script: Pig can run a script file that contains Pig commands. For example, pig script.pig runs the commands in the local file script.pig. Alternatively, for very short scripts, you can use the -e option to run a script specified as a string on the command line.

An Example

-- max_temp.pig : Finds the maximum temperature by year

records = LOAD 'input/blah/input.txt' AS (year:chararray, temperature:int, quality: int);

filtered_records = FILTER records BY temperature != 9999 AND quality in (0, 1, 4, 5, 9);

grouped_records = GROUP filtered_records BY year;

max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);

DUMP max_temp;

Comparisons with Databases

It might seem that Pig Latin is similar to SQL. However, there are several differences between the two languages, and between Pig and relational database management systems (RDBMSs) in general.

The most significant difference is that Pig Latin is a data flow programming language, whereas SQL is a declarative programming language. A Pig Latin program is a step-by-step set of operations on an input relation, in which each step is a single transformation.

By contrast, SQL statements are a set of constraints that, taken together, define the output.

If you work with Pig, the following parts of the Pig tutorial are worth reading.

CH 17 Hive

Strength: Hive was created to make it possible for analysts with strong SQL skills (but meager Java programming skills) to run queries on the huge volumes of data that Facebook stored in HDFS.

Of course, SQL isn't ideal for every big data problem, such as machine learning algorithms, but it's great for many analyses, and it has the huge advantage of being very well known in the industry.

Hive shell

hive -f script.q

hive -e 'SELECT * FROM dummy'

An Example

Let's see how to store tables in HDFS.

Just like an RDBMS, Hive organizes its data into tables. We create a table to hold the weather data using the CREATE TABLE statement.

CREATE TABLE records (year STRING, temperature INT, quality INT)
ROW FORMAT DELIMITED
     FIELDS TERMINATED BY '\t';

The first line declares a records table with three columns; the ROW FORMAT clause says that each row is tab-delimited text.

LOAD DATA LOCAL INPATH 'input/blah/input-sample.txt'
OVERWRITE INTO TABLE records;

In this case there is only one file at the load path, input-sample.txt. The OVERWRITE keyword in the LOAD DATA statement tells Hive to delete any existing files in the directory for the table. If it is omitted, the new files are simply added to the table's directory (unless they have the same names, in which case they replace the old files).

The HiveQL query to find the maximum temperature by year:

SELECT year, MAX(temperature)
FROM records
WHERE temperature != 9999 AND quality IN (0, 1, 4, 5, 9)
GROUP BY year;

Running Hive

hive --config /conf-file-directory

Hive also permits you to set properties on a per-session basis by passing the -hiveconf option:

hive -hiveconf fs.defaultFS=hdfs://localhost \
     -hiveconf mapreduce.framework.name=yarn \
     -hiveconf yarn.resourcemanager.address=localhost:8032

You can change settings from within a session, too, using the SET command.

SET hive.enforce.bucketing=true;

SET hive.execution.engine=tez;

Comparison with Traditional Databases

SCHEMA on read versus on write

SCHEMA on write:

In a traditional database, a table's schema is enforced at data load time. If the data being loaded doesn't conform to the schema, it is rejected. This design is sometimes called schema on write because the data is checked against the schema when it is written into the database.

SCHEMA on read:

Hive, on the other hand, doesn’t verify the data when it is loaded, but rather when a query is issued. This is called schema on read.

Schema on write makes query time performance faster because the database can index columns and perform compression on the data.

The trade-off, however, is that it takes longer to load data into the database.

SQL-on-Hadoop Alternatives

In the years since Hive was created, many other SQL-on-Hadoop engines have emerged to address some of Hive's limitations.

  • Cloudera Impala

Cloudera Impala was one of the first, giving an order-of-magnitude performance boost compared to Hive running on MapReduce.

Impala uses a dedicated daemon that runs on each datanode in the cluster, which acts as a coordinator node for the query.

Impala uses the Hive metastore and supports Hive formats and most HiveQL constructs (plus SQL-92), so in practice it is straightforward to migrate between the two systems, or to run both on the same cluster.

Other prominent open source Hive alternatives:

  • Presto from Facebook
  • Apache Drill
  • Spark SQL

  • Apache Phoenix takes a different approach entirely: it provides SQL on HBase.

HiveQL

HiveQL is a mixture of SQL-92, MySQL, and Oracle’s SQL dialect.

Hive documentation:

http://bit.ly/languagemannual

(This link may be invalid after a few years.)

And, for me, it's a waste of time to list Hive's built-in functions here.

Tables

Managed Tables and External Tables

When you create a table in Hive, by default Hive will manage the data, which means that Hive moves the data into its warehouse directory.

Alternatively, you may create an external table, which tells Hive to refer to the data that is at an existing location outside the warehouse directory.

The difference between the two kinds of tables shows up in the LOAD and DROP semantics.

CREATE TABLE managed_table (dummy STRING);

LOAD DATA INPATH '/user/mike/test/data.txt' INTO TABLE managed_table;

will move the file into Hive's warehouse directory for the table, i.e. hdfs://user/hive/warehouse/managed_table.

If the table is later dropped, using

DROP TABLE managed_table;

the table, including its metadata and its data, is deleted.

An external table behaves differently. You control the creation and deletion of the data.

CREATE EXTERNAL TABLE external_table (dummy STRING)
LOCATION '/user/mike/test/external/external_table';

LOAD DATA INPATH '/user/mike/test/data.txt' INTO TABLE external_table;

When you drop this table, Hive leaves the data untouched and deletes only the metadata.

Partitions and Buckets

Hive organizes tables into partitions — a way of dividing a table into coarse-grained parts based on the value of a partition column, such as a date.

Using partitions can make it faster to do queries on slices of the data.

Tables or partitions may be subdivided further into buckets to give extra structure to the data, which may be used for more efficient queries.

  • Partitions

CREATE TABLE logs (ts BIGINT, line STRING)

PARTITIONED BY (dt STRING, country STRING);

We can ask Hive for the partitions in a table using SHOW PARTITIONS

SHOW PARTITIONS logs;
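For illustration, loading a file into a particular partition and then querying by the partition columns might look like this (the input path is a placeholder):

LOAD DATA LOCAL INPATH 'input/blah/file1.txt'
INTO TABLE logs
PARTITION (dt='2001-01-01', country='GB');

SELECT ts, dt, line
FROM logs
WHERE country='GB';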

  • Buckets

There are two reasons why you might want to use buckets.

The first is to enable more efficient queries. Bucketing imposes extra structure on the table, which Hive can take advantage of when performing certain queries. In particular, a join of two tables that are bucketed on the same columns — which include the join columns — can be efficiently implemented as a map-side join.

The second reason to bucket a table is to make sampling more efficient.

CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;

The data within a bucket may additionally be sorted by one or more columns. This allows even more efficient map-side joins, since the join of each bucket becomes an efficient merge sort.

CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC)  INTO 4 BUCKETS;

To populate the bucketed table, we need to set the property hive.enforce.bucketing to true, so that Hive knows to create the number of buckets declared in the table definition.
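As a minimal sketch, assuming an existing unbucketed users table with matching columns:

SET hive.enforce.bucketing=true;

INSERT OVERWRITE TABLE bucketed_users
SELECT * FROM users;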

The first bucket contains the users with IDs 0, 4, 8, and so on, because the bucket is determined by hashing the id value modulo the number of buckets (4), and for an INT column the hash is the integer value itself.

SELECT * FROM bucketed_users
TABLESAMPLE (BUCKET 1 OUT OF 4 ON id);

Bucket numbering is 1-based, so this query retrieves all the users from the first of four buckets.

Storage Formats

There are two dimensions that govern table storage in Hive: the row format and the file format.

The row format dictates how rows, and the fields in a particular row, are stored. In Hive parlance, the row format is defined by a SerDe, a portmanteau of Serializer-Deserializer.

Deserializer: when querying a table, the SerDe deserializes the bytes in the file into objects used internally by Hive to operate on the row.

Serializer: when performing an INSERT or CTAS, the SerDe serializes Hive's internal representation of a row of data into the bytes that are written to the output file.

The file format dictates the container format for fields in a row. The simplest format is a plain-text file, but there are row-oriented and column-oriented binary formats available, too.

The default storage format: Delimited text.

When you create a table with no ROW FORMAT or STORED AS clauses, the default format is delimited text with one row per line.

The default field delimiter is not a tab, but the Ctrl-A character (ASCII code 1, ^A).

The default collection item delimiter is a Ctrl-B character, used to delimit items in an ARRAY or STRUCT, or key-value pairs in a MAP.

The default map key delimiter is a Ctrl-C character, used to delimit the key and value in a MAP.

Rows in a table are delimited by a newline character.

CREATE TABLE ...
ROW FORMAT DELIMITED
     FIELDS TERMINATED BY '\001'
     COLLECTION ITEMS TERMINATED BY '\002'
     MAP KEYS TERMINATED BY '\003'
     LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

Internally, Hive uses a SerDe called LazySimpleSerDe for this delimited format, along with the line-oriented MapReduce text input and output formats.

Binary storage formats: sequence files, Avro datafiles, Parquet files, RCFiles, and ORCFiles.

Using a binary format is as simple as changing the STORED AS clause in the CREATE TABLE statement. In this case, the ROW FORMAT is not specified, since the format is controlled by the underlying binary file format.
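For example, a sketch of creating a Parquet copy of a table, assuming an existing users table:

CREATE TABLE users_parquet STORED AS PARQUET
AS
SELECT * FROM users;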

  • RegexSerDe

Let’s see how to use a custom SerDe for loading data. We’ll use a contrib SerDe that uses a regular expression for reading the fixed-width station metadata from a text file:

CREATE TABLE stations (usaf STRING, wban STRING, name STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\\d{6}) (\\d{5}) (.{29}) .*"
);

In previous examples, we have used the DELIMITED keyword to refer to delimited text in the ROW FORMAT clause. In this example, we instead specify a SerDe with the SERDE keyword and the fully qualified classname of the Java class that implements the SerDe, org.apache.hadoop.hive.contrib.serde2.RegexSerDe.
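A hypothetical load and query against this table (the input path is a placeholder; the file is assumed to be fixed-width text matching the regex):

LOAD DATA LOCAL INPATH 'input/blah/stations-fixed-width.txt'
INTO TABLE stations;

SELECT usaf, wban, name
FROM stations
LIMIT 4;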

Import Data

We’ve already seen how to use the LOAD DATA operation to import data into a Hive table (or partition) by copying or moving files to the table’s directory. You can also populate a table with data from another Hive table using an INSERT statement, or at creation time using the CTAS construct, which is an abbreviation used to refer to CREATE TABLE...AS SELECT.

Here’s an example of an INSERT statement:

INSERT OVERWRITE TABLE target
SELECT col1, col2
  FROM source;

For partitioned tables, you can specify the partition to insert into by supplying a PARTITION clause:

INSERT OVERWRITE TABLE target
PARTITION (dt='2001-01-01')
SELECT col1, col2
  FROM source;

Multitable insert

In HiveQL, you can turn the INSERT statement around and start with the FROM clause for the same effect:

FROM source
INSERT OVERWRITE TABLE target
  SELECT col1, col2;

The reason for this syntax becomes clear when you see that it’s possible to have multiple INSERT clauses in the same query. This so-called multitable insert is more efficient than multiple INSERT statements because the source table needs to be scanned only once to produce the multiple disjoint outputs.

Here’s an example that computes various statistics over the weather dataset:

FROM records2
INSERT OVERWRITE TABLE stations_by_year
  SELECT year, COUNT(DISTINCT station)
  GROUP BY year
INSERT OVERWRITE TABLE records_by_year
  SELECT year, COUNT(1)
  GROUP BY year
INSERT OVERWRITE TABLE good_records_by_year
  SELECT year, COUNT(1)
  WHERE temperature != 9999 AND quality IN (0, 1, 4, 5, 9)
  GROUP BY year;

There is a single source table (records2), but three tables to hold the results from three different queries over the source.

Inserts

  • CREATE TABLE...AS SELECT

It’s often very convenient to store the output of a Hive query in a new table, perhaps because it is too large to be dumped to the console or because there are further processing steps to carry out on the result.

The new table’s column definitions are derived from the columns retrieved by the SELECT clause. In the following query, the target table has two columns named col1 and col2 whose types are the same as the ones in the source table:

CREATE TABLE target
AS
SELECT col1, col2
FROM source;

A CTAS operation is atomic, so if the SELECT query fails for some reason, the table is not created.

Altering Tables

Because Hive uses the schema-on-read approach, it’s flexible in permitting a table’s definition to change after the table has been created. The general caveat, however, is that in many cases, it is up to you to ensure that the data is changed to reflect the new structure.

You can rename a table using the ALTER TABLE statement:

ALTER TABLE source RENAME TO target;

In addition to updating the table metadata, ALTER TABLE moves the underlying table directory so that it reflects the new name. In the current example, /user/hive/warehouse/source is renamed to /user/hive/warehouse/target. (An external table’s underlying directory is not moved; only the metadata is updated.)

Hive allows you to change the definition for columns, add new columns, or even replace all existing columns in a table with a new set.

For example, consider adding a new column:

ALTER TABLE target ADD COLUMNS (col3 STRING);

The new column col3 is added after the existing (nonpartition) columns. The datafiles are not updated, so queries will return null for all values of col3 (unless of course there were extra fields already present in the files). Because Hive does not permit updating existing records, you will need to arrange for the underlying files to be updated by another mechanism. For this reason, it is more common to create a new table that defines new columns and populates them using a SELECT statement.
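A sketch of that pattern, with hypothetical names; the expression for col3 would be whatever derives the new column:

CREATE TABLE target_with_col3
AS
SELECT col1, col2, CAST(NULL AS STRING) AS col3  -- replace with a real expression for col3
FROM target;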

Dropping Tables

The DROP TABLE statement deletes the data and metadata for a table. In the case of external tables, only the metadata is deleted; the data is left untouched.

If you want to delete all the data in a table but keep the table definition, use TRUNCATE TABLE. For example:

TRUNCATE TABLE my_table;

This doesn’t work for external tables; instead, use dfs -rmr (from the Hive shell) to remove the external table directory directly.
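For example, reusing the external table location from earlier (adjust the path to your own):

hive> dfs -rmr /user/mike/test/external/external_table;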

In a similar vein, if you want to create a new, empty table with the same schema as another table, then use the LIKE keyword:

CREATE TABLE new_table LIKE existing_table;

Sorting and Aggregating

Sorting data in Hive can be achieved by using a standard ORDER BY clause. ORDER BY performs a parallel total sort of the input (like that described in Total Sort). When a globally sorted result is not required—and in many cases it isn’t—you can use Hive’s nonstandard extension, SORT BY, instead. SORT BY produces a sorted file per reducer.

In some cases, you want to control which reducer a particular row goes to, typically so you can perform some subsequent aggregation. This is what Hive’s DISTRIBUTE BY clause does. Here’s an example that sorts the weather dataset by year and temperature, in such a way as to ensure that all the rows for a given year end up in the same reducer partition:

hive> FROM records2
    > SELECT year, temperature
    > DISTRIBUTE BY year
    > SORT BY year ASC, temperature DESC;
1949    111
1949    78
1950    22
1950    0
1950    -11

A follow-on query (or a query that nests this query as a subquery; see Subqueries) would be able to use the fact that each year’s temperatures were grouped and sorted (in descending order) in the same file.

If the columns for SORT BY and DISTRIBUTE BY are the same, you can use CLUSTER BY as a shorthand for specifying both.
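For example, distributing and sorting by year in one clause (note this sorts only by year, unlike the earlier query, which also sorted by temperature):

FROM records2
SELECT year, temperature
CLUSTER BY year;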

MapReduce Scripts

Using an approach like Hadoop Streaming, the TRANSFORM, MAP, and REDUCE clauses make it possible to invoke an external script or program from Hive. Suppose we want to use a script to filter out rows that don’t meet some condition, such as the script in Example 17-1, which removes poor-quality readings.

Example 17-1. Python script to filter out poor-quality weather records

#!/usr/bin/env python

import re
import sys

# Read whitespace-separated (year, temperature, quality) records from stdin
# and emit only the good-quality readings as tab-separated (year, temperature).
for line in sys.stdin:
    (year, temp, q) = line.strip().split()
    if temp != "9999" and re.match("[01459]", q):
        print("%s\t%s" % (year, temp))

My own test script (st_test), which passes through only the e-mail addresses containing "staticor":

#!/usr/bin/env python

import sys

# Emit only the lines (e-mail addresses) that contain "staticor".
for line in sys.stdin:
    email = line.strip()
    if "staticor" in email:
        print(email)

We can use the script as follows:

hive> ADD FILE /Users/tom/book-workspace/hadoop-book/ch17-hive/src/main/python/is_good_quality.py;
hive> FROM records2
    > SELECT TRANSFORM(year, temperature, quality)
    > USING 'is_good_quality.py'
    > AS year, temperature;
1950    0
1950    22
1950    -11
1949    111
1949    78

Before running the query, we need to register the script with Hive. This is so Hive knows to ship the file to the Hadoop cluster (see Distributed Cache).

Views

A view is a sort of “virtual table” that is defined by a SELECT statement. Views can be used to present data to users in a way that differs from the way it is actually stored on disk.

In Hive, a view is not materialized to disk when it is created; rather, the view’s SELECT statement is executed when the statement that refers to the view is run. If a view performs extensive transformations on the base tables or is used frequently, you may choose to manually materialize it by creating a new table that stores the contents of the view.

CREATE VIEW valid_records
AS
SELECT *
FROM records2
WHERE temperature != 9999 AND quality IN (0, 1, 4, 5, 9);
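If you did want to materialize it manually, one sketch is a CTAS over the view (the new table name is arbitrary):

CREATE TABLE materialized_valid_records
AS
SELECT * FROM valid_records;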

User-Defined Functions

With UDFs, Hive makes it easy to plug in your own processing code and invoke it from a Hive query.

UDFs have to be written in Java, the language that Hive itself is written in. For other languages, consider using a SELECT TRANSFORM query, which lets you stream data through a user-defined script.

There are three types of UDF in Hive:

  • regular UDFs
  • UDAFs (user-defined aggregate functions)
  • UDTFs (user-defined table-generating functions)
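Whichever type you write, once the UDF is compiled into a JAR it is registered and invoked from HiveQL roughly like this (the JAR path, function name, and class name below are hypothetical):

ADD JAR /path/to/my-hive-udfs.jar;
CREATE TEMPORARY FUNCTION strip AS 'com.example.hive.Strip';

SELECT strip('  bee  ') FROM dummy;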



