Not reinventing the wheel II - Pig
This is the second of a series of posts on BigData software and architecture. The main goal of this series is to not reinvent the wheel by re-developing a system that is already out there. In this post I'll take a look at Pig, a high-level language that is compiled into MapReduce jobs.
The original paper introducing Pig can be found here.
Here we can see Pig riding Hadoop MapReduce
In a recent post, I took a look at the MapReduce framework developed by Google back in 2003 (also available as an open-source implementation, Hadoop MapReduce). The MapReduce framework provides seamless parallelization of large-scale data-processing programs on computing clusters. The user specifies two functions, map and reduce, and these are automatically deployed across the cluster. This is a powerful abstraction that removes the need to write application-specific code for parallelization, load balancing, fault tolerance, synchronization, etc. among workers.
However, the implementation interface provided by MapReduce can feel weird for programmers coming from imperative languages. Conceptually simple computations such as the mean become unnatural when expressed in MapReduce. As a toy example, imagine a file "sequences.tsv" (it could be a really large file stored across several machines in HDFS) containing a list of sequences whose overall mean we want to compute in Python using Hadoop's implementation of MapReduce:
sequences.tsv
1 2 3 4 5
6 7 8 9 10
11 12
#mapper.py
import sys

def map_function(sequence):
    # Emit the partial element count and partial sum for one sequence
    return len(sequence), sum(sequence)

elements_in_doc = 0
doc_sum = 0
for line in sys.stdin:
    # Each line holds one whitespace-separated sequence of numbers
    elements_in_line, line_sum = map_function([int(x) for x in line.split()])
    elements_in_doc += elements_in_line
    doc_sum += line_sum
print(str(elements_in_doc) + "\t" + str(doc_sum))
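Each mapper thus emits a single partial (count, sum) pair for its share of the input; the reducer then combines these partial results into the final mean: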
#reducer.py
import sys

total_elements = 0
total_sum = 0
for doc in sys.stdin:
    # Each mapper writes its partial result as "count<TAB>sum"
    elements_in_doc, doc_sum = doc.split("\t")
    total_elements += int(elements_in_doc)
    total_sum += int(doc_sum)
print(total_sum / total_elements)
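As a side note, the pipeline can be sanity-checked locally with something like `cat sequences.tsv | python3 mapper.py | python3 reducer.py`, which should print 6.5 for the toy file above; on a cluster the two scripts would typically be submitted through Hadoop Streaming.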
In this sense, MapReduce provides an excessively low-level interface for the programmer, who would like to compute the mean simply with sum(sequence) / len(sequence). This issue becomes even more obvious for more complex programs or when combining several input sources. This was noticed by developers at Yahoo!, who set out to build a higher-level language that would allow for more rapid development by removing the need to explicitly translate concepts into map- and reduce-able functions.
The resulting language, called Pig Latin after the compiler's name, Pig, allows for a more procedural definition of programs that can later be compiled into MapReduce jobs, thus preserving all the features of MapReduce while making programmers' lives easier. The same example program for calculating the mean can be specified in Pig Latin:
list_of_sequences = LOAD "sequences.tsv"
full_sequence = FLATTEN(list_of_sequences)
average = AVG(full_sequence)
As we can see with this toy example, the Pig Latin implementation is much easier to understand than the artificial-feeling MapReduce one. This example also gives us a chance to look at how Pig treats data structures. In contrast to the flat data model of relational databases, Pig offers a nested data model.
We can draw an easy comparison with Python: the base data type in Pig (the atom) corresponds to a simple value such as a string in Python, tuples in Pig are tuples in Python, and Python lists are known as bags in Pig. In the example above, list_of_sequences is a bag of tuples: { (1, 2, 3, 4, 5), (6, 7, 8, 9, 10), (11, 12) }, full_sequence is a flattened version of this bag: { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }, and average is an atom with value 6.5, produced by Pig's high-level implementation of the mean.
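As an aside, the three-line snippet above is schematic; real Pig Latin is a bit more verbose because FLATTEN and AVG live inside FOREACH ... GENERATE statements and aggregation needs an explicit GROUP. A hedged sketch of a runnable version (the aliases, the cast, and the use of TOKENIZE are my assumptions for illustration, not part of the original example):
lines       = LOAD 'sequences.tsv' AS (line:chararray);
-- TOKENIZE splits a line into a bag of words; FLATTEN turns that bag into individual rows
values      = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS value;
values_cast = FOREACH values GENERATE (double)value AS value;
all_values  = GROUP values_cast ALL;   -- collect everything into a single group
average     = FOREACH all_values GENERATE AVG(values_cast.value);
DUMP average;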
If we wanted to calculate an independent mean for each tuple in the bag, we can write:
tuple_averages = FOREACH list_of_sequences GENERATE AVG(*)
where * refers to each tuple, i.e. each individual sequence.
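For the toy bag above, tuple_averages would be the bag { (3.0), (8.0), (11.5) }.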
If the calculation were only needed for sequences longer than three elements:
long_sequences = FILTER list_of_sequences BY SIZE(*) > 3
tuple_averages = FOREACH long_sequences GENERATE AVG(*)
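With the filter applied, the third sequence (which has only two elements) is dropped, so only the averages 3.0 and 8.0 remain.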
Pig offers further SQL-like operators such as JOIN, GROUP BY, UNION, CROSS, ORDER and DISTINCT, plus an operator unique to Pig, COGROUP, which takes advantage of the nestedness of the data model. A very common family of computations in data analysis follows the pattern JOIN + GROUP + aggregation, the natural extension of GROUP + aggregation to cases where we have more than one dataset. Implemented via COGROUP on top of MapReduce, this pattern is highly efficient compared to the SQL formulation, which requires a costly cross-product between the datasets for the JOIN that is then immediately undone by the GROUP.
| personal_details.csv | salaries.csv |
|---|---|
| sex, age, career | sex, salary |
| Woman, 24, 1 | Man, 24000 |
| Woman, 40, 5 | Woman, 24000 |
| Man, 25, 1 | Woman, 35000 |
| Man, 30, 3 | Man, 32000 |
COGROUP personal_details BY sex, salaries BY sex
Woman, {(Woman, 24, 1), (Woman, 40, 5)}, {(Woman, 24000), (Woman, 35000)}
Man, {(Man, 25, 1), (Man, 30, 3)}, {(Man, 32000), (Man, 24000)}
In the example above we can see the dataset produced by COGROUP when we group the personal_details and salaries datasets by the key "sex". The resulting dataset has tuples formed by a key (Woman/Man), a bag with the tuples carrying this key in the first dataset, and a bag with the tuples carrying this key in the second dataset. We would then proceed to calculate the necessary functions over this dataset.
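To make the pattern concrete, here is a hedged sketch of how a JOIN + GROUP + aggregation, such as the average salary per sex, could be written on top of COGROUP (the file names, the comma delimiter, the schemas and the final aggregation are assumptions for this example, not taken from the post):
-- Hedged sketch: average salary per sex via COGROUP (schemas and delimiter assumed)
details  = LOAD 'personal_details.csv' USING PigStorage(',') AS (sex:chararray, age:int, career:int);
salaries = LOAD 'salaries.csv' USING PigStorage(',') AS (sex:chararray, salary:double);
grouped  = COGROUP details BY sex, salaries BY sex;
-- 'salaries' inside each grouped record is the nested bag of salary tuples for that key
avg_pay  = FOREACH grouped GENERATE group AS sex, AVG(salaries.salary) AS mean_salary;
DUMP avg_pay;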
One last feature worth remarking on is that Pig also offers a debugging environment (called Pig Pen in the original paper) that helps development when working with really large datasets. This environment constructs small artificial sample datasets that follow the structure of the underlying input datasets. Usually, when testing data transformations on large datasets, developers create data subsets for rapid testing and development. However, these subsets often don't contain all the cases that can arise (e.g. filters that result in empty datasets, joins that look 1:1 in the sample but are really 1:N or N:N, etc.). The goal of this environment is therefore to offer better coverage of the cases that can arise when transforming our datasets.
As we can see from the few examples treated above, Pig offers a higher-level interface to data analysis that is later compiled into efficient MapReduce jobs. Moreover, Pig's data model offers a really easy transition for programmers used to Python-like structures and their inherent flexibility and, conversely, for database fans who miss the SQL formulation. Interestingly, Pig also offers a novel debugging environment that creates sandbox datasets to make incremental development and debugging easier. Personally, I had never heard of such a debugging environment, and I am not aware of anything similar in the tools I use for data analysis in Python, so I will try to report on this or similar environments in future posts. Seeeeeeeeeeeeeee ya.