When storing massive amounts of data in distributed storage, the Map/Reduce (M/R) algorithm provides a generic and effective means of computing data transformations that span large datasets. The underlying assumption is that each storage node (think: machine) provides computing power that can be used for custom, parallelized data processing, and that this processing benefits strongly from data locality.
At Original1 we use Apache Hadoop, a popular open source implementation of a distributed file system (HDFS) and of an M/R runtime, in conjunction with Apache HBase, a NoSQL database, to store large numbers of rather simply structured, versioned data records.
In order to provide analytical insight across all data records, we use Map/Reduce jobs that compute aggregations, look for anomalies in record history, and more.
One of the attractive aspects of Map/Reduce jobs is that it is your code that is executed directly where the data is stored – not some restricted query language. On the other hand, that is also one of the biggest weaknesses: the M/R implementation is a very weak and limiting application container.
This post is about how the z2-Environment can be used to overcome this limitation and how to integrate M/R seamlessly into your solution.
What’s the Problem?
In order to run your M/R implementation, you need to package the job's code into a JAR archive and schedule the job with Hadoop's Job Tracker. Your JAR eventually gets downloaded to all data nodes and executed locally. This is not an unreasonable limitation if that is all you want to do: run M/R jobs. Typically, however, data analysis is just one part of the overall solution, and in order to make sense of the data, you will want to reuse a lot of the code that you use elsewhere, e.g. domain types, service implementations, utilities.
Also, whatever result your job delivers, it will frequently need data stored in an RDBMS to complete its work, or need to store its results there. And no, contrary to what some enthusiasts claim, HBase/Hadoop and other NoSQL storage engines will not put an end to relational databases.
So, in short, while the technology is an interesting addition, it is poorly integrated.
How Z2 Fixes This
The integration of z2 with Hadoop is an example of using the embedded mode of z2 (see embedded). When scheduling an M/R job (an example of how that happens is shown below), z2 will actually upload a JAR to Hadoop for execution. That JAR, however, is completely generic: all it does is start the z2 runtime, retrieve the actual job component, and delegate the actual Hadoop interface calls (Mapper, Combiner, Reducer) to the z2-hosted implementation.
The embedded z2 works as always: it checks whether local updates from the connected SCM are required and updates (and compiles, if applicable) on demand. Your job component can do whatever any other component of your solution can do, e.g. re-use code, connect to a database, etc. It is defined where everything else is defined: in the very same module/component structure.
Anatomy of an M/R Job
As part of the integration, the Hadoop module provides a component type com.zfabrik.hadoop.job. A component of this type is expected to provide an implementation of the interface IMapReduceJob, which describes everything a job implementation has to say about its execution:
public interface IMapReduceJob<MKEYIN,MVALUEIN,MKEYOUT,MVALUEOUT,RKEYOUT,RVALUEOUT> {
    String TYPE = "com.zfabrik.hadoop.job";

    void init(String name);

    boolean hasMapper();
    boolean hasCombiner();
    boolean hasReducer();

    Mapper<MKEYIN,MVALUEIN,MKEYOUT,MVALUEOUT>  getMapper(Configuration configuration);
    Reducer<MKEYOUT,MVALUEOUT,MKEYOUT,MVALUEOUT> getCombiner(Configuration configuration);
    Reducer<MKEYOUT,MVALUEOUT,RKEYOUT,RVALUEOUT> getReducer(Configuration configuration);

    Job configure(Configuration configuration) throws Exception;
}
The methods are relevant at different points in the life cycle of an M/R job. In order to schedule the job, the configure method is called. Also, at configuration time, the job is asked which M/R phases it supports (hasMapper, hasCombiner, hasReducer).
At execution time, on the data nodes, the same implementation will be asked to provide concrete implementations (getMapper, getCombiner, getReducer). Hence, the interface is really just a simple abstraction of a job definition.
For example, the configure method for the Word Count example (Hadoop’s Hello World program, see http://wiki.apache.org/hadoop/WordCount), would look like this:
public Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(this.inPath));
    FileOutputFormat.setOutputPath(job, new Path(this.outPath));
    return job;
}
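To illustrate what the map and reduce phases of Word Count actually compute, here is a minimal, Hadoop-independent sketch in plain Java. The class and method names are purely illustrative and not part of the z2 or Hadoop API; a real job would return Hadoop Mapper/Reducer implementations via getMapper and getReducer instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Map phase: emit a (word, 1) pair for every word in a line of input.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // Reduce phase: sum all counts emitted for the same word.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> emitted) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, Integer> e : emitted) {
            counts.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        emitted.addAll(map("hello hadoop"));
        emitted.addAll(map("hello z2"));
        System.out.println(reduce(emitted)); // prints the aggregated word counts
    }
}
```

In the actual Hadoop setting, the framework takes care of distributing the map calls across data nodes and grouping the emitted pairs by key before the reduce phase runs.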
Very much like the original – if not simpler. The difference is that the configuration is preset by z2, in order to abstract away the actual connectivity information (which should, of course, not be part of the component), and that the actual job submission, for similar reasons, is not part of the job component implementation.
Jobs can be scheduled programmatically or from a simple web interface, for execution on any configured Hadoop cluster or locally (for debugging).
Programmatic scheduling looks like this:
IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup(jobComponentName, IJobConfigurator.class);
Configuration c = IComponentsLookup.INSTANCE.lookup(hadoopConfigurationComponentName, Configuration.class);
jc.configure(c);
jc.submit();
The first lookup is for the job component, e.g. the Word Count job implementation. The second lookup is for the Hadoop client configuration component – another component type that simply holds a core-site.xml file (Hadoop's properties in XML file format) defining how to connect to Hadoop (and possibly HBase) and more.
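For illustration, such a core-site.xml can be quite small. The host names below are made up; fs.defaultFS and hbase.zookeeper.quorum are standard Hadoop/HBase property names (the exact set of properties you need depends on your cluster and Hadoop version):

```xml
<?xml version="1.0"?>
<configuration>
  <!-- HDFS namenode to connect to (host name is made up) -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode.example.com:8020</value>
  </property>
  <!-- ZooKeeper quorum used to locate HBase (optional) -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk1.example.com,zk2.example.com</value>
  </property>
</configuration>
```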
Be aware that programmatic scheduling is a common, although not completely trivial, use case: in most cases M/R jobs will be scheduled either on a time basis (i.e. via some cron-type scheduling) or as the result of some data change that invalidates a previous computation. Smart scheduling of jobs is a worthy subject of its own (i.e. a related post just got scheduled).
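A time-based trigger needs nothing beyond standard Java facilities. The sketch below is an assumption about how one might wire this up, not part of z2; the Runnable passed in would perform the lookup/configure/submit sequence shown above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: periodically runs a job-submission task, e.g. nightly.
public class PeriodicJobTrigger {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Schedules the given submission task to run repeatedly at a fixed rate.
    public void scheduleEvery(Runnable submitJob, long initialDelay, long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(submitJob, initialDelay, period, unit);
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```

In a real solution the Runnable would look up the job component and the Hadoop configuration component and call configure and submit, and change-driven triggering would replace or complement the fixed rate.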
Where Can I Get That?
This technology is used by Original1. We are planning to release it in a new distribution (working title z2@hadoop) soon (i.e. as soon as we get to it). That's why this post is called "Part 1" – Part 2 will point to the actual distribution and documentation. If you want it earlier, talk to us.
Summary
The integration of z2 with Hadoop is another strong case for the System Centric approach. Via a seamless integration, Hadoop can be added to your solution space without the need to change the structure of other parts, and with full re-use capabilities.