- Pig Design Patterns
- Pradeep Pasupuleti
Understanding Pig through the code
The following subsections provide a brief description of the operators and their usage:
Pig's extensibility
In the use case example, the REGISTER function is one of the three ways to incorporate external custom code in Pig scripts. Let's quickly examine the other two Pig extensibility features in this section to get a better understanding.
REGISTER: UDFs provide one avenue to include user code. To use a UDF written in Java, Python, JRuby, or Groovy, we use the REGISTER function in the Pig script to register the container (the JAR or the Python script). To register a Python UDF, you also need to explicitly specify the compiler the Python script will use; this is done with Jython. In our example, the following line registers the Piggybank JAR:
REGISTER '/opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/pig/piggybank.jar';
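To register a Python UDF through Jython, the script and a namespace alias are supplied as well. A minimal sketch is shown below; the file and namespace names are hypothetical and not part of our example.
REGISTER 'my_udfs.py' USING jython AS my_udfs;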
MAPREDUCE: This operator is used to embed MapReduce jobs in Pig scripts. We need to specify the MapReduce container JAR along with the inputs and outputs for the MapReduce program. The syntax is given as follows:
input = LOAD 'input.txt'; result = MAPREDUCE 'mapreduceprg.jar' [('other.jar', ...)] STORE input INTO 'inputPath' USING storeFunc LOAD 'outputPath' USING loadFunc AS schema ['params, ... '];
The previous statement stores the relation named input into inputPath using storeFunc; the native MapReduce program uses storeFunc to read the data. The data produced by executing mapreduceprg.jar is then loaded from outputPath into the relation named result using loadFunc, as per the specified schema.
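As a hypothetical sketch of this syntax (adapted from the word count illustration in the Pig documentation; the JAR, driver class, and directory names are made up and not part of our example), a native MapReduce job could be embedded as follows:
wc_input = LOAD 'wordcount_input.txt';
wc_result = MAPREDUCE 'wordcount.jar' STORE wc_input INTO 'mr_input_dir' LOAD 'mr_output_dir' AS (word:chararray, count:int) `org.myorg.WordCount mr_input_dir mr_output_dir`;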
STREAM: This operator allows data to be sent to an external executable for processing as part of a Pig data processing pipeline. You can intermix relational operations, such as grouping and filtering, with custom or legacy executables. This is especially useful in cases where the executable holds all the custom code and you may not want to change that code and rewrite it in Pig. The external executable receives its input from standard input or a file, and writes its output either to standard output or a file. The syntax for the operator is given as follows:
alias = STREAM alias [, alias …] THROUGH {'command' | cmd_alias } [AS schema];
Here, alias is the name of the relation, THROUGH is the keyword, command is the executable along with its arguments, cmd_alias is the alias defined for the command using the DEFINE operator, AS is a keyword, and schema specifies the schema.
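A minimal usage sketch (the Perl script and the projected fields are hypothetical and not part of our example) is as follows:
DEFINE session_cmd `sessionize.pl`;
sessions = STREAM raw_logs_Jul THROUGH session_cmd AS (jaddr, jdt, juri);
In practice, the SHIP clause of DEFINE (described next) is used to make such a script available on the task nodes.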
Operators used in code
The following is an explanation of the operators used in the code:
DEFINE: The DEFINE statement is used to assign an alias to an external executable or a UDF. Use this statement if you want a crisp name for a function that has a lengthy package name. For a STREAM command, DEFINE plays an important role in transferring the executable to the task nodes of the Hadoop cluster. This is accomplished using the SHIP clause of the DEFINE operator, which is not a part of our example and will be illustrated in later chapters. In our example, we define the aliases ApacheCommonLogLoader, DayMonExtractor, and DayExtractor for the corresponding fully qualified class names:
DEFINE ApacheCommonLogLoader org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();
DEFINE DayMonExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('dd/MMM/yyyy:HH:mm:ss Z','dd-MMM');
DEFINE DayExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('dd-MMM','dd');
LOAD: This operator loads data from a file or directory. If a directory name is specified, it loads all the files in the directory into the relation. If Pig is run in the local mode, it searches for the directories on the local File System; in the MapReduce mode, it searches for the files on HDFS. In our example, the usage is as follows:
raw_logs_Jul = LOAD 'NASA_access_logs/Jul/access_log_Jul95' USING ApacheCommonLogLoader AS (jaddr, jlogname, juser, jdt, jmethod, juri, jproto, jstatus, jbytes);
raw_logs_Aug = LOAD 'NASA_access_logs/Aug/access_log_Aug95' USING ApacheCommonLogLoader AS (aaddr, alogname, auser, adt, amethod, auri, aproto, astatus, abytes);
The content of the relation raw_logs_Jul is as follows:
(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)
(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/cgi-bin/imagemap/countdown70?287,288,HTTP/1.0,302,85)
(109.172.181.143,-,-,02/Jul/1995:00:12:01 -0400,GET,/history/xxx/,HTTP/1.0,200,6245)
By using globs (such as *.txt, *.csv, and so on), you can read multiple files (all the files or selected files) that are in the same directory. In the following example, the files under the folders Jul and Aug will be loaded as a union:
raw_logs = LOAD 'NASA_access_logs/{Jul,Aug}' USING ApacheCommonLogLoader AS (addr, logname, user, dt, method, uri, proto, status, bytes);
STORE: The STORE operator has a dual purpose: one is to write the results into the File System after completion of the data pipeline processing, and the other is to actually commence the execution of the preceding Pig Latin statements. This happens to be an important feature of the language, where the logical, physical, and MapReduce plans are created only after the script encounters the STORE operator. In our example, the following code demonstrates its usage:
DUMP limitd;
STORE limitd INTO 'unique_hits_by_month';
DUMP: The DUMP operator is similar to the STORE operator, but it is used specifically to display results on the command prompt rather than storing them in a File System like the STORE operator does. DUMP behaves in exactly the same way as STORE, in that the Pig Latin statements actually begin execution only after encountering the DUMP operator. This operator is specifically targeted at the interactive execution of statements and viewing the output in real time. In our example, the following code demonstrates the usage of the DUMP operator:
DUMP limitd;
UNION: The UNION operator merges the contents of more than one relation without preserving the order of tuples, as the relations involved are treated as unordered bags. In our example, we use UNION to merge the two relations raw_logs_Jul and raw_logs_Aug into a relation called combined_raw_logs:
combined_raw_logs = UNION raw_logs_Jul, raw_logs_Aug;
The content of the relation combined_raw_logs is as follows:
(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)
(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/cgi-bin/imagemap/countdown70?287,288,HTTP/1.0,302,85)
(198.4.83.138,-,-,08/Aug/1995:22:25:28 -0400,GET,/shuttle/missions/sts-69/mission-sts-69.html,HTTP/1.0,200,11264)
SAMPLE: The SAMPLE operator is useful when you want to work on a very small subset of data to quickly test whether the data flow processing is giving you correct results. This statement provides a random data sample picked from the entire population, with the sample size passed as a parameter. As the SAMPLE operator internally uses a probability-based algorithm, it is not guaranteed to return the same number of rows or tuples every time it is used. In our example, the SAMPLE operator returns, at most, 1 percent of the data as an illustration:
sample_combined_raw_logs = SAMPLE combined_raw_logs 0.01;
The content of the relation sample_combined_raw_logs is as follows:
(163.205.2.43,-,-,17/Jul/1995:13:30:34 -0400,GET,/ksc.html,HTTP/1.0,200,7071)
(204.97.74.34,-,-,27/Aug/1995:12:07:37 -0400,GET,/shuttle/missions/sts-69/liftoff.html,HTTP/1.0,304,0)
(128.217.61.98,-,-,21/Aug/1995:08:59:26 -0400,GET,/images/ksclogo-medium.gif,HTTP/1.0,200,5866)
GROUP: The GROUP operator is used to group all records with the same value into a bag. This operator creates a nested structure of output tuples. The following snippet of code from our example illustrates grouping the logs by day of the month:
jgrpd = GROUP raw_logs_Jul BY DayMonExtractor(jdt);
DESCRIBE jgrpd;
Schema content of jgrpd: The following output shows the schema of the relation jgrpd, where we can see that it has created a nested structure with two fields: the key and a bag of the collected records. The key is named group, and the value takes the name of the alias that was grouped, raw_logs_Jul and raw_logs_Aug in this case.
jgrpd: {group: chararray,raw_logs_Jul: {(jaddr: bytearray,jlogname: bytearray,juser: bytearray,jdt: bytearray,jmethod: bytearray,juri: bytearray,jproto: bytearray,jstatus: bytearray,jbytes: bytearray)}}
agrpd = GROUP raw_logs_Aug BY DayExtractor(adt);
DESCRIBE agrpd;
agrpd: {group: chararray,raw_logs_Aug: {(aaddr: bytearray,alogname: bytearray,auser: bytearray,adt: bytearray,amethod: bytearray,auri: bytearray,aproto: bytearray,astatus: bytearray,abytes: bytearray)}}
FOREACH: The FOREACH operator is also known as a projection. It applies a set of expressions to each record in the bag, similar to applying an expression on every row of a table. The result of this operator is another relation. In our example, FOREACH is used to iterate through each grouped record in the group to get the count of distinct IP addresses:
jcountd = FOREACH jgrpd { juserIP = raw_logs_Jul.jaddr; juniqIPs = DISTINCT juserIP; GENERATE FLATTEN(group) AS jdate, COUNT(juniqIPs) AS jcount; }
acountd = FOREACH agrpd { auserIP = raw_logs_Aug.aaddr; auniqIPs = DISTINCT auserIP; GENERATE FLATTEN(group) AS adate, COUNT(auniqIPs) AS acount; }
Contents of the tuples: The following output shows the tuples in the relations jcountd and acountd. The first field is the date in the DD-MMM format and the second field is the count of distinct hits.
jcountd:
(01-Jul,4230)
(02-Jul,4774)
(03-Jul,7264)
acountd:
(01-Aug,2577)
(03-Aug,2588)
(04-Aug,4254)
DISTINCT: The DISTINCT operator removes duplicate records in a relation. DISTINCT should not be used where you need to preserve the order of the contents. The following example code demonstrates the usage of DISTINCT to remove duplicate IP addresses, and of FLATTEN to remove the nesting in jgrpd and agrpd:
jcountd = FOREACH jgrpd { juserIP = raw_logs_Jul.jaddr; juniqIPs = DISTINCT juserIP; GENERATE FLATTEN(group) AS jdate, COUNT(juniqIPs) AS jcount; }
acountd = FOREACH agrpd { auserIP = raw_logs_Aug.aaddr; auniqIPs = DISTINCT auserIP; GENERATE FLATTEN(group) AS adate, COUNT(auniqIPs) AS acount; }
DESCRIBE jcountd;
DESCRIBE acountd;
Schema of the relations: The following output shows the schema of the relations jcountd and acountd. We can see that the nesting created by GROUP is now removed.
jcountd: {jdate: chararray,jcount: long}
acountd: {adate: chararray,acount: long}
JOIN: The JOIN operator joins more than one relation based on shared keys. In our example, we join the two relations by day of the month; this returns all the records where the day of the month matches. Records for which no match is found are dropped.
joind = JOIN jcountd BY jdate, acountd BY adate;
Content of the tuples: The following output shows the resulting values after the JOIN is performed. This relation contains all the records where the day of the month matches; records for which no match is found are dropped. For example, we saw in the sample output of FOREACH that jcountd shows 4,774 hits on 02-Jul, while acountd has no record for 02-Aug. Hence, after the JOIN, the tuple with the 02-Jul hits is omitted, as no match is found for 02-Aug.
(01-Jul,4230,01-Aug,2577)
(03-Jul,7264,03-Aug,2588)
(04-Jul,5806,04-Aug,4254)
(05-Jul,7144,05-Aug,2566)
DESCRIBE: The DESCRIBE operator is a diagnostic operator in Pig and is used to view and understand the schema of an alias or a relation. It is a kind of command-line log that enables us to understand how the preceding operators in the data pipeline are changing the data. The output of the DESCRIBE operator is a description of the schema. In our example, we use DESCRIBE to understand the schema:
DESCRIBE joind;
The output is as follows:
joind: {jcountd::jdate: chararray,jcountd::jcount: long,acountd::adate: chararray,acountd::acount: long}
FILTER: The FILTER operator allows you to select or filter out records from a relation based on a condition. This operator works on tuples or rows of data. The following example retains the records whose counts are greater than 2,600 in both months:
filterd = FILTER joind BY jcount > 2600 and acount > 2600;
Content of the filtered tuples: all the records in which either count is 2,600 or less are filtered out.
(04-Jul,5806,04-Aug,4254)
(07-Jul,6951,07-Aug,4062)
(08-Jul,3064,08-Aug,4252)
ILLUSTRATE: The ILLUSTRATE operator is the debugger's best friend; it is used to understand how data passes through the Pig Latin statements and gets transformed. This operator enables us to create good test data in order to test our programs on datasets that are a small sample representative of the flow of statements. ILLUSTRATE internally uses an algorithm that takes a small sample of the entire input data and propagates it through all the statements in the Pig Latin script. This algorithm intelligently generates sample data when it encounters operators such as FILTER, which have the ability to remove rows from the data, which could otherwise result in no data flowing through the subsequent Pig statements. In our example, the ILLUSTRATE operator is used as shown in the following code snippet:
filterd = FILTER joind BY jcount > 2600 and acount > 2600;
ILLUSTRATE filterd;
The dataset used by us does not have records where the count is less than 2,600, so ILLUSTRATE has manufactured a record with two counts below 2,600 to exercise the case where values get filtered out. This record is run through the FILTER condition and gets filtered out; hence, no values are shown in the relation filterd. The following screenshot shows the output:
Output of illustrate
ORDER BY: The ORDER BY operator is used to sort a relation using the specified sort key. As of today, Pig supports sorting on fields with simple types, rather than on complex types or expressions. In the following example, we are sorting based on two fields (the July date and the August date):
srtd = ORDER filterd BY jdate,adate PARALLEL 5;
PARALLEL: The PARALLEL operator controls reduce-side parallelism by specifying the number of reducers. It defaults to one when running in local mode. This clause can be used with operators that force a reduce phase, such as ORDER, DISTINCT, LIMIT, JOIN, GROUP, COGROUP, and CROSS.
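For instance, the earlier grouping could be given ten reducers as follows (a sketch only; the reducer count is arbitrary and this line is not part of the running script):
jgrpd = GROUP raw_logs_Jul BY DayMonExtractor(jdt) PARALLEL 10;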
LIMIT: The LIMIT operator is used to set an upper limit on the number of output records generated. The output records are chosen arbitrarily, and there is no guarantee that the output will be the same if the LIMIT operator is executed repeatedly. To request a particular group of rows, you may consider using the ORDER operator immediately followed by the LIMIT operator. In our example, this operator returns five records as an illustration:
limitd = LIMIT srtd 5;
The content of the limitd relation is given as follows:
(04-Jul,5806,04-Aug,4254)
(07-Jul,6951,07-Aug,4062)
(08-Jul,3064,08-Aug,4252)
(10-Jul,4383,10-Aug,4423)
(11-Jul,4819,11-Aug,4500)
FLATTEN: The FLATTEN operator is used to flatten nested structures, such as bags and tuples, by removing the nesting in them. Please refer to the example code under DISTINCT for the sample output and usage of FLATTEN.
The EXPLAIN operator
A Pig program goes through multiple stages, as shown in the next diagram, before being executed in the Hadoop cluster, and the EXPLAIN operator provides the best way to understand what transpires underneath the Pig Latin code. The EXPLAIN operator generates the sequence of plans that go into converting the Pig Latin scripts to a MapReduce JAR.
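In its simplest form, EXPLAIN can be run on an alias from the Grunt shell; for example, to print the plans for the final relation of our script, the following could be used (a usage sketch, not part of the original script):
EXPLAIN limitd;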
The output of this operator can be converted into a graphical format by using the -dot option to generate graphs of the program. This writes the output to a DOT file containing diagrams explaining the execution of our script.
The syntax for the same is as follows:
pig -x mapreduce -e 'pigexample.pig' -dot -out <filename> or <directoryname>
Next is an example of its usage. If we specify a directory name after -out, all three output files (the logical, physical, and MapReduce plan files) will be created in that directory. In the following case, all the files will be created in the pigexample_output directory:
pig -x mapreduce -e 'pigexample.pig' -dot -out pigexample_output
Follow the given steps to convert the DOT files into an image format:
- Install graphviz on your machine.
- Plot a graph written in Dot language by executing the following command:
dot -Tpng filename.dot > filename.png
The following diagram shows each step in Pig processing:
![The EXPLAIN operator](https://epubservercos.yuewen.com/DA11CF/19470452601872806/epubprivate/OEBPS/Images/5556OS_01_06.jpg?sign=1739692383-NAmpl17pcNybJTIQQHkNCGcbhrVaqoam-0-585137f1b364e30cdf309df0028354fa)
Pig Latin to Hadoop JAR
- The query parser: The parser uses ANother Tool for Language Recognition (ANTLR), a language parser, to verify whether the program is syntactically correct and whether all the variables are properly defined. The parser also checks the schemas for type correctness and generates an intermediate representation, an Abstract Syntax Tree (AST).
- The logical plan: The intermediate representation, AST, is transformed into a logical plan. This plan is implemented internally as a directed graph with all the operators in the Pig Latin script mapped to the logical operators. The following diagram illustrates this plan:
Logical plan
- The logical optimization: The logical plan generated is examined for opportunities of optimization such as filter-projection pushdown and column pruning. These are considered depending on the script. Optimization is performed and then the plan is compiled into a series of physical plans.
- The physical plan: The physical plan is a physical description of the computation that creates a usable pipeline, which is independent of MapReduce. We could use this pipeline and target other processing frameworks such as Dryad. The opportunities for optimization in this stage include using in-memory aggregations instead of combiners. The physical planning stage is also the right place where the plan is examined for the purpose of reducing the number of reducers.
For clarity, each logical operator is shown with an ID. Physical operators that are produced by the translation of a logical operator are shown with the same ID. For the most part, each logical operator becomes a corresponding physical operator. The logical GROUP operator maps into a series of physical operators: local and global rearrange plus package. Rearrange is just like the Partitioner class and Reducer step of MapReduce, where sorting by a key happens. The following diagram shows the logical plan translated to a physical plan example:
Logical to physical plan
- MapReduce plan: This is the phase where the physical plan is converted into a graph of actual MapReduce jobs with all the inputs and outputs specified. Opportunities for optimization in the MapReduce phase are examined to see if it is possible to combine multiple MapReduce jobs into one job for reducing the data flow between Mappers and Reducers. The idea of decoupling the Logical and Physical plans from the MapReduce plan is to divorce them from the details of running on Hadoop. This level of abstraction is necessary to port the application to a different processing framework like Dryad.
The following Physical to MapReduce Plan shows the assignment of the physical operators to Hadoop stages for our running example (only the map and reduce stages are shown). In the MapReduce plan, the local rearrange operator annotates tuples with keys and the input stream's identifiers.
Physical to the MapReduce plan
Understanding Pig's data model
Pig's data model consists of both primitive and complex types. The following sections give a brief overview of these data types:
The following are the complex data types, formed by combining the primitive data types:
- Atom: An atom contains a single value and can be composed of any one of the primitive data types.
- Tuple: A tuple is like a row of a table in an RDBMS, or a sequence of fields, each of which can be any of the data types including primitive and complex types. Tuples are enclosed in parentheses, (). An example is shown as follows:
(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)
- Bag: A bag is analogous to a table and is a collection of tuples, which may have duplicates too. Bag is Pig's only spillable data structure, which implies that when the full structure does not fit in memory, it is spilled on to the disk and paged in when necessary. In a bag, the schema of the constituent tuples is flexible and doesn't need to have a consistent number and type of fields. Bags are represented by tuples enclosed in curly braces, {}. An example is shown as follows:
{(163.205.85.3,-,-,13/Jul/1995:08:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585), (100.305.185.30,-,-,13/AUG/1995:09:51:12 -0400,GET,/htbin/cdt_main.pl,HTTP/1.0,200,3585)}
- Map: This is a key-value data structure. The schema of the data items in a map is not strictly enforced, giving them the option to take the form of any type. A map is useful for prototyping datasets where schemas may change over time. Maps are enclosed in square brackets, []; an example map literal is shown after this list.
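An example of a map is shown as follows (the keys and values here are hypothetical and are not drawn from the NASA log data):
[name#John,phone#5551212]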
Pig has a special way to handle known and unknown schemas. Schemas exist to give fields their identity by naming them and categorizing them into a data type. Pig has the ability to discover schemas at runtime by making appropriate assumptions about the data types. In case the data type is not assigned, Pig defaults the type to bytearray and performs conversions later, based on the context in which that data is used. This feature gives Pig an edge when you want to use it for research purposes to create quick prototypes on data with the unknown schema. Notwithstanding these advantages of working with unspecified schemas, it is recommended to specify the schema wherever or whenever it is possible for more efficient parse-time checking and execution. However, there are a few idiosyncrasies of how Pig handles unknown schemas when using various operators.
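As a small sketch of this behavior (it reuses the loader from our example but is not part of the original script), loading without an AS clause leaves the schema unknown, and every field defaults to bytearray:
no_schema_logs = LOAD 'NASA_access_logs/Jul/access_log_Jul95' USING ApacheCommonLogLoader;
DESCRIBE no_schema_logs;
-- DESCRIBE reports that the schema for no_schema_logs is unknown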
For the relational operators that perform JOIN, GROUP, UNION, or CROSS, if any one of the relations involved doesn't have a schema specified, then the resultant relation would have a null schema. Similarly, the resulting schema would be null when you try to flatten a bag with an unknown schema.
Extending the discussion of how nulls can result in Pig from the preceding section, there are a few other ways nulls can arise through the interaction of specific operators. As a quick illustration, if any subexpression operand of the comparison operators, such as ==, <, >, and MATCHES, is null, then the result would be null. The same is applicable to the arithmetic operators (such as +, -, *, /) and the CONCAT operator too. It is important to remember the subtle differences between how various functions treat a null. While the AVG, MAX, MIN, SUM, and COUNT functions disregard nulls, the COUNT_STAR function does not ignore them and counts a null as if there were a value.
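As a minimal sketch of this difference (it reuses jcountd from our example and hypothetically assumes that some jcount values are null), COUNT skips the null entries while COUNT_STAR counts them:
all_grpd = GROUP jcountd ALL;
null_check = FOREACH all_grpd GENERATE COUNT(jcountd.jcount) AS non_null_days, COUNT_STAR(jcountd.jcount) AS all_days;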