Apache Pig provides extensive support for user-defined functions (UDFs) as a way to specify custom processing. Pig UDFs can currently be implemented in four languages: Java, Python, JavaScript, and Ruby. The most extensive support is provided for Java functions.
Java UDFs can be implemented in several ways. The simplest UDF just extends EvalFunc, which requires only the exec function to be implemented; every Eval UDF must implement it. Additionally, if a function is algebraic, it can implement the Algebraic interface to significantly improve query performance.
Importance of UDFs in Pig:
One of Pig’s advantages is that it lets users combine its built-in operators with their own or others’ code via UDFs. Up through version 0.7, all UDFs had to be written in Java and implemented as Java classes. A new UDF is therefore added by writing a Java class and telling Pig which JAR file contains it.
Pig itself comes with some UDFs. Prior to version 0.8, it was a very limited set with only the standard SQL aggregate functions and a few others. In 0.8, a large number of standard string-processing, math, and complex-type UDFs were added.
What is a Piggybank?
Piggybank is a collection of user-contributed UDFs that is released along with Pig. Piggybank UDFs are not included in the Pig JAR, so you have to register them manually in your script. You can also write your own UDFs or use those written by other users.
Eval Functions
The UDF class extends EvalFunc, the base class for all Eval functions: every evaluation function extends the Java class org.apache.pig.EvalFunc. EvalFunc is parameterized with the return type of the UDF, which is a Java String for the UPPER function in this section. The core method of this class is exec. The first line of the code indicates that the function is part of the myudfs package.
The exec method takes one record and returns one result, and it is invoked for every record that passes through the execution pipeline. Its argument is a tuple that contains all of the fields the script passes to your UDF. It returns the type by which you have parameterized EvalFunc.
The fields of the input tuple appear in the order in which they are passed to the function in the Pig script. In the example shown below, the function takes a string as input and converts it to uppercase. Once the function is implemented, it needs to be compiled and included in a JAR.
    package myudfs;

    import java.io.IOException;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    public class UPPER extends EvalFunc<String> {
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0)
                return null;
            try {
                String str = (String) input.get(0);
                return str.toUpperCase();
            } catch (Exception e) {
                throw new IOException("Caught exception processing input row ", e);
            }
        }
    }
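The per-record contract of exec can be tried out without a Pig installation. The following is a minimal sketch, not Pig code: a List<Object> stands in for Pig's Tuple, and the class name UpperSketch and the applyToAll driver are hypothetical names introduced only for illustration. It also assumes that a null field should produce null, which is a choice made for this sketch rather than documented Pig behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Stand-alone sketch: no Pig classes are used. A List<Object> plays the role
// of Pig's Tuple, and all names here are hypothetical.
class UpperSketch {

    // Mirrors the shape of UPPER.exec: one record in, one result out.
    static String exec(List<Object> input) {
        if (input == null || input.isEmpty()) {
            return null;
        }
        Object field = input.get(0);
        // Assumption for this sketch: a null field yields null.
        return field == null ? null : ((String) field).toUpperCase(Locale.ROOT);
    }

    // Hypothetical driver standing in for the execution pipeline:
    // exec is called once per record, in order.
    static List<String> applyToAll(List<List<Object>> records) {
        List<String> results = new ArrayList<>();
        for (List<Object> record : records) {
            results.add(exec(record));
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<Object>> records = Arrays.asList(
                Arrays.<Object>asList("alice"),
                Arrays.<Object>asList("Bob"),
                new ArrayList<Object>());
        System.out.println(applyToAll(records)); // prints [ALICE, BOB, null]
    }
}
```

The empty record maps to null, matching the guard at the top of the real exec method.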
Aggregate Functions:
Aggregate functions are another common type of Eval function. They are usually applied to grouped data: an aggregate function takes a bag and returns a scalar value. An interesting and valuable feature of many aggregate functions is that they can be computed incrementally in a distributed manner. In the Hadoop world, this means that the partial computations can be done by the Map and Combiner phases and the final result can be computed by the Reducer.
It is very important to make sure that aggregate functions that are algebraic are implemented as such. Examples of this type include the built-in COUNT, MIN, MAX, and AVG.
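An average is a useful case to think through, because averaging the averages of subsets gives the wrong answer. The sketch below shows one way it can still be computed incrementally: each partial result carries a sum and a count. This is plain Java with no Pig classes; the class AverageSketch and its method names are hypothetical, and returning null for empty input is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch, no Pig classes. Each partial result is a long[]
// holding {sum, count} for one subset of the data.
class AverageSketch {

    // Partial result for one subset of the data.
    static long[] initial(List<Long> subset) {
        long sum = 0;
        for (long v : subset) {
            sum += v;
        }
        return new long[] {sum, subset.size()};
    }

    // Merge any number of partial results into another partial result.
    static long[] combine(List<long[]> partials) {
        long sum = 0;
        long count = 0;
        for (long[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return new long[] {sum, count};
    }

    // Turn the merged partial result into the final scalar.
    // Assumption: empty input yields null.
    static Double finish(long[] merged) {
        return merged[1] == 0 ? null : (double) merged[0] / merged[1];
    }

    public static void main(String[] args) {
        long[] a = initial(Arrays.asList(1L, 2L, 3L)); // {6, 3}
        long[] b = initial(Arrays.asList(10L));        // {10, 1}
        System.out.println(finish(combine(Arrays.asList(a, b)))); // prints 4.0
    }
}
```

Averaging the two subset averages (2.0 and 10.0) would give 6.0, which is why the partial state keeps the sum and count separate until the last step.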
COUNT is an example of an algebraic function where we can count the number of elements in a subset of the data and then sum the counts to produce a final output. Let’s look at the implementation of the COUNT function:
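    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map;
    import org.apache.pig.Algebraic;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class COUNT extends EvalFunc<Long> implements Algebraic {
        // Non-algebraic path: count the whole input in one call.
        public Long exec(Tuple input) throws IOException { return count(input); }

        public String getInitial() { return Initial.class.getName(); }
        public String getIntermed() { return Intermed.class.getName(); }
        public String getFinal() { return Final.class.getName(); }

        // Counts the elements of the original input and wraps the partial count in a tuple.
        static public class Initial extends EvalFunc<Tuple> {
            public Tuple exec(Tuple input) throws IOException {
                return TupleFactory.getInstance().newTuple(count(input));
            }
        }

        // Sums partial counts into another partial count.
        static public class Intermed extends EvalFunc<Tuple> {
            public Tuple exec(Tuple input) throws IOException {
                return TupleFactory.getInstance().newTuple(sum(input));
            }
        }

        // Sums the remaining partial counts into the final scalar result.
        static public class Final extends EvalFunc<Long> {
            public Long exec(Tuple input) throws IOException { return sum(input); }
        }

        static protected Long count(Tuple input) throws ExecException {
            Object values = input.get(0);
            if (values instanceof DataBag)
                return ((DataBag) values).size();
            else if (values instanceof Map)
                return Long.valueOf(((Map) values).size());
            // Added so that every code path returns or throws.
            throw new ExecException("COUNT expects a bag or a map as input");
        }

        static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
            DataBag values = (DataBag) input.get(0);
            long sum = 0;
            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                Tuple t = it.next();
                sum += (Long) t.get(0);
            }
            return sum;
        }
    }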
COUNT implements the Algebraic interface, which looks like this:
    public interface Algebraic {
        public String getInitial();
        public String getIntermed();
        public String getFinal();
    }
For a function to be algebraic, it needs to implement the Algebraic interface, which names three classes derived from EvalFunc. The contract is that the exec function of the Initial class is called once and is passed the original input tuple. Its output is a tuple that contains partial results. The exec function of the Intermed class can be called zero or more times; it takes as its input a tuple that contains partial results produced by the Initial class or by prior invocations of the Intermed class, and it produces a tuple with another partial result. Finally, the exec function of the Final class is called and returns the final result as a scalar type.
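The contract above, in particular the fact that the Intermed step may run zero or more times, can be sketched in plain Java. Nothing here uses Pig classes: lists stand in for bags and tuples, three static methods stand in for the Initial, Intermed, and Final classes, and the run driver with its useIntermed flag is a hypothetical stand-in for whatever the execution engine decides to do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Stand-alone sketch, no Pig classes; all names are hypothetical.
class AlgebraicCountSketch {

    // Initial: sees a chunk of the original input and emits a partial count.
    static long initial(List<String> chunk) {
        return chunk.size();
    }

    // Intermed: partial counts in, one partial count out.
    static long intermed(List<Long> partials) {
        return sum(partials);
    }

    // Final: partial counts in, final scalar out.
    static long finish(List<Long> partials) {
        return sum(partials);
    }

    private static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Hypothetical driver: runs Initial on every chunk, optionally runs
    // Intermed once over the partial counts, then runs Final.
    static long run(List<List<String>> chunks, boolean useIntermed) {
        List<Long> partials = new ArrayList<>();
        for (List<String> chunk : chunks) {
            partials.add(initial(chunk));
        }
        if (useIntermed) {
            partials = Collections.singletonList(intermed(partials));
        }
        return finish(partials);
    }

    public static void main(String[] args) {
        List<List<String>> chunks = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e", "f"));
        // The result is the same whether or not the Intermed step runs.
        System.out.println(run(chunks, true) + " " + run(chunks, false)); // prints 6 6
    }
}
```

Because Intermed consumes and produces the same kind of partial result, skipping it or repeating it does not change the final answer, which is what lets the work be spread across stages.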
Filter Functions:
Filter functions are Eval functions that return a Boolean value. They can be used anywhere a Boolean expression is appropriate, including the FILTER operator or a bincond expression. Apache Pig does not fully support Boolean as a data type, so Filter functions cannot appear in statements such as FOREACH, where the results are output to another operator. However, Filter functions can be used in FILTER statements.
The example below implements the IsEmpty function:
    import java.io.IOException;
    import java.util.Map;
    import org.apache.pig.FilterFunc;
    import org.apache.pig.PigException;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.DataType;

    /**
     * Determine whether a bag or map is empty.
     */
    public class IsEmpty extends FilterFunc {
        @Override
        public Boolean exec(Tuple input) throws IOException {
            try {
                Object values = input.get(0);
                if (values instanceof DataBag)
                    return ((DataBag) values).size() == 0;
                else if (values instanceof Map)
                    return ((Map) values).size() == 0;
                else {
                    int errCode = 2102;
                    String msg = "Cannot test a " + DataType.findTypeName(values) + " for emptiness.";
                    throw new ExecException(msg, errCode, PigException.BUG);
                }
            } catch (ExecException ee) {
                throw ee;
            }
        }
    }
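To illustrate how a Boolean-returning function of this kind drives a filter, here is a minimal sketch in plain Java. It does not use Pig: a java.util.Collection stands in for a bag, the class IsEmptySketch and the keepNonEmpty helper are hypothetical names, and IllegalArgumentException replaces Pig's ExecException purely for the sake of a self-contained example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Stand-alone sketch, no Pig classes; all names are hypothetical.
class IsEmptySketch {

    // Same decision logic as IsEmpty.exec: true for an empty bag or map,
    // an error for any other type.
    static Boolean isEmpty(Object value) {
        if (value instanceof Collection) {
            return ((Collection<?>) value).isEmpty();
        }
        if (value instanceof Map) {
            return ((Map<?, ?>) value).isEmpty();
        }
        throw new IllegalArgumentException("Cannot test " + value + " for emptiness.");
    }

    // Hypothetical stand-in for a filter step: keeps only the values for
    // which the Boolean function returns false.
    static List<Object> keepNonEmpty(List<Object> values) {
        List<Object> kept = new ArrayList<>();
        for (Object v : values) {
            if (!isEmpty(v)) {
                kept.add(v);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.<Object>asList(
                Arrays.asList(1, 2),
                Collections.emptyList(),
                Collections.singletonMap("k", "v"),
                Collections.emptyMap());
        System.out.println(keepNonEmpty(values)); // prints [[1, 2], {k=v}]
    }
}
```

The function itself only answers yes or no for one value; the decision to keep or drop a record belongs to the filter step that calls it.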