Thursday, December 19, 2013

Learning Pig

Pig provides an engine for executing data flows in parallel on Hadoop. It includes a
language, Pig Latin, for expressing these data flows.

-- Load input from the file named mary, and call the single
-- field in the record 'line'.
lines = load 'mary' as (line);
-- TOKENIZE splits the line into a field for each word.
-- flatten will take the collection of records returned by
-- TOKENIZE and produce a separate record for each one, calling the single
-- field in the record word.
words = foreach lines generate flatten(TOKENIZE(line)) as word;
-- Now group them together by each word.
grpd = group words by word;
-- Count them.
cntd = foreach grpd generate group, COUNT(words);
-- Print out the results.
dump cntd;
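To make the dataflow concrete, here is a rough Python sketch of the same tokenize, flatten, group, and count steps. It is an analogy only: str.split() stands in for TOKENIZE (which also splits on a few punctuation characters), and the sample lines are made up for illustration.

```python
from collections import Counter


def word_count(lines):
    """Mimic the Pig Latin script: tokenize, flatten, group, count."""
    # TOKENIZE + flatten: emit one record per word across all lines.
    words = [word for line in lines for word in line.split()]
    # group by word + COUNT: tally how often each word appears.
    return Counter(words)


# Hypothetical input, standing in for the contents of the 'mary' file.
sample = ["mary had a little lamb", "its fleece was white as snow", "mary went"]
counts = word_count(sample)
print(counts.most_common(3))
```

Unlike this sketch, Pig runs each step in parallel across the cluster, which is the point of writing the flow in Pig Latin.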

Pig Latin, a Parallel Dataflow Language
Pig Latin is a dataflow language. This means it allows users to describe how data from
one or more inputs should be read, processed, and then stored to one or more outputs
in parallel.


Group then join in SQL
CREATE TEMP TABLE t1 AS
SELECT customer, sum(purchase) AS total_purchases
FROM transactions
GROUP BY customer;
SELECT t1.customer, total_purchases, zipcode
FROM t1, customer_profile
WHERE t1.customer = customer_profile.customer;

-- Load the transactions file, group it by customer, and sum their total purchases
txns = load 'transactions' as (customer, purchase);
grouped = group txns by customer;
total = foreach grouped generate group, SUM(txns.purchase) as tp;
-- Load the customer_profile file
profile = load 'customer_profile' as (customer, zipcode);
-- join the grouped and summed transactions and customer_profile data
answer = join total by group, profile by customer;
-- Write the results to the screen
dump answer;
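The group-then-join flow can also be traced by hand in a small Python sketch. The function names and sample records below are hypothetical, and the sketch keeps only one copy of the join key, whereas the Pig join output carries the key fields from both inputs.

```python
from collections import defaultdict


def sum_by_customer(txns):
    """group txns by customer, then SUM(txns.purchase)."""
    totals = defaultdict(float)
    for customer, purchase in txns:
        totals[customer] += purchase
    return dict(totals)


def join_profiles(totals, profiles):
    """Inner join of the summed totals with the customer profiles."""
    joined = []
    for customer, zipcode in profiles:
        if customer in totals:  # customers missing on either side are dropped
            joined.append((customer, totals[customer], zipcode))
    return joined


# Hypothetical records standing in for 'transactions' and 'customer_profile'.
txns = [("alice", 10.0), ("bob", 5.0), ("alice", 2.5)]
profiles = [("alice", "94110"), ("carol", "10001")]
answer = join_profiles(sum_by_customer(txns), profiles)
print(answer)
```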

Because the number of records per key in a dataset is rarely evenly
distributed, the data sent to the reducers is often skewed. That is, one reducer may get
10 or more times as much data as the other reducers. Pig has join and order by operators that
handle this case and (in some cases) rebalance the reducers.
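A small Python sketch shows why skew appears under hash partitioning: every record for a key goes to the same reducer, so one heavy key overloads it. The crc32 hash is an assumption used here only because it is stable across runs (Hadoop uses its own partitioner), and the key distribution is invented.

```python
import zlib
from collections import Counter


def reducer_loads(keys, num_reducers):
    """Count how many records each reducer receives under hash partitioning."""
    loads = Counter()
    for key in keys:
        # All records with the same key land on the same reducer.
        reducer = zlib.crc32(key.encode("utf-8")) % num_reducers
        loads[reducer] += 1
    return [loads[r] for r in range(num_reducers)]


# Hypothetical data: one "hot" key with 1,000 records, 100 keys with one record each.
keys = ["hot"] * 1000 + [f"k{i}" for i in range(100)]
loads = reducer_loads(keys, num_reducers=4)
print(loads)
```

Whichever reducer receives the hot key ends up with at least 1,000 of the 1,100 records, no matter how many reducers are added.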

Find the five pages most visited by users between the ages of 18 and 25:
Users = load 'users' as (name, age);
Fltrd = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Jnd = join Fltrd by name, Pages by user;
Grpd = group Jnd by url;
Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
Srtd = order Smmd by clicks desc;
Top5 = limit Srtd 5;
store Top5 into 'top5sites';

Pig Latin use cases tend to fall into three categories: traditional extract-transform-load (ETL) data pipelines, research on raw data, and iterative processing.

In local mode, starting with version 0.7, Pig uses the Hadoop class
LocalJobRunner, which reads from the local filesystem and
executes MapReduce jobs locally.

dividends = load 'NYSE_dividends' as (exchange, symbol, date, dividend);
-- group rows together by stock ticker symbol
grouped = group dividends by symbol;
-- calculate the average dividend per symbol
avg = foreach grouped generate group, AVG(dividends.dividend);
-- store the results to average_dividend
store avg into 'average_dividend';

pig -x local average_dividend.pig
Because Hadoop is a distributed system and usually processes
data in parallel, when it outputs data to a “file” it creates a directory with the file’s
name, and each writer creates a separate part file in that directory.
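When the script is run in local mode, the output directory is on the local filesystem and can be read back with a few lines of Python. This is a sketch under assumptions: the part files are assumed to match the pattern part-* and to hold tab-delimited text (the PigStorage default), and read_output_dir is a hypothetical helper name.

```python
import glob
import os


def read_output_dir(path):
    """Collect the rows from every part file in a Pig/Hadoop output directory."""
    rows = []
    # Sort so the part files are read in a stable order; marker files such
    # as _SUCCESS do not match the pattern and are skipped.
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part, encoding="utf-8") as handle:
            for line in handle:
                rows.append(line.rstrip("\n").split("\t"))
    return rows


if os.path.isdir("average_dividend"):
    for row in read_output_dir("average_dividend"):
        print(row)
```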

The only thing Pig needs to know to run on your cluster is the location of your cluster’s
NameNode and JobTracker. The NameNode is the manager of HDFS, and the JobTracker
coordinates MapReduce jobs. In Hadoop 0.18 and earlier, these locations are
found in your hadoop-site.xml file. In Hadoop 0.20 and later, they are in three separate
files: core-site.xml, hdfs-site.xml, and mapred-site.xml.

PIG_CLASSPATH=hadoop_conf_dir pig_path/bin/pig -e fs -mkdir /user/username
PIG_CLASSPATH=hadoop_conf_dir pig_path/bin/pig -e fs -copyFromLocal NYSE_dividends NYSE_dividends
PIG_CLASSPATH=hadoop_conf_dir pig_path/bin/pig average_dividend.pig

pig average_dividend.pig
pig -e cat average_dividend

export PIG_CLASSPATH=/etc/hadoop/conf.cloudera.yarn
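-e or -execute
Execute a single command in Pig, for example pig -e fs -ls.
-h properties
List the properties that Pig will use if they are set by the user.
-P or -propertyFile
Specify a property file that Pig should read.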

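Properties, including Hadoop properties such as mapred.child.java.opts, can be
passed to Pig on the command line with -D, for example pig -D exectype=local. When placed on the
command line, these property definitions must come before any Pig-specific command-line
options (such as -x local).
Properties can also be set in conf/pig.properties, or you can specify a separate
properties file by using -P.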

Grunt: Pig’s interactive shell.
To enter Grunt in local mode, run pig -x local.
Pig will not start executing the Pig Latin
you enter until it sees either a store or dump.

HDFS Commands in Grunt
copyToLocal hdfsfile localfile
copyFromLocal localfile hdfsfile
sh
This command gives you access to the local shell, just as fs gives you access to HDFS.

Controlling Pig from Grunt
kill jobid
exec [[-param param_name = param_value]] [[-param_file filename]] script
Aliases defined in script are not imported into
Grunt.

run [[-param param_name = param_value]] [[-param_file filename]] script
Execute the Pig Latin script script in the current Grunt shell. Thus all aliases referenced
in script are available to Grunt, and the commands in script are accessible
via the shell history.

Scalar Types
Complex Types
maps, tuples, and bags

Map
A map is a chararray-to-data-element mapping, where the element can be any Pig type.
['name'#'bob', 'age'#55]
Tuple
A tuple is a fixed-length, ordered collection of Pig data elements.
('bob', 55)
Bag
A bag is an unordered collection of tuples.
{('bob', 55), ('sally', 52), ('john', 25)}
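For readers coming from Python, the three complex types map loosely onto familiar structures. This is only an analogy under assumptions: a Python list keeps an order that a Pig bag does not, so the hypothetical same_bag helper compares two bags while ignoring order.

```python
from collections import Counter

# Map: chararray keys to values, like a dict.
person_map = {"name": "bob", "age": 55}

# Tuple: a fixed-length, ordered collection of fields.
person_tuple = ("bob", 55)

# Bag: an unordered collection of tuples (duplicates allowed).
people_bag = [("bob", 55), ("sally", 52), ("john", 25)]


def same_bag(left, right):
    """Compare two bags by content and multiplicity, ignoring order."""
    return Counter(left) == Counter(right)


print(same_bag(people_bag, list(reversed(people_bag))))
```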

Schemas
dividends = load 'NYSE_dividends' as
(exchange:chararray, symbol:chararray, date:chararray, dividend:float);
If no type is declared, the data type is assumed to be bytearray:
dividends = load 'NYSE_dividends' as (exchange, symbol, date, dividend);
as (a:chararray)
as (a:bytearray)
as (a:map[], b:map[int])
as (a:tuple(), b:tuple(x:int, y:int))
as (a:bag{}, b:bag{t:(x:int, y:int)})

Load functions might already know the schema because
it is stored in a metadata repository such as HCatalog, or it might be stored in
the data itself (if, for example, the data is stored in JSON format).

mdata = load 'mydata' using HCatLoader();
daily = load 'NYSE_daily';
calcs = foreach daily generate $7 / 1000, $3 * 100.0, SUBSTRING($0, 0, 1), $6 - $3;

player = load 'baseball' as (name:chararray, team:chararray,
pos:bag{t:(p:chararray)}, bat:map[]);
unintended = foreach player generate (int)bat#'base_on_balls' - (int)bat#'ibbs';

Pig Latin
Each processing step results in a new data set, or relation.
Pig Latin also has field names. They name a field (or
column) in a relation.

Pig Latin supports SQL-style single-line comments (--) and Java-style multiline comments (/* */).
By default, load looks for your data on HDFS in a tab-delimited file
using the default load function PigStorage.

divs = load 'NYSE_dividends' using HBaseStorage();
divs = load 'NYSE_dividends' using PigStorage(',');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);

By default, Pig stores your data on HDFS in a tab-delimited file
using PigStorage:
store processed into '/data/examples/processed';
store processed into 'processed' using HBaseStorage();
store processed into 'processed' using PigStorage(',');

Dump
dump processed;

Relational Operations
Relational operators are the main tools Pig Latin provides to operate on your data. They
allow you to transform it by sorting, grouping, joining, projecting, and filtering.

foreach takes a set of expressions and applies them to every record in the data pipeline; it is Pig’s projection operator.
B = foreach A generate user, id;

*, ..
In addition to using names and positions, you can refer to all fields using * (asterisk),
which produces a tuple that contains all the fields. Beginning in version 0.9, you can
also refer to ranges of fields using .. (two periods). This is particularly useful when you
have many fields and do not want to repeat them all in your foreach command:

middle = foreach prices generate open..close; -- produces open, high, low, close
avg = foreach bball generate bat#'batting_average';
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
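The open..close range projection shown earlier can be mimicked in Python to see which fields it selects. This is a sketch under assumptions: the schema below is an assumed layout for the prices relation, the sample record is invented, and project_range is a hypothetical helper.

```python
def project_range(schema, record, start, end):
    """Return the fields from start through end, inclusive, like start..end."""
    first = schema.index(start)
    last = schema.index(end)
    return record[first:last + 1]


# Assumed field layout and a made-up record, for illustration only.
schema = ["exchange", "symbol", "date", "open", "high", "low", "close", "volume"]
record = ("NYSE", "CA", "2009-12-31", 10.0, 10.5, 9.8, 10.2, 12000)

print(project_range(schema, record, "open", "close"))
```

As in Pig, both ends of the range are included, so open..close covers open, high, low, and close.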
