Understanding Pig through the code

The following subsections briefly describe the operators and their usage:

Pig's extensibility

In the use case example, REGISTER is one of the three ways to incorporate external custom code in Pig scripts. Let's quickly examine the other two extensibility features in this section to get a better understanding.

  • REGISTER: UDFs provide one avenue to include user code. To use a UDF written in Java, Python, JRuby, or Groovy, we use the REGISTER statement in the Pig script to register the container (a JAR file or a Python script). To register a Python UDF, you also need to explicitly specify the scripting engine that will run the script; Jython is typically used for this.

    In our example, the following line registers the Piggybank JAR:

    REGISTER '/opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/pig/piggybank.jar';
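
    For a Python UDF, a hypothetical script could similarly be registered with the Jython engine (a sketch; the script name and namespace are illustrative and not part of our example):

    REGISTER 'dayextractor_udf.py' USING jython AS pyudfs;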
  • MAPREDUCE: This operator is used to embed MapReduce jobs in Pig scripts. We need to specify the MapReduce container JAR along with the inputs and outputs for the MapReduce program.

    An example is given as follows:

    input = LOAD 'input.txt';
    result = MAPREDUCE 'mapreduceprg.jar' [('other.jar', ...)] STORE input INTO 'inputPath' USING storeFunc LOAD 'outputPath' USING loadFunc AS schema ['params, ... '];

    The preceding statement stores the relation named input into inputPath using storeFunc; the native MapReduce program then reads that data from inputPath. The data produced by executing mapreduceprg.jar is loaded from outputPath into the relation named result using loadFunc, with the specified schema.

  • STREAM: This operator allows data to be sent to an external executable for processing as part of a Pig data processing pipeline. You can intermix relational operations, such as grouping and filtering, with custom or legacy executables. This is especially useful when the executable contains all the custom code and you do not want to change that code or rewrite it in Pig. The external executable receives its input from standard input or a file, and writes its output either to standard output or to a file.

    The syntax for the operator is given as follows:

    alias = STREAM alias [, alias …] THROUGH {`command` | cmd_alias } [AS schema];

    Here, alias is the name of the relation, THROUGH is a keyword, command is the executable along with its arguments (enclosed in backticks), cmd_alias is the alias defined for the command using the DEFINE operator, AS is a keyword, and schema specifies the schema of the output.
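
    As an illustration (a sketch that is not part of our use case script), the July logs could be streamed through the standard Unix cut utility, which reads the tab-delimited tuples on its standard input and writes the selected fields to its standard output:

    -- Keep only the address and timestamp fields (fields 1 and 4 of each tuple).
    jul_addr_dt = STREAM raw_logs_Jul THROUGH `cut -f 1,4` AS (addr:chararray, dt:chararray);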

Operators used in code

The following is an explanation of the operators used in the code:

  • DEFINE: The DEFINE statement is used to assign an alias to an external executable or a UDF. Use this statement if you want a crisp, short name for a function that has a lengthy package name.

    For a STREAM command, DEFINE plays an important role to transfer the executable to the task nodes of the Hadoop cluster. This is accomplished using the SHIP clause of the DEFINE operator. This is not a part of our example and will be illustrated in later chapters.

    In our example, we define the aliases ApacheCommonLogLoader, DayMonExtractor, and DayExtractor for the corresponding fully qualified class names:

    DEFINE ApacheCommonLogLoader org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();
    
    DEFINE DayMonExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('dd/MMM/yyyy:HH:mm:ss Z','dd-MMM');
    
    DEFINE DayExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('dd-MMM','dd');
  • LOAD: This operator loads data from a file or directory. If a directory name is specified, it loads all the files in the directory into the relation. If Pig is run in local mode, it searches for the directories on the local File System; in MapReduce mode, it searches for the files on HDFS. In our example, the usage is as follows:
    raw_logs_Jul = LOAD 'NASA_access_logs/Jul/access_log_Jul95' USING ApacheCommonLogLoader AS (jaddr, jlogname, juser, jdt, jmethod, juri, jproto, jstatus, jbytes);
    
    raw_logs_Aug = LOAD 'NASA_access_logs/Aug/access_log_Aug95' USING ApacheCommonLogLoader AS (aaddr, alogname, auser, adt, amethod, auri, aproto, astatus, abytes);

    The content of the relation raw_logs_Jul is as follows:

    (163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)
    (163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/cgi-bin/imagemap/countdown70?287,288,HTTP/1.0,302,85)
    (109.172.181.143,-,-,02/Jul/1995:00:12:01 -0400,GET,/history/xxx/,HTTP/1.0,200,6245)

    By using globs (such as *.txt, *.csv, and so on), you can read multiple files (all the files or selective files) that are in the same directory. In the following example, the files under the folders Jul and Aug will be loaded as a union.

    raw_logs = LOAD 'NASA_access_logs/{Jul,Aug}' USING ApacheCommonLogLoader AS (addr, logname, user, dt, method, uri, proto, status, bytes);
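
    For comparison, a delimited file with a known layout could be loaded with the built-in PigStorage loader and an explicit schema (a sketch; the file name and fields are hypothetical):

    -- Hypothetical comma-delimited input with explicitly typed fields.
    hits = LOAD 'hits.csv' USING PigStorage(',')
           AS (addr:chararray, dt:chararray, status:int, bytes:long);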
  • STORE: The STORE operator serves a dual purpose: it writes the results into the File System after the data pipeline processing is complete, and it also triggers the execution of the preceding Pig Latin statements. This is an important feature of the language; the logical, physical, and MapReduce plans are created only after the script encounters the STORE operator.

    In our example, the following code demonstrates their usage:

    DUMP limitd;
    STORE limitd INTO 'unique_hits_by_month';
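
    STORE can also take a storage function; for instance, the same relation could be written out as comma-separated values (a sketch; the output path is illustrative):

    STORE limitd INTO 'unique_hits_by_month_csv' USING PigStorage(',');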
  • DUMP: The DUMP operator is similar to the STORE operator, but it is used specifically to display results on the command prompt rather than storing them in the File System as the STORE operator does. DUMP behaves exactly like STORE in that the Pig Latin statements actually begin execution only after the DUMP operator is encountered. This operator is specifically targeted at the interactive execution of statements and viewing the output in real time.

    In our example, the following code demonstrates the usage of the DUMP operator:

    DUMP limitd;
  • UNION: The UNION operator merges the contents of more than one relation without preserving the order of tuples as the relations involved are treated as unordered bags.

    In our example, we use UNION to merge the two relations raw_logs_Jul and raw_logs_Aug into a relation called combined_raw_logs:

    combined_raw_logs = UNION raw_logs_Jul, raw_logs_Aug;

    The content of the relation combined_raw_logs is as follows:

    (163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)
    (163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/cgi-bin/imagemap/countdown70?287,288,HTTP/1.0,302,85)
    (198.4.83.138,-,-,08/Aug/1995:22:25:28 -0400,GET,/shuttle/missions/sts-69/mission-sts-69.html,HTTP/1.0,200,11264)
  • SAMPLE: The SAMPLE operator is useful when you want to work on a very small subset of data to quickly test if the data flow processing is giving you correct results. This statement provides a random data sample picked from the entire population using an arbitrary sample size. The sample size is passed as a parameter. As the SAMPLE operator internally uses a probability-based algorithm, it is not guaranteed to return the same number of rows or tuples every time SAMPLE is used.

    In our example, the SAMPLE operator returns approximately 1 percent of the data as an illustration:

    sample_combined_raw_logs = SAMPLE combined_raw_logs 0.01;

    The content of the relation sample_combined_raw_logs is as follows:

    (163.205.2.43,-,-,17/Jul/1995:13:30:34 -0400,GET,/ksc.html,HTTP/1.0,200,7071)
    (204.97.74.34,-,-,27/Aug/1995:12:07:37 -0400,GET,/shuttle/missions/sts-69/liftoff.html,HTTP/1.0,304,0)
    (128.217.61.98,-,-,21/Aug/1995:08:59:26 -0400,GET,/images/ksclogo-medium.gif,HTTP/1.0,200,5866)
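
    To see how many tuples a particular run of SAMPLE actually kept, the sampled relation can be counted (a sketch using aliases that are not part of the original script):

    -- Count the tuples retained by this run of SAMPLE; the number varies between runs.
    sample_grp = GROUP sample_combined_raw_logs ALL;
    sample_cnt = FOREACH sample_grp GENERATE COUNT(sample_combined_raw_logs);
    DUMP sample_cnt;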
  • GROUP: The GROUP operator is used to group all records with the same value into a bag. This operator creates a nested structure of output tuples.

    The following snippet of code from our example illustrates grouping logs by day of the month.

    jgrpd = GROUP raw_logs_Jul BY DayMonExtractor(jdt);
    DESCRIBE jgrpd;

    Schema content of jgrpd: The following output shows the schema of the relation jgrpd, where we can see that it has created a nested structure with two fields: the key and a bag of the collected records. The key is named group, and the value takes the name of the alias that was grouped, raw_logs_Jul and raw_logs_Aug in this case.

    jgrpd: {group: chararray,raw_logs_Jul: {(jaddr: bytearray,jlogname: bytearray,juser: bytearray,jdt: bytearray,jmethod: bytearray,juri: bytearray,jproto: bytearray,jstatus: bytearray,jbytes: bytearray)}}
    
    agrpd = GROUP raw_logs_Aug BY DayExtractor(adt);
    DESCRIBE agrpd;
    
    agrpd: {group: chararray,raw_logs_Aug: {(aaddr: bytearray,alogname: bytearray,auser: bytearray,adt: bytearray,amethod: bytearray,auri: bytearray,aproto: bytearray,astatus: bytearray,abytes: bytearray)}}
  • FOREACH: The FOREACH operator is also known as a projection. It applies a set of expressions to each record in the bag, similar to applying an expression on every row of a table. The result of this operator is another relation.

    In our example, FOREACH is used for iterating through each grouped record in the group to get the count of distinct IP addresses.

    jcountd = FOREACH jgrpd
    {
      juserIP =  raw_logs_Jul.jaddr;
      juniqIPs = DISTINCT juserIP;
      GENERATE FLATTEN(group) AS jdate, COUNT(juniqIPs) AS jcount;
    }
    
    acountd = FOREACH agrpd
    {
      auserIP =  raw_logs_Aug.aaddr;
      auniqIPs = DISTINCT auserIP;
      GENERATE FLATTEN(group) AS adate,COUNT(auniqIPs) AS acount;
    }

    Contents of the tuples: The following output shows the tuples in the relations jcountd and acountd. The first field is the date in the dd-MMM format and the second field is the count of distinct hits.

     jcountd
    (01-Jul,4230)
    (02-Jul,4774)
    (03-Jul,7264)
    
     acountd
    (01-Aug,2577)
    (03-Aug,2588)
    (04-Aug,4254)
  • DISTINCT: The DISTINCT operator removes duplicate records in a relation. DISTINCT should not be used where you need to preserve the order of the contents.

    The following example code demonstrates the usage of DISTINCT to remove duplicate IP addresses and FLATTEN to remove the nest of jgrpd and agrpd.

    jcountd = FOREACH jgrpd
    {
      juserIP =  raw_logs_Jul.jaddr;
      juniqIPs = DISTINCT juserIP;
      GENERATE FLATTEN(group) AS jdate,COUNT(juniqIPs) AS jcount;
    }
    acountd = FOREACH agrpd
    {
      auserIP =  raw_logs_Aug.aaddr;
      auniqIPs = DISTINCT auserIP;
      GENERATE FLATTEN(group) AS adate,COUNT(auniqIPs) AS acount;
    }
    DESCRIBE jcountd;
    DESCRIBE acountd;

    Schema of the relations: The following output shows the schemas of the relations jcountd and acountd. We can see that the nesting created by GROUP is now removed.

    jcountd: {jdate: chararray,jcount: long}
    acountd: {adate: chararray,acount: long}
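
    DISTINCT can also be applied directly to a relation outside a nested FOREACH; for example, the July IP addresses could be deduplicated as follows (a sketch; the aliases are illustrative):

    -- Hypothetical: keep one tuple per distinct July IP address.
    july_ips = FOREACH raw_logs_Jul GENERATE jaddr;
    uniq_july_ips = DISTINCT july_ips;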
  • JOIN: The JOIN operator joins more than one relation based on shared keys.

    In our example, we join two relations by day of the month; it returns all the records where the day of the month matches. Records for which no match is found are dropped.

    joind = JOIN jcountd BY jdate, acountd BY adate;

    Content of the tuples: The following output shows the resulting values after JOIN is performed. This relation contains all the records where the day of the month matches; records for which no match is found are dropped. For example, as seen in the sample output of FOREACH, jcountd shows 4,774 hits on 02-Jul, while acountd has no record for 02-Aug. Hence, after JOIN, the tuple with the 02-Jul hits is omitted because no matching 02-Aug record exists.

    (01-Jul,4230,01-Aug,2577)
    (03-Jul,7264,03-Aug,2588)
    (04-Jul,5806,04-Aug,4254)
    (05-Jul,7144,05-Aug,2566)
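
    If the unmatched days had to be retained instead of dropped, an outer join could be used (a sketch that is not part of the original script):

    -- Hypothetical: keep every July day even when no matching August day exists.
    joind_outer = JOIN jcountd BY jdate LEFT OUTER, acountd BY adate;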
  • DESCRIBE: The DESCRIBE operator is a diagnostic operator in Pig and is used to view and understand the schema of an alias or a relation. This is a kind of command line log, which enables us to understand how preceding operators in the data pipeline are changing the data. The output of the DESCRIBE operator is the description of the schema.

    In our example, we use DESCRIBE to understand the schema.

    DESCRIBE joind;

    The output is as follows:

    joind: {jcountd::jdate: chararray,jcountd::jcount: long,acountd::adate: chararray,acountd::acount: long}
  • FILTER: The FILTER operator allows you to select or filter out the records from a relation based on a condition. This operator works on tuples or rows of data.

    The following example retains only the records in which both counts are greater than 2,600:

    filterd = FILTER joind BY jcount > 2600 and acount > 2600;

    Content of the filtered tuples: All records in which either count is 2,600 or less are filtered out.

    (04-Jul,5806,04-Aug,4254)
    (07-Jul,6951,07-Aug,4062)
    (08-Jul,3064,08-Aug,4252)
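
    FILTER conditions can also combine several fields; for instance, the raw July logs could be restricted to successful GET requests (a sketch; the alias is illustrative):

    -- Hypothetical: keep only successful GET requests from the July logs.
    july_ok = FILTER raw_logs_Jul BY jmethod == 'GET' AND jstatus == '200';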
  • ILLUSTRATE: The ILLUSTRATE operator is the debugger's best friend; it is used to understand how data passes through the Pig Latin statements and gets transformed. This operator enables us to generate good sample test data that represents the flow of data through the statements, so that we can test our programs on it.

    ILLUSTRATE internally uses an algorithm that takes a small sample of the entire input data and propagates it through all the statements in the Pig Latin script. This algorithm intelligently generates sample data when it encounters operators, such as FILTER, that can remove rows from the data and would otherwise result in no data flowing through the Pig statements.

    In our example, the ILLUSTRATE operator is used as shown in the following code snippet:

    filterd = FILTER joind BY jcount > 2600 and acount > 2600;
    ILLUSTRATE filterd;

    The dataset used by us does not have records where the count is less than 2,600. ILLUSTRATE has therefore manufactured a record with two counts below 2,600 to demonstrate how such values get filtered out. This record fails the FILTER condition and is removed; hence, no values are shown in the relation filterd.

    The following screenshot shows the output:

    [Screenshot: Output of ILLUSTRATE]

  • ORDER BY: The ORDER BY operator is used to sort a relation by the specified sort key. Pig currently supports sorting on fields of simple types only, not on complex types or expressions. In the following example, we sort based on two fields (the July date and the August date):
    srtd = ORDER filterd BY jdate,adate PARALLEL 5;
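
    The sort direction can also be given per field; for example, to see the busiest July days first (a sketch; the alias is illustrative):

    srtd_desc = ORDER filterd BY jcount DESC;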
  • PARALLEL: The PARALLEL clause controls reduce-side parallelism by specifying the number of reducers. It defaults to one when running in local mode. This clause can be used with operators that force a reduce phase, such as ORDER, DISTINCT, LIMIT, JOIN, GROUP, COGROUP, and CROSS.
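
    The reduce-side parallelism can also be set once for a whole script (a sketch; the value 10 is only illustrative):

    -- Hypothetical: use 10 reducers for all subsequent reduce-side operators.
    SET default_parallel 10;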
  • LIMIT: The LIMIT operator is used to set an upper limit on the number of output records generated. The output records are picked arbitrarily, and there is no guarantee that the output will be the same if the LIMIT operator is executed repeatedly. To obtain a particular group of rows, consider using the ORDER operator immediately followed by the LIMIT operator.

    In our example, this operator returns five records as an illustration:

    limitd = LIMIT srtd 5;

    The content of the relation limitd is given as follows:

    (04-Jul,5806,04-Aug,4254)
    (07-Jul,6951,07-Aug,4062)
    (08-Jul,3064,08-Aug,4252)
    (10-Jul,4383,10-Aug,4423)
    (11-Jul,4819,11-Aug,4500)
  • FLATTEN: The FLATTEN operator is used to flatten nested data such as bags and tuples by removing the nesting. Please refer to the example code under DISTINCT for sample output and usage of FLATTEN.

The EXPLAIN operator

A Pig program goes through multiple stages as shown in the next diagram, before being executed in the Hadoop cluster, and the EXPLAIN operator provides the best way to understand what transpires underneath the Pig Latin code. The EXPLAIN operator generates the sequence of plans that go into converting the Pig Latin scripts to a MapReduce JAR.

The output of this operator can be converted into a graphical format by the use of the -dot option to generate graphs of the program. This writes the output to a DOT file containing diagrams explaining the execution of our script.

The syntax for the same is as follows:

pig -x mapreduce -e 'pigexample.pig' -dot -out <filename> or <directoryname>

Next is an example of usage. If we specify a directory name after -out, all three output files (the logical, physical, and MapReduce plan files) will be created in that directory. In the following case, all the files will be created in the pigexample_output directory.

pig -x mapreduce -e 'pigexample.pig' -dot -out pigexample_output
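
EXPLAIN can also be invoked directly on a relation from within a script or the Grunt shell; for example, to print the plans for the limitd relation used earlier (a sketch):

EXPLAIN limitd;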

Follow the given steps to convert the DOT files into an image format:

  1. Install Graphviz on your machine.
  2. Plot a graph written in the DOT language by executing the following command:
    dot -Tpng filename.dot > filename.png

The following diagram shows each step in Pig processing:

[Figure: Pig Latin to Hadoop JAR]

  • The query parser: The parser uses ANother Tool for Language Recognition (ANTLR), a language parser, to verify whether the program is syntactically correct and whether all the variables are properly defined. The parser also checks the schemas for type correctness and generates an intermediate representation, the Abstract Syntax Tree (AST).
  • The logical plan: The intermediate representation, AST, is transformed into a logical plan. This plan is implemented internally as a directed graph with all the operators in the Pig Latin script mapped to the logical operators. The following diagram illustrates this plan:
    [Figure: Logical plan]

  • The logical optimization: The generated logical plan is examined for optimization opportunities, such as filter pushdown, projection pushdown, and column pruning. Which optimizations apply depends on the script. The optimizations are performed, and the plan is then compiled into a series of physical plans.
  • The physical plan: The physical plan is a physical description of the computation that creates a usable pipeline independent of MapReduce. We could use this pipeline and target other processing frameworks such as Dryad. The optimization opportunities at this stage include performing aggregations in memory instead of using combiners. The physical planning stage is also the right place to examine the plan for opportunities to reduce the number of reducers.

    For clarity, each logical operator is shown with an ID. Physical operators that are produced by the translation of a logical operator are shown with the same ID. For the most part, each logical operator becomes a corresponding physical operator. The logical GROUP operator maps into a series of physical operators: local and global rearrange plus package. Rearrange is analogous to the Partitioner class and the Reducer step of MapReduce, where sorting by a key happens.

    The following diagram shows the logical plan translated to a physical plan example:

    [Figure: Logical to physical plan]

  • MapReduce plan: This is the phase where the physical plan is converted into a graph of actual MapReduce jobs with all the inputs and outputs specified. Opportunities for optimization in the MapReduce phase are examined to see if it is possible to combine multiple MapReduce jobs into one job for reducing the data flow between Mappers and Reducers. The idea of decoupling the Logical and Physical plans from the MapReduce plan is to divorce them from the details of running on Hadoop. This level of abstraction is necessary to port the application to a different processing framework like Dryad.

    The following Physical to MapReduce Plan diagram shows the assignment of the physical operators to Hadoop stages for our running example (only the map and reduce stages are shown). In the MapReduce plan, the local rearrange operator annotates tuples with keys and the identifiers of their input streams.

    [Figure: Physical to MapReduce plan]

Understanding Pig's data model

Pig's data model consists of both primitive and complex types. The following sections give a brief overview of these data types:

Primitive types

Pig supports primitive data types such as int, float, long, double, and chararray.

Complex types

The following are the complex data types, formed by combining the primitive data types:

  • Atom: An atom contains a single value and can be composed of any one of the primitive data types.
  • Tuple: A tuple is like a row of a table in an RDBMS, or a sequence of fields, each of which can be any of the data types including primitive and complex types. Tuples are enclosed in parentheses, (). An example is shown as follows:
     (163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)
  • Bag: A bag is analogous to a table: a collection of tuples, which may contain duplicates. Bag is Pig's only spillable data structure, which implies that when the full structure does not fit in memory, it is spilled onto the disk and paged in when necessary. In a bag, the schema of the constituent tuples is flexible and doesn't need to have a consistent number and type of fields. Bags are represented as tuples enclosed in curly braces, {}. An example is shown as follows:
    {(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585),
    (100.305.185.30,-,-,13/AUG/1995:09:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)}
  • Map: This is a key-value data structure, with the key and the value separated by the # character. The schema of the data items in a map is not strictly enforced, giving them the option to take the form of any type. Map is useful for prototyping datasets where schemas may change over time. Maps are enclosed in square brackets, [].
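
    For instance, a map field can be declared in a schema and its values looked up by key (a sketch; the file and field names are hypothetical):

    -- Hypothetical: load tuples containing a map and look up the 'status' key.
    props = LOAD 'props.txt' AS (m:map[]);
    vals = FOREACH props GENERATE m#'status';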

The relevance of schemas

Pig has a special way of handling known and unknown schemas. Schemas exist to give fields their identity by naming them and categorizing them into a data type. Pig has the ability to discover schemas at runtime by making appropriate assumptions about the data types. If a data type is not assigned, Pig defaults the type to bytearray and performs conversions later, based on the context in which that data is used. This feature gives Pig an edge when you want to use it for research purposes to quickly prototype on data with an unknown schema. Notwithstanding these advantages of working with unspecified schemas, it is recommended to specify the schema wherever possible for more efficient parse-time checking and execution. However, there are a few idiosyncrasies in how Pig handles unknown schemas with various operators.
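
For example, when no schema is given, the fields default to bytearray; they can then be referred to positionally and cast where they are used (a sketch; the file and aliases are hypothetical):

-- Hypothetical: no schema specified, so both fields default to bytearray.
raw_counts = LOAD 'counts.txt';
typed_counts = FOREACH raw_counts GENERATE (chararray)$0 AS day, (long)$1 AS hits;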

For the relational operators that perform JOIN, GROUP, UNION, or CROSS, if any one of the relations involved doesn't have a schema specified, then the schema of the resultant relation is null. Similarly, the resulting schema is null when you try to flatten a bag with an unknown schema.

Extending the preceding discussion of how nulls can arise in Pig, there are a few other ways nulls can result from the interaction of specific operators. As a quick illustration, if any subexpression operand of the comparison operators, such as ==, <, >, and MATCHES, is null, then the result is null. The same applies to the arithmetic operators (such as +, -, *, and /) and the CONCAT operator. It is important to remember the subtle differences in how various functions treat nulls: while the AVG, MAX, MIN, SUM, and COUNT functions disregard nulls, the COUNT_STAR function does not ignore them and counts a null as if there were a value.
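
The difference between COUNT and COUNT_STAR can be seen on a grouped relation (a sketch; the aliases are illustrative and not part of the use case script):

-- COUNT skips tuples whose jaddr is null; COUNT_STAR counts every tuple.
all_jul = GROUP raw_logs_Jul ALL;
jul_counts = FOREACH all_jul GENERATE COUNT(raw_logs_Jul.jaddr), COUNT_STAR(raw_logs_Jul);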