Pig

Introduction to Pig

The idea behind Pig was to provide an alternative language interface for programming MapReduce, because writing MapReduce jobs directly has several drawbacks:

  • It is conceptually difficult to visualize a problem in terms of MapReduce.

  • You need to know Java, Python, C++, etc. to program MapReduce jobs.

  • Writing mappers and reducers manually can take a long time.

Pig is considered a MapReduce abstraction. The developer writes code in an interpreted dataflow language, which is then converted into a series of MapReduce jobs. The dataflow scripting language used by Pig is called Pig Latin.

Advantages of Pig

  • Much easier to develop Pig scripts than MapReduce Java or Python jobs.

  • Pig Latin lets you use SQL-like syntax to define your map and reduce steps.

  • Data structures are much richer, typically being multivalued and nested.

  • Transformations you can apply to the data are much more powerful.

  • Highly extensible with user-defined functions (UDFs).

Pig Latin

Pig Latin - Structure

  • A Pig Latin program consists of a collection of statements. A statement can be thought of as an operation or a command. Pig Latin statements work with relations. A relation is similar to a table in a relational database.

  • Statements are usually terminated with a semicolon.

  • Statements that have to be terminated with a semicolon can be split across multiple lines for readability.

  • Pig Latin has a list of keywords that cannot be used as identifiers, like operators (LOAD, ILLUSTRATE), commands (cat, ls), expressions (matches, FLATTEN), and functions (DIFF, MAX).

Pig Latin - Loading and Storing

metadata = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray, 
videoRelease:chararray, imdbLink:chararray);

STORE metadata INTO 'store_example' USING PigStorage(';');
-- DUMP metadata;

Pig Latin - Schemas

A relation in Pig may have an associated schema, which gives the fields in the relation names and types. Pig is flexible with schema declaration, which contrasts with traditional SQL databases, where schemas are declared before the data is loaded. The schema is optional.

Type declarations are not mandatory. The types default to bytearray, the most general type, representing a binary string.

We don’t need to specify types for every field in the schema, but we do need to specify every field. We cannot specify the type of a field without specifying the name.

metadata1 = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray, 
videoRelease:chararray, imdbLink:chararray);
DESCRIBE metadata1;

metadata2 = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:chararray, movieTitle:chararray, releaseDate:chararray, 
videoRelease:chararray, imdbLink:chararray);
DESCRIBE metadata2;

metadata3 = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID, movieTitle:chararray, releaseDate:chararray, 
videoRelease:chararray, imdbLink:chararray);
DESCRIBE metadata3;

metadata4 = LOAD 'ml-100k/u.item' USING PigStorage('|');
DESCRIBE metadata4;
metadata4_bis = FOREACH metadata4 GENERATE $0, $1, $2;
DESCRIBE metadata4_bis;
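
When no schema is declared, fields have no names and can only be referenced by position ($0 is the first field). The following is a minimal sketch, assuming the same u.item file, of how positional references might be combined with casts and AS to attach names and types after loading. The relation name metadata5 is made up for illustration.

```pig
metadata4 = LOAD 'ml-100k/u.item' USING PigStorage('|');

-- Cast the untyped (bytearray) fields and name them in the projection.
-- metadata5 is a hypothetical name, not part of the original examples.
metadata5 = FOREACH metadata4 GENERATE
    (int)$0 AS movieID,
    (chararray)$1 AS movieTitle;

DESCRIBE metadata5;
```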

Filtering Data

Once you have some data loaded into a relation, often the next step is to filter it to remove the data that you are not interested in.

By filtering early in the processing pipeline, you minimize the amount of data flowing through the system, which can improve efficiency.

metadata = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray, 
videoRelease:chararray, imdbLink:chararray);

nameLookup = FOREACH metadata GENERATE movieID, movieTitle, 
ToUnixTime(ToDate(releaseDate, 'dd-MMM-yyyy')) AS releaseTime;
-- ToUnixTime converts DateTime to Unix Time Long

DUMP nameLookup;
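
The example above projects and transforms fields with FOREACH ... GENERATE; removing rows is done with the FILTER operator. The following is a minimal sketch, assuming the u.data ratings file used elsewhere in these notes. The relation names fiveStarRatings and extremeRatings are chosen for illustration.

```pig
ratings = LOAD 'ml-100k/u.data'
AS (userID:int, movieID:int, rating:int, ratingTime:int);

-- Keep only the five-star ratings; every other row is dropped.
fiveStarRatings = FILTER ratings BY rating == 5;

-- Conditions can be combined with AND, OR, and NOT.
extremeRatings = FILTER ratings BY (rating <= 1) OR (rating >= 5);

DUMP fiveStarRatings;
```

Placing a FILTER like this directly after the LOAD keeps later GROUP or JOIN steps from processing rows that would be discarded anyway.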

Pig Latin - Types

Pig Latin supports the “classic” simple types, such as int, long, float, double, chararray, boolean, and datetime. Pig Latin also has three complex types for representing nested structures:

  • Tuple: Sequence of fields of any type, e.g. (1,'pomegranate')

  • Bag: Unordered collection of tuples, possibly with duplicates, e.g. {(1,'pomegranate'),(2)}

  • Map: Set of key-value pairs; keys must be character arrays, but values may be any type, e.g. ['a'#'pomegranate']

A Pig relation is a bag of tuples. A Pig relation is similar to a table in a relational database, where the tuples in the bag correspond to the rows in a table. Pig provides the built-in functions TOTUPLE, TOBAG, and TOMAP to turn expressions into tuples, bags, and maps.
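
As a rough illustration of the three complex types, the sketch below builds one of each from the u.item fields using TOTUPLE, TOBAG, and TOMAP. The relation name nested, the field aliases, and the map key 'link' are assumptions made for this example.

```pig
metadata = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray,
videoRelease:chararray, imdbLink:chararray);

-- Build one tuple, one bag, and one map per input row.
nested = FOREACH metadata GENERATE
    TOTUPLE(movieID, movieTitle) AS idAndTitle,   -- tuple of two fields
    TOBAG(releaseDate, videoRelease) AS dates,    -- bag holding two tuples
    TOMAP('link', imdbLink) AS links;             -- map with a chararray key

-- DESCRIBE shows the nested schema of the new relation.
DESCRIBE nested;
```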

Grouping and Joining Data

Since large datasets are usually not normalized, joins are used less frequently in Pig than in SQL. Nevertheless, joins are easier to write in Pig than in MapReduce. Pig provides several operators for grouping and joining data:

  • JOIN

  • GROUP

  • COGROUP

  • CROSS

Grouping and Joining Data: GROUP

The GROUP statement groups the data in a single relation. The result of a GROUP operation is a relation that includes one tuple per group. This tuple contains two fields:

  • The first field is named "group" (do not confuse this with the GROUP operator) and is the same type as the group key.

  • The second field takes the name of the original relation and is of type bag.

-- GROUP example

ratings = LOAD 'ml-100k/u.data' 
AS (userID:int, movieID:int, rating:int, ratingTime:int);
ratingsByMovie = GROUP ratings BY movieID;

DESCRIBE ratingsByMovie;
-- ratingsByMovie:{
--    group: int, 
--    ratings: {
--       (userID: int,movieID: int,rating: int,ratingTime: int)
--    }
-- }

ILLUSTRATE ratingsByMovie;
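
Once a relation is grouped, aggregate functions can be applied to the bag in each group. The following is a minimal sketch, assuming the same u.data file, that groups by user rather than by movie. The names ratingsByUser, userStats, numRatings, and bestRating are made up for illustration.

```pig
ratings = LOAD 'ml-100k/u.data'
AS (userID:int, movieID:int, rating:int, ratingTime:int);

ratingsByUser = GROUP ratings BY userID;

-- "group" holds the key; "ratings" is the bag of grouped tuples.
-- ratings.rating projects the rating field out of that bag.
userStats = FOREACH ratingsByUser GENERATE
    group AS userID,
    COUNT(ratings) AS numRatings,
    MAX(ratings.rating) AS bestRating;

DESCRIBE userStats;
```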

Grouping and Joining Data: JOIN

-- JOIN example

ratings = LOAD 'ml-100k/u.data' 
AS (userID:int, movieID:int, rating:int, ratingTime:int);
metadata = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray, 
videoRelease:chararray, imdbLink:chararray);

ratings_list = FOREACH ratings GENERATE movieID, rating;
nameLookup = FOREACH metadata GENERATE movieID, movieTitle; 
ratings_title = JOIN ratings_list BY movieID, nameLookup BY movieID;

DESCRIBE ratings_title;

-- ratings_title: 
--      {ratings_list::movieID: int, ratings_list::rating: int,
--       nameLookup::movieID: int, nameLookup::movieTitle: chararray
--      }

Use the disambiguate operator (::) to identify field names after a JOIN, since fields from both inputs can share the same name (here, movieID).

ratings_dis = FOREACH ratings_title GENERATE ratings_list::movieID as movieID, 
  ratings_list::rating as rating, nameLookup::movieTitle as movieTitle;

DESCRIBE ratings_dis;

-- ratings_dis: 
--    {movieID: int, rating: int, movieTitle: chararray}
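
COGROUP and CROSS are listed among the operators above but not illustrated. The sketch below, which reuses the relations from the JOIN example, shows one way they might be used. The names titles_ratings, few_titles, few_ratings, and all_pairs are assumptions made for this example.

```pig
ratings = LOAD 'ml-100k/u.data'
AS (userID:int, movieID:int, rating:int, ratingTime:int);
metadata = LOAD 'ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray,
videoRelease:chararray, imdbLink:chararray);

ratings_list = FOREACH ratings GENERATE movieID, rating;
nameLookup = FOREACH metadata GENERATE movieID, movieTitle;

-- COGROUP: one tuple per movieID, holding one bag per input relation.
-- Unlike JOIN, the matching tuples stay nested instead of being flattened.
titles_ratings = COGROUP nameLookup BY movieID, ratings_list BY movieID;
DESCRIBE titles_ratings;

-- CROSS: every tuple of one relation paired with every tuple of the other.
-- The inputs are cut down with LIMIT first, because the output size is
-- (rows in the first relation) x (rows in the second relation).
few_titles = LIMIT nameLookup 3;
few_ratings = LIMIT ratings_list 3;
all_pairs = CROSS few_titles, few_ratings;
DUMP all_pairs;
```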

Sorting Data

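Use the ORDER operator to sort a relation by one or more fields.

-- fiveStarMovies is assumed to be defined elsewhere (it is not built in this section);
-- nameLookup is the relation with the releaseTime field from the Filtering Data example.
fiveStarsWithData = JOIN fiveStarMovies BY movieID, nameLookup BY movieID;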

oldestFiveStarMovies = ORDER fiveStarsWithData BY nameLookup::releaseTime;

The LIMIT statement restricts the number of results, which makes it a quick-and-dirty way to get a sample of a relation.

ratings = LOAD 'ml-100k/u.data' 
AS (userID:int, movieID:int, rating:int, ratingTime:int);
ratings_limited = LIMIT ratings 5;
DUMP ratings_limited;
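
ORDER and LIMIT are often combined to get the top rows of a relation. The following is a self-contained sketch, assuming the same u.data file, that sorts on two fields with explicit directions and then keeps the first five rows. The names ratingsByTime and latestRatings are made up for illustration.

```pig
ratings = LOAD 'ml-100k/u.data'
AS (userID:int, movieID:int, rating:int, ratingTime:int);

-- Most recent ratings first; ties on ratingTime are broken by userID.
ratingsByTime = ORDER ratings BY ratingTime DESC, userID ASC;

-- Applied directly after ORDER, LIMIT keeps the first rows in sort order.
latestRatings = LIMIT ratingsByTime 5;
DUMP latestRatings;
```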

Pig Exercise 1

Get the oldest five-star movies

  1. Create a relation from the u.data dataset with the schema (userID:int, movieID:int, rating:int, ratingTime:int) and name it ratings_data.

  2. Create a relation from the u.item dataset with the schema (movieID:int, movieTitle:chararray, releaseDate:chararray, videoRelease:chararray, imdbLink:chararray) and name it movies_data. Use the pipe character ('|') with PigStorage to parse the file correctly.

  3. Use a FOREACH statement to create a new relation from movies_data and transform the release date to Unix time using: ToUnixTime(ToDate(releaseDate, 'dd-MMM-yyyy')) AS releaseTime

  4. Group ratings_data by movie ID.

  5. Using the relation created in step 4, compute the average rating per movie.

  6. Using the relation created in step 5, filter on the average rating to keep the movies with an average rating > 4.0.

  7. Join the relations created in steps 3 and 6 on movie ID.

  8. Order the relation created in step 7 by release time (created in step 3).

Pig Exercise 2

Find the most popular “bad” movie

  1. Create a relation from the u.data dataset with a schema (column names and data types) and name it ratings_data.

  2. Create a relation from the u.item dataset with a schema (column names and data types) and name it movies_data.

  3. Find all the movies with an average rating of less than 2.0.

  4. Sort them by the total number of ratings (highest number first) to find the “most popular” movie.

  5. COUNT() lets you count the number of items in a bag.

  6. We want the movie title in the final relation, not just the movie ID.

  7. AVG() and COUNT() can be used in the same FOREACH statement.
