Map Reduce Applications with z2 – Part 1

When storing massive amounts of data in distributed storage, the Map/Reduce (M/R) approach provides a generic and effective means of computing data transformations that span large datasets. The underlying assumption is that each storage node (think: machine) provides computing power that can be used for custom, parallelized data processing that benefits strongly from data locality.

At Original1 we use Apache Hadoop, a popular open source implementation of a distributed file system (HDFS) and an M/R runtime, in conjunction with Apache HBase, a NoSQL database, to store large numbers of rather simply structured, versioned data records.

In order to provide analytical insight across all data records, we use Map/Reduce jobs that compute aggregations, look for anomalies in record histories, and more.

One of the attractive aspects of Map/Reduce jobs is that it is your code that is executed directly where the data is stored – not some restricted query language. On the other hand, that is also one of its biggest weaknesses: the M/R implementation is a very weak and limiting application container.

This post is about how the z2-Environment can be used to overcome this limitation and how to integrate M/R seamlessly into your solution.

What’s the Problem?

In order to run your M/R implementation, you need to package the job’s code into a JAR archive and schedule the job with Hadoop’s Job Tracker. Your JAR eventually gets downloaded to all data nodes and executed locally. This is not an unreasonable limitation if running M/R jobs is all you want to do. Typically, however, data analysis is just one part of the overall solution, and in order to make sense of the data you will want to reuse a lot of the code you use elsewhere, e.g. domain types, service implementations, and utilities.

Also, whatever result your job delivers, it will frequently need RDBMS-stored data to be completed and/or will need to be stored in an RDBMS itself. And no, contrary to what some enthusiasts claim, HBase/Hadoop and other NoSQL storage engines will not put an end to relational databases.

So, in short, while the technology is an interesting addition, it is poorly integrated.

How z2 Fixes This

The integration of z2 with Hadoop is an example of using the embedded mode of z2 (see embedded). When an M/R job is scheduled (an example of how that happens follows below), z2 actually uploads a JAR to Hadoop for execution. That JAR, however, is completely generic: all it does is start the z2 runtime, retrieve the actual job component, and delegate the Hadoop interface calls (Mapper, Combiner, Reducer) to the z2-hosted implementation.

[Figure: z2_hadoop]

The embedded z2 works as always: it checks whether local updates from the connected SCM are required and, if so, updates (and compiles, where applicable) on demand. Your job component can do what any other component of your solution can do, e.g. re-use code, connect to a database, etc. It is defined where all the rest is defined, in the very same module/component structure.

Anatomy of an M/R Job

As part of the integration, the Hadoop module provides a component type com.zfabrik.hadoop.job. A component of this type is supposed to provide an implementation of the interface IMapReduceJob that describes everything a Job implementation has to say about its execution:

public interface IMapReduceJob<MKEYIN,MVALUEIN,MKEYOUT,MVALUEOUT,RKEYOUT,RVALUEOUT> {

    // the component type name
    String TYPE = "com.zfabrik.hadoop.job";

    // initialization callback
    void init(String name);

    // queried at configuration time: which M/R phases does the job provide?
    boolean hasMapper();
    boolean hasCombiner();
    boolean hasReducer();

    // called at execution time on the data nodes to retrieve the concrete implementations
    Mapper<MKEYIN,MVALUEIN,MKEYOUT,MVALUEOUT> getMapper(Configuration configuration);
    Reducer<MKEYOUT,MVALUEOUT,MKEYOUT,MVALUEOUT> getCombiner(Configuration configuration);
    Reducer<MKEYOUT,MVALUEOUT,RKEYOUT,RVALUEOUT> getReducer(Configuration configuration);

    // called when the job is scheduled, to set up the Hadoop Job
    Job configure(Configuration configuration) throws Exception;
}

The methods become relevant at different points in the life cycle of an M/R job. When the job is scheduled, the configure method is called. Also, at configuration time, the job is asked which M/R phases it supports (hasMapper, hasCombiner, hasReducer).

At execution time, on the data nodes, the same implementation will be asked to provide concrete implementations (getMapper, getCombiner, getReducer). Hence, the interface is really just a simple abstraction of a job definition.

For example, the configure method for the Word Count example (Hadoop’s Hello World program, see http://wiki.apache.org/hadoop/WordCount) would look like this:

public Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf);
    // output key/value types of the job
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // plain text in, plain text out
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // input and output locations in HDFS
    FileInputFormat.addInputPath(job, new Path(this.inPath));
    FileOutputFormat.setOutputPath(job, new Path(this.outPath));
    return job;
}

Very much like the original – if not simpler. The difference is that the configuration is preset by z2, in order to abstract from the actual connectivity information (which should, of course, not be part of the component), and that the actual job submission, for similar reasons, is not part of the job component implementation either.
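To complete the picture, here is a sketch of what a full Word Count job component could look like. The mapper and reducer are the standard Word Count implementations; the class name, the input/output path defaults, and the handling of the z2-specific import are illustrative assumptions, not taken from the actual distribution.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// import of IMapReduceJob as provided by the z2 Hadoop module (package omitted here)

public class WordCountJob implements IMapReduceJob<LongWritable, Text, Text, IntWritable, Text, IntWritable> {

    // illustrative defaults – in practice these would be configured elsewhere
    private String inPath = "/samples/wordcount/in";
    private String outPath = "/samples/wordcount/out";

    public void init(String name) {
        // nothing to do for this simple job
    }

    // this job uses all three phases; the reducer doubles as combiner
    public boolean hasMapper()   { return true; }
    public boolean hasCombiner() { return true; }
    public boolean hasReducer()  { return true; }

    public Mapper<LongWritable, Text, Text, IntWritable> getMapper(Configuration conf) {
        return new TokenizerMapper();
    }

    public Reducer<Text, IntWritable, Text, IntWritable> getCombiner(Configuration conf) {
        return new IntSumReducer();
    }

    public Reducer<Text, IntWritable, Text, IntWritable> getReducer(Configuration conf) {
        return new IntSumReducer();
    }

    public Job configure(Configuration conf) throws Exception {
        // exactly as in the listing above
        Job job = new Job(conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(this.inPath));
        FileOutputFormat.setOutputPath(job, new Path(this.outPath));
        return job;
    }

    // the standard Word Count mapper: emit (word, 1) for every token of the input line
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // the standard Word Count reducer: sum up the counts per word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

Note that, unlike the standalone Word Count program, the component contains neither a main method nor any submission logic – as mentioned, job submission is handled by z2.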

Jobs can be scheduled programmatically or from a simple web interface, for execution on any configured Hadoop cluster or locally (for debugging).

Programmatic scheduling looks like this:

// look up the job component (e.g. the Word Count job) ...
IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup(jobComponentName, IJobConfigurator.class);
// ... and the Hadoop client configuration component
Configuration c = IComponentsLookup.INSTANCE.lookup(hadoopConfigurationComponentName, Configuration.class);
// configure the job with the client configuration and submit it
jc.configure(c);
jc.submit();

The first lookup retrieves the job component, e.g. the Word Count job implementation. The second lookup retrieves the Hadoop client configuration component – another component type that simply holds a core-site.xml file (Hadoop’s properties in XML file format) defining how to connect to Hadoop (and possibly HBase) and more.

Be aware that programmatic scheduling is a common, although not completely trivial, use case: in most cases M/R jobs are scheduled either on a time basis (i.e. via some cron-type scheduling) or as the result of a data change that invalidates a previous computation. Smart scheduling of jobs is a worthy subject of its own (i.e. a related post just got scheduled).
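As a minimal illustration of time-based scheduling, the snippet above could simply be run periodically – here, as a sketch only, with a plain ScheduledExecutorService. The component names are placeholders, and the z2-specific imports (IComponentsLookup from the z2 core, IJobConfigurator from the Hadoop module) are omitted; a real solution would typically rather hook into whatever scheduling facility it already has.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

// imports of IComponentsLookup and IJobConfigurator omitted here

public class NightlyWordCountTrigger {

    // placeholder component names – substitute your own module/component names
    private static final String JOB_COMPONENT = "com.acme.analytics/wordCountJob";
    private static final String HADOOP_CONFIG = "com.acme.analytics/hadoopClientConfig";

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // run once a day; a cron-style scheduler would allow for a fixed time of day
        this.scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    // look up job and Hadoop client configuration and submit – just as above
                    IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup(JOB_COMPONENT, IJobConfigurator.class);
                    Configuration c = IComponentsLookup.INSTANCE.lookup(HADOOP_CONFIG, Configuration.class);
                    jc.configure(c);
                    jc.submit();
                } catch (Exception e) {
                    // log and try again on the next run
                    e.printStackTrace();
                }
            }
        }, 0, 24, TimeUnit.HOURS);
    }

    public void stop() {
        this.scheduler.shutdownNow();
    }
}

Scheduling as a consequence of data changes works analogously, except that the trigger is the corresponding change event rather than a timer.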

Where Can I Get That?

This technology is used by Original1. We are planning to release it in a new distribution (working title z2@hadoop) anytime soon (i.e. as soon as we get to it). That’s why this post is called “Part 1” – Part 2 will point to the actual distribution and documentation. If you want it earlier – talk to us.

Summary

The integration of z2 with Hadoop is another strong case for the System Centric approach. Thanks to the seamless integration, Hadoop can be added to your solution without the need to change the structure of other parts and with full re-use capabilities.

The Agony Cycle of Solution Development

Software projects eventually go through one or more “Cycles of Agony”.

At the start of a software project, opportunities look promising, and productivity and motivation are high. It takes experience and sweat to come up with a somewhat stable initial structure that is instructive, scales with the problem space, and is sufficiently modular (i.e. keeps complexity manageable).

Regardless of how well you manage all the necessary compromises (e.g. productivity vs. process), how well you were able to foresee the required tools and technologies, and how well the business case was understood: projects (if not abandoned) eventually run into a state of low productivity that shows one or more of the following symptoms:

  • Developers are unsure about how things are supposed to be done,
  • The architecture tends to show unneeded abstractions of previous abstractions,
  • Conceptual integrity breaks down,
  • There is a general fear of touching things that “worked before” (no daring).

It’s unavoidable. The business scenario at hand is inherently only partially understood and will most likely change over time; inherently, new use cases, new technologies to integrate with, and new customer requirements will show up (why spend money on developing new software otherwise?). Hence the probability of making only the right far-reaching decisions is essentially zero.

The Agony Cycle looks somewhat like this:

What to do when you are in the “agony” part of the chart? Well, it depends!

If the solution at hand is a “cash cow” (rather than a star), it may well make sense not to dare big changes and to consciously move on into indefinite maintenance (and, for developers, to leave). Otherwise, though, tough decisions are required to get you back on track. Previous decisions have to be reverted, possibly requiring non-trivial refactorings that do not result in immediate benefits. Although you (hopefully) have a product already, you need to pay a price that does not add obvious value in a linear fashion (a concept that looked much more acceptable in the beginning). Whereas at the beginning of the cycle craftsmanship was essential, now the willingness to change is key to further success.

In larger organizations, the risk of change may be considered too high, or the need for change may be politically undesired. That is one reason why we see seemingly inexplicable failures by big players in the market to innovate on existing products.