Saturday, September 8, 2012

Pignalytics - Pigs Eat Anything : Reading and Writing Text Data in Pig


In this second post on Pig, I would like to cover how Pig can read and write data in a variety of formats. However, before I begin, let me go over some basics. In Pig, the smallest unit of data is an atomic value, which can be of type int (signed 32-bit integer), long (signed 64-bit integer), float (32-bit floating point), double (64-bit floating point), chararray (UTF-8 string or character array) or bytearray (byte blob).

Pig can also understand and handle complex data types: tuples, bags and maps. A tuple is an ordered collection of fields. A field can be of an atomic data type or one of the complex data types. Each field in a tuple is identified by its position, or by an identifier name if one is associated with it as part of a schema definition. A bag is an unordered collection of tuples, wherein each tuple may consist of one or more atomic or complex fields. And finally, Pig supports a map data type, which consists of a set of key-value pairs. The key has to be of type chararray while the value can be of any data type. Note that there can be arbitrary levels of nesting within a complex data type. Pig works over data sets referred to as relations, and the unit of processing within a relation is a tuple.

Here's the schema definition for an employee relation that contains 5 fields:

employee = tuple(employee_id:int, 
                 email:chararray, 
                 name:tuple(first_name:chararray, middle_name:chararray, last_name:chararray), 
                 project_list:bag{project(project_name:chararray)},
                 skills:map[skill_details:chararray])

Let us use the above schema definition to read data in plain-text and XML formats. We will deal with HBase and Hive input formats in the next blog article. Note that the schema definition is a combination of a JSON-like representation of a record and Pig notation for the data types. To be successful with Pig, it is essential that you understand the complex Pig data types - tuples, bags and maps - and especially how they are represented. It took me a few weeks at work of stumbling over and troubleshooting unexpected/weird behavior to get comfortable.

Unless specified otherwise, all the examples below use Pig running in local mode against Unix filesystem files.

Reading Plain-Text Data

Below is a sample text file containing data conforming to the above format, and a sample Pig script to read and dump some of the fields.

1234|emp_1234@company.com|(first_name_1234,middle_initial_1234,last_name_1234)|{(project_1234_1),(project_1234_2),(project_1234_3)}|[programming#SQL,rdbms#Oracle]
4567|emp_4567@company.com|(first_name_4567,middle_initial_4567,last_name_4567)|{(project_4567_1),(project_4567_2),(project_4567_3)}|[programming#Java,OS#Linux]


The short Pig program below reads the above text file and prints (dumps) a few fields in a few different ways. 

a = LOAD 'a.txt' USING PigStorage('|') 
    AS (employee_id:int, email:chararray, name:tuple(first_name:chararray, middle_name:chararray, last_name:chararray), project_list:bag{project: tuple(project_name:chararray)}, skills:map[chararray]) ;
DESCRIBE a ;
DUMP a ;

-- Use the dot operator to project a field out of the name tuple, and the # operator to look up a map value by key.
b = FOREACH a GENERATE employee_id, email, name.first_name, project_list, skills#'programming' ;
DESCRIBE b ;
DUMP b ;

-- project_list.$0 projects the first (and only) field out of each tuple inside the bag; the result is still a bag.
c = FOREACH a GENERATE employee_id, email, name.first_name, project_list.$0, skills#'programming' ;
DESCRIBE c ;
DUMP c ;

-- FLATTEN un-nests the bag, producing one output tuple per project for each employee.
d = FOREACH a GENERATE employee_id, email, name.first_name, FLATTEN(project_list) AS project, skills#'programming' ;
DESCRIBE d ;
DUMP d ;
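
To see what FLATTEN buys you, here is roughly what DUMP d prints for the first employee - one output tuple per project (the exact formatting may vary slightly across Pig versions):

(1234,emp_1234@company.com,first_name_1234,project_1234_1,SQL)
(1234,emp_1234@company.com,first_name_1234,project_1234_2,SQL)
(1234,emp_1234@company.com,first_name_1234,project_1234_3,SQL)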


PigStorage is one way to read a text file. It takes a single argument, which is used as the field separator. Note that the field separator is matched as-is: even "escaped" occurrences of the separator inside field values will be treated as field separators. PigStorage can read data from a single file or from files in a directory (either all files or those matching a wild-card pattern), and the files can be plain-text or compressed (zip, gz or bz2) - didn't I say Pigs can eat anything?!

Note that if you are using Pig release 0.9.x or below, there is a defect (PIG-2086, present in Pig 0.9.2) that requires you to have the complete "AS" clause on a single line.

Below are some variants to the above LOAD statement (clause).

This next line reads a single plain-text file that is comma-separated:
a = LOAD 'a.txt' USING PigStorage(',')

The next line reads files from a directory. The files may be plain-text, compressed, etc.; PigStorage will examine the file extension and, where applicable, use an appropriate compression library. Also, there will be one mapper per file.

a = LOAD '/dump/data/directory/' USING PigStorage('|')

This next one reads all txt files (*.txt) from a directory. Again, there will be one mapper per text file.
a = LOAD '/dump/data/directory/*.txt' USING PigStorage('|')


The next two lines show reading from and writing to compressed files.
a = LOAD '/dump/data/myfile.txt.gz' USING PigStorage('|')
STORE b INTO '/dump/data/myfile.txt.bz2' USING PigStorage('|')


The last statement might catch you by surprise. First, note that the output is a directory and not a single file. This is intrinsic to the MapReduce framework: your Pig statement runs as one or more map (and possibly reduce) tasks, and to allow *all* of them to direct their output to a single location, that location has to be a directory. Second, if you want to trigger the use of compression, the output path needs a ".gz" or ".bz2" suffix so that the corresponding compression library is used.
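
For instance, if relation b above were stored with the ".bz2" suffix, the output location would end up looking roughly like this (the part file names depend on the number of map/reduce tasks, so treat this layout as illustrative):

/dump/data/myfile.txt.bz2/part-m-00000.bz2
/dump/data/myfile.txt.bz2/_SUCCESS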

Now here's a big gotcha - if your file or some of the records are not in the correct format, the program will silently ignore the file and/or records without any grunts. So be careful and put in some checks - either in the data or in the program. E.g. you can filter out records where one or more of the mandatory fields are empty or malformed and save those to a separate location, as sketched below.
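
Here is a minimal sketch of such a check (the alias names, conditions and output paths are mine, so adapt them to your data): it splits the relation into good and bad records and stores each set separately.

-- Separate records with missing mandatory fields from the rest.
SPLIT a INTO good IF (employee_id IS NOT NULL AND email IS NOT NULL),
             bad IF (employee_id IS NULL OR email IS NULL) ;
STORE good INTO '/dump/data/employees_good' USING PigStorage('|') ;
STORE bad INTO '/dump/data/employees_bad' USING PigStorage('|') ;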

However, a big bonus with PigStorage is that the location/path of the file(s) to be loaded can be an HDFS path - say, on a remote Hadoop cluster (remote = different from the one where the Pig MapReduce program is running) - or even Amazon S3. Using such an approach can save "distcp"ing files.
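
For example, both of the LOAD statements below are valid; the host, port and bucket names are placeholders that you would replace with your own.

-- Load from HDFS on a (possibly remote) cluster.
a = LOAD 'hdfs://namenode-host:8020/dump/data/a.txt' USING PigStorage('|')

-- Load from Amazon S3 (assuming the S3 credentials are configured for the cluster).
a = LOAD 's3n://my-bucket/dump/data/a.txt' USING PigStorage('|')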


Reading XML Data

Besides plain-text, Pig can also read an XML file. Below is an XML file containing similar employee data.

<employee_list>
  <employee>
    <employee_id>1234</employee_id>
    <email>emp_1234@company.com</email>
    <name>(first_name_1234,middle_initial_1234,last_name_1234)</name>
    <projects>{(project_1234_1),(project_1234_2),(project_1234_3)}</projects>
    <skills>[programming:SQL,rdbms:Oracle]</skills>
  </employee>
  <employee>
    <employee_id>4567</employee_id>
    <email>emp_4567@company.com</email>
    <name>(first_name_4567,middle_initial_4567,last_name_4567)</name>
    <projects>{(project_4567_1),(project_4567_2),(project_4567_3)}</projects>
    <skills>[programming:Java,OS:'Linux Unix MacOS']</skills>
  </employee>
</employee_list>

Incidentally, the XML loader/reader is not part of core Pig but lives in piggybank, the user-contributed code that is bundled with Pig, so we need to register it and refer to it explicitly as shown below. Here's a small Pig program to read the above file and dump each employee record. Note that you need to substitute the appropriate path to the piggybank jar below.

REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar ;

a = LOAD 'a.xml' USING org.apache.pig.piggybank.storage.XMLLoader('employee') ;
DESCRIBE a ;
DUMP a ;

XMLLoader takes a single optional argument (in single quotes) that specifies the name of the element that encloses each record. Note that the schema specification ends there: Pig only recognizes a schema at the record level - e.g. "employee" in the above case. To translate XML elements and attributes into Pig fields, you will have to do the parsing and extraction yourself using custom (user-defined) functions or regular expressions.

For example, the above program can be re-written as follows to parse the XML data -

REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar ;

a = LOAD 'a.xml' USING org.apache.pig.piggybank.storage.XMLLoader('employee') AS (xml_record:chararray) ;
DESCRIBE a ;
DUMP a ;

b = FOREACH a GENERATE REGEX_EXTRACT(xml_record, '(\\<employee_id\\>.+?\\<\\/employee_id\\>)', 1) AS employee_id_xml_element ;
DESCRIBE b ;
DUMP b ;
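
The relation b above still carries the enclosing tags. As a small extension of the same idea (the alias names here are mine), you can capture only the text between the tags and cast it to the desired type:

c = FOREACH a GENERATE (int) REGEX_EXTRACT(xml_record, '<employee_id>(.+?)</employee_id>', 1) AS employee_id ;
DESCRIBE c ;
DUMP c ;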


And There's More

Besides plain-text and XML, Pig can also read CSV, JSON (Pig v0.10 and higher) and Avro. And of course, you can also write custom loaders to read data from other formats/sources - e.g. http, ftp, etc. I came across something interesting from HStreaming (www.hstreaming.com) - http://www.hstreaming.com/docs/getting-started/pigscript/. I have not tried the product - but it looks interesting!
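
To give a flavor of those loaders, here is a small sketch (the file names are placeholders; JsonLoader is a Pig 0.10+ built-in, and AvroStorage ships with piggybank and needs the Avro jars registered):

-- JSON, with the schema supplied as a string (Pig 0.10 and higher).
j = LOAD 'a.json' USING JsonLoader('employee_id:int, email:chararray') ;

-- Avro, via the piggybank loader.
v = LOAD 'a.avro' USING org.apache.pig.piggybank.storage.avro.AvroStorage() ;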