StoreFunc abstract class has the main methods for storing data and for most use cases it should suffice to extend it. There is an optional interface which can be implemented to achieve extended functionality:
StoreMetadata
This interface has methods to interact with metadata systems to store schema and statistics. This interface is optional and should be implemented only if metadata needs to stored.
The methods which need to be overridden in StoreFunc are explained below:
getOutputFormat():
This method will be called by Pig to get the OutputFormat used by the Storer. The methods in the OutputFormat will be called by Pig in the same manner and in the same context as by Hadoop in a map-reduce Java program. If the OutputFormat is a Hadoop packaged one, the implementation should use the new API based one under org.apache.hadoop.mapreduce. If it is a custom OutputFormat, it should be implemented using the new API under org.apache.hadoop.mapreduce. The checkOutputSpecs() method of the OutputFormat will be called by pig to check the output location up-front. This method will also be called as part of the Hadoop call sequence when the job is launched. So implementations should ensure that this method can be called multiple times without inconsistent side effects.
setStoreLocation():
This method is called by Pig to communicate the store location to the storer. The storer should use this method to communicate the same information to the underlying OutputFormat. This method is called multiple times by Pig. Implementations should take note that this method is called multiple times and should ensure there are no inconsistent side effects due to the multiple calls.
prepareToWrite():
In the new API, writing of the data is through the OutputFormat provided by the StoreFunc. In prepareToWrite() the RecordWriter associated with the OutputFormat provided by the StoreFunc is passed to the StoreFunc. The RecordWriter can then be used by the implementation in putNext() to write a tuple representing a record of data in a manner expected by the RecordWriter.
putNext():
The meaning of putNext() has not changed and is called by Pig runtime to write the next tuple of data – in the new API, this is the method wherein the implementation will use the underlying RecordWriter to write the Tuple out.
Default Implementations in StoreFunc:
setStoreFuncUDFContextSignature():
This method will be called by Pig both in the front end and back end to pass a unique signature to the Storer. The signature can be used to store any information in to the UDFContext which the Storer needs to store between various method invocations in the front end and back end. The default implementation in StoreFunc has an empty body. This method will be called before any other methods.
relToAbsPathForStoreLocation():
Pig runtime will call this method to allow the Storer to convert a relative store location to an absolute location. An implementation is provided in StoreFunc which handles this for FileSystem based locations.
checkSchema():
A Store function should implement this function to check that a given schema describing the data to be written is acceptable to it. The default implementation in StoreFunc has an empty body. This method will be called before any calls to setStoreLocation().
Gain hands-on experience in building and managing data storage, processing, and analytics solutions with the Azure Data Engineer Certification Course.
Example Implementation:
The storer implementation in the example, is a storer for text data with line delimiter as ‘
‘ and ‘ ‘ as default field delimiter (which can be overridden by passing a different field delimiter in the constructor) – this is similar to current PigStorage storer in Pig. The implementation uses an existing Hadoop supported OutputFormat – TextOutputFormat as the underlying OutputFormat.
public class SimpleTextStorer extends StoreFunc { protected RecordWriter writer = null; private byte fieldDel = ' '; private static final int BUFFER_SIZE = 1024; private static final String UTF8 = "UTF-8"; public PigStorage() { } public PigStorage(String delimiter) { this(); if (delimiter.length() == 1) { this.fieldDel = (byte)delimiter.charAt(0); } else if (delimiter.length() > 1delimiter.charAt(0) == '') { switch (delimiter.charAt(1)) { case 't': this.fieldDel = (byte)' '; break; case 'x': fieldDel = Integer.valueOf(delimiter.substring(2), 16).byteValue(); break; case 'u': this.fieldDel = Integer.valueOf(delimiter.substring(2)).byteValue(); break; default: throw new RuntimeException("Unknown delimiter " + delimiter); } } else { throw new RuntimeException("PigStorage delimeter must be a single character"); } } ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE); @Override public void putNext(Tuple f) throws IOException { int sz = f.size(); for (int i = 0; i < sz; i++) { Object field; try { field = f.get(i); } catch (ExecException ee) { throw ee; } putField(field); if (i != sz - 1) { mOut.write(fieldDel); } } Text text = new Text(mOut.toByteArray()); try { writer.write(null, text); mOut.reset(); } catch (InterruptedException e) { throw new IOException(e); } } @SuppressWarnings("unchecked") private void putField(Object field) throws IOException { //string constants for each delimiter String tupleBeginDelim = "("; String tupleEndDelim = ")"; String bagBeginDelim = "{"; String bagEndDelim = "}"; String mapBeginDelim = "["; String mapEndDelim = "]"; String fieldDelim = ","; String mapKeyValueDelim = "#"; switch (DataType.findType(field)) { case DataType.NULL: break; // just leave it empty case DataType.BOOLEAN: mOut.write(((Boolean)field).toString().getBytes()); break; case DataType.INTEGER: mOut.write(((Integer)field).toString().getBytes()); break; case DataType.LONG: mOut.write(((Long)field).toString().getBytes()); break; case DataType.FLOAT: mOut.write(((Float)field).toString().getBytes()); break; case DataType.DOUBLE: mOut.write(((Double)field).toString().getBytes()); break; case DataType.BYTEARRAY: { byte[] b = ((DataByteArray)field).get(); mOut.write(b, 0, b.length); break; } case DataType.CHARARRAY: // oddly enough, writeBytes writes a string mOut.write(((String)field).getBytes(UTF8)); break; case DataType.MAP: boolean mapHasNext = false; Map<String, Object> m = (Map<String, Object>)field; mOut.write(mapBeginDelim.getBytes(UTF8)); for(Map.Entry<String, Object> e: m.entrySet()) { if(mapHasNext) { mOut.write(fieldDelim.getBytes(UTF8)); } else { mapHasNext = true; } putField(e.getKey()); mOut.write(mapKeyValueDelim.getBytes(UTF8)); putField(e.getValue()); } mOut.write(mapEndDelim.getBytes(UTF8)); break; case DataType.TUPLE: boolean tupleHasNext = false; Tuple t = (Tuple)field; mOut.write(tupleBeginDelim.getBytes(UTF8)); for(int i = 0; i < t.size(); ++i) { if(tupleHasNext) { mOut.write(fieldDelim.getBytes(UTF8)); } else { tupleHasNext = true; } try { putField(t.get(i)); } catch (ExecException ee) { throw ee; } } mOut.write(tupleEndDelim.getBytes(UTF8)); break; case DataType.BAG: boolean bagHasNext = false; mOut.write(bagBeginDelim.getBytes(UTF8)); Iterator<Tuple> tupleIter = ((DataBag)field).iterator(); while(tupleIter.hasNext()) { if(bagHasNext) { mOut.write(fieldDelim.getBytes(UTF8)); } else { bagHasNext = true; } putField((Object)tupleIter.next()); } mOut.write(bagEndDelim.getBytes(UTF8)); break; default: { int errCode = 2108; String msg = "Could not determine data type of field: " + field; throw new ExecException(msg, errCode, PigException.BUG); } } } @Override public OutputFormat getOutputFormat() { return new TextOutputFormat<WritableComparable, Text>(); } @Override public void prepareToWrite(RecordWriter writer) { this.writer = writer; } @Override public void setStoreLocation(String location, Job job) throws IOException { job.getConfiguration().set("mapred.textoutputformat.separator", ""); FileOutputFormat.setOutputPath(job, new Path(location)); if (location.endsWith(".bz2")) { FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); } else if (location.endsWith(".gz")) { FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } } }
Got a question for us? Please mention them in the comments section and we will get back to you.
Related Posts:
Apache Pig UDF: Part 2
Apache Pig UDF: Part 1
Big Data and Hadoop Training