Chathura Herath's Blog


Wednesday, March 31, 2010

Optimal Map tasks for Map Reduce Applications based on Time Complexity ???

Some Analysis

One of the strong points of Hadoop is its ability to efficiently partition the input files using API hooks in HDFS and to call the Map tasks with each of these partitions. It is useful to understand how to find this task distribution to get the best possible performance. There are a few factors to consider when trying to pick the right number of map tasks, but the obvious one is to strike a balance between the speedup from the distribution and the overhead of distribution.

Consider the case where the time complexity of the application is O(n), meaning the running time depends entirely on the size of the input, as in WordCount. We can argue that the partition sizes the system can handle efficiently become the deciding factor, because the running time only grows linearly with the input and you have to read the input anyway. So in such cases letting Hadoop decide on the partition sizes is the right approach, because it will base the partition size on the optimal block sizes. Why? The following may convince you.

Let's assume the Hadoop distribution overhead is linear in the number of Map tasks; then the total time looks like the following equation. This may be an oversimplified equation because it leaves out the reduce phase and assumes, without argument, that Hadoop's overhead is linear in the number of Map tasks. But this simplicity gives more insight.

Total time = O (Input Size/Map Tasks) + Overhead * Map Tasks

So if the application has linear time complexity O(n), then:

Total time = (Input Size/Map Tasks)+ Overhead * Map Tasks

Taking the derivative with respect to Map Tasks and setting it to zero tells you that the total time is minimized when

Map Tasks = sqrt(Input Size/Overhead)
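
To spell out the calculus step (a quick sketch, writing N for Input Size, M for Map Tasks and c for the per-task Overhead, under the same simplifying assumptions as above):

\[
T(M) = \frac{N}{M} + cM, \qquad
\frac{dT}{dM} = -\frac{N}{M^{2}} + c = 0
\;\Rightarrow\;
M = \sqrt{\frac{N}{c}}
\]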

Since this number depends heavily on the Overhead, it makes sense to let Hadoop do the partitioning and distribution.



Structured input for Map Reduce

Consider the case where the time complexity of the application is not linear but higher; then the per-task computation term, which depends on (Input Size/Map Tasks), dominates the total time. If the time complexity is O(n^2), then:


Total time = (Input Size/Map Tasks) ^ 2 + Overhead * Map Tasks


Here the first term grows much faster, so minimizing it by increasing the number of Map tasks is essential. In such cases you cannot let Hadoop come up with the number of Map tasks based on the optimal block sizes it can handle; you need to force the number of map tasks upon it.
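
Repeating the same minimization sketch for the quadratic case (same notation and simplifying assumptions as above) shows why:

\[
T(M) = \left(\frac{N}{M}\right)^{2} + cM, \qquad
\frac{dT}{dM} = -\frac{2N^{2}}{M^{3}} + c = 0
\;\Rightarrow\;
M = \left(\frac{2N^{2}}{c}\right)^{1/3}
\]

The optimal number of map tasks is again set by balancing per-task computation against per-task overhead, not by how much data fits comfortably in an HDFS block, which is why it should be chosen explicitly for compute-heavy applications.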

For example, consider a Java ray tracing application, which has O(n^2) complexity according to Michael. Here you try to render a scene using ray tracing, and this can be easily parallelized because each pixel calculation is independent of the others. When you parallelize it, you will have different segments of the input that you want to compute, and you need to force MapReduce to calculate those segments in different map tasks. So you need much more structured input than in the word count example, because your number of map tasks and your running time depend on it. Letting the partitioning be driven by optimal file sizes in such a compute-intensive application would be unwise and in many cases counterproductive, although it should be noted that Hadoop was designed for data-intensive parallelism rather than compute-intensive parallelism.


Forcing Hadoop to run a map task for each line in the input file

Hadoop provides an input format called NLineInputFormat where each line in the input file (by default) gets mapped to a separate map task, which gives you control over the number of map tasks that you want to force upon the Hadoop framework. Following is the (incomplete) code to set up the JobConf; the conf.setInputFormat(NLineInputFormat.class) line is what sets the NLineInputFormat. So if you know the optimal number of map tasks you need to run, you can write those inputs as lines in the input file and Hadoop will create a new map task for each one.


JobConf conf = new JobConf(HadoopRayTracer.class);
conf.setJobName("raytrace");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);

// Each line of the input file becomes its own map task
// (NLineInputFormat defaults to one line per split).
conf.setInputFormat(NLineInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);
.......
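
To make this concrete, here is a minimal sketch of what a matching map function could look like, under the assumption (mine, not part of the original setup) that every input line carries the start and end rows of the image segment the task should render; the line format and the renderRows helper are hypothetical:

// Illustrative only: with NLineInputFormat each input line such as "0 99"
// becomes its own map task. In the real job this class would typically be a
// static nested class inside HadoopRayTracer, referenced by
// conf.setMapperClass(Map.class).
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class Map extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // Parse the segment description carried by this line.
        String[] parts = line.toString().trim().split("\\s+");
        int startRow = Integer.parseInt(parts[0]);
        int endRow = Integer.parseInt(parts[1]);

        // Render the assigned rows; this stands in for the real ray tracing call.
        String segment = renderRows(startRow, endRow);

        // Key the output by the starting row so the reducer can stitch the
        // rendered segments back together in order.
        output.collect(new Text(parts[0]), new Text(segment));
    }

    // Hypothetical stand-in for the actual ray tracing library call.
    private String renderRows(int startRow, int endRow) {
        return "rendered rows " + startRow + " to " + endRow;
    }
}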


Sunday, March 28, 2010

Launching Elastic MapReduce Applications Programmatically

I am writing this mainly to help the students I am teaching this semester in Prof. Beth Plale's B534 class.


Short Background

Amazon Elastic MapReduce provides an API to run Hadoop MapReduce jobs on Amazon EC2. If you have a Hadoop application, you can use the Amazon Console to deploy and run it on EC2. A great tutorial on how to use the Amazon Console to run your Hadoop application can be found here.
If you do not have a Hadoop application, have a look at the famous word count example, which can be found here.


Amazon's abstractions for MapReduce

Amazon has introduced two abstractions for its Elastic MapReduce framework:

1) Job Flow
2) Job Flow Step

It is important to understand these abstractions before using the API to launch a MapReduce application. Following are the definitions provided by Amazon for Job Flow and Job Flow Step, but it is much more intuitive to understand them in the context of an application.

Q: What is an Amazon Elastic MapReduce Job Flow?

A Job Flow is a collection of processing steps that Amazon Elastic MapReduce runs on a specified dataset using a set of Amazon EC2 instances. A Job Flow consists of one or more steps, each of which must complete in sequence successfully, for the Job Flow to finish.

Q: What is a Job Flow Step?

A Job Flow Step is a user-defined unit of processing, mapping roughly to one algorithm that manipulates the data. A step is a Hadoop MapReduce application implemented as a Java jar or a streaming program written in Java, Ruby, Perl, Python, PHP, R, or C++. For example, to count the frequency with which words appear in a document, and output them sorted by the count, the first step would be a MapReduce application which counts the occurrences of each word, and the second step would be a MapReduce application which sorts the output from the first step based on the counts.


Terminology: Single MapReduce Run

When you run a MapReduce application, the data files will be read by the Hadoop framework and divided into, say, N segments; your map function will be called N times in parallel, and on completion of the map tasks your reduce function will be called. Once the reduce function has finished executing, your outputs will be written to the output folder. We shall call this a Single MapReduce Run for the purpose of identification. So suppose you want to word count a particular log file and you run it within a Single MapReduce Run; if at the end of it, or during the run, you receive another log file that you want to word count, you may want to launch another Single MapReduce Run.

For the purpose of the class, rendering a given scene using the modified ray tracing library will be a Single MapReduce Run. In this Single MapReduce Run you will split the scene into sections, ray trace these sections/subviews in different Map tasks, combine the sections/subviews in the Reduce task, and write the result to the output. So if you have a second scene to render, it will be another Single MapReduce Run.


Single MapReduce Run –Job Flow Step

An Amazon Job Flow corresponds to a running application in Amazon Elastic MapReduce and can contain one or more Single MapReduce Runs. So the Single MapReduce Run described above corresponds to a Job Flow Step in Amazon Elastic MapReduce. Job Flow and Job Flow Step have something like a parent-child relationship, and it is one-to-many.

The Job Flow corresponds to the setup of the infrastructure and the EC2 reservation, and, I am guessing, to accounting. Within that Job Flow one may run Hadoop MapReduce applications. Each Single MapReduce Run will be a Job Flow Step inside that Job Flow.

EC2 machine reservations are done at the Job Flow level, and once reserved they are fixed for all the Job Flow Steps in that particular Job Flow. This will become much clearer when you get to the API calls.


Launching Elastic MapReduce jobs programmatically using Java

Amazon provides an API client that enables Amazon Elastic MapReduce users to launch Hadoop MapReduce jobs programmatically. Again, I assume that by this time you have already launched and tested your application manually using the Amazon MapReduce Console. You can download the client from here.

Once you download it, set it up in your IDE and find the com.amazonaws.elasticmapreduce.samples package. Inside this package you will find a few Java files which allow you to create Job Flows, add Job Flow Steps, terminate Job Flows and query Job Flow status. In this discussion we will focus on the first two (RunJobFlowSample.java and AddJobFlowStepsSample.java).

If you open the RunJobFlowSample class from Amazon, you may notice its main method is pretty empty, but the API they provide is very easy: you fill your configuration into the bean objects they have provided and set it on the request. Following is the code to set up a Job Flow with a single Job Flow Step. In other words, it will set up the Hadoop framework using 11 machines and launch a Single MapReduce Run. You can download the implementation class here. You will obviously have to change the Amazon credentials to your own.

Once you launch the Job Flow, you may go to the Amazon console and monitor the status of your Job Flow.



public static void main(String[] args) {

    // Replace these with your own AWS credentials.
    String accessKeyId = "FHJKDFIHKJDF";
    String secretAccessKey = "DFJLDFODF/AND/NO/THIS/IS/NOT/MY/ACCESS/KEY";

    AmazonElasticMapReduceConfig config = new AmazonElasticMapReduceConfig();
    config.setSignatureVersion("0");
    // config.set
    AmazonElasticMapReduce service = new AmazonElasticMapReduceClient(
            accessKeyId, secretAccessKey, config);

    // Configure the Job Flow: the EC2 reservation shared by all its Job Flow Steps.
    RunJobFlowRequest request = new RunJobFlowRequest();
    JobFlowInstancesConfig conf = new JobFlowInstancesConfig();
    conf.setEc2KeyName("class");
    conf.setInstanceCount(11);
    // Keep the cluster alive after the step finishes so more steps can be added later.
    conf.setKeepJobFlowAliveWhenNoSteps(true);
    conf.setMasterInstanceType("m1.small");
    conf.setPlacement(new PlacementType("us-east-1a"));
    conf.setSlaveInstanceType("m1.small");

    request.setInstances(conf);
    request.setLogUri("s3n://b534/logs");

    String jobFlowName = "Class-job-flow" + new Date().toString();
    jobFlowName = Utils.formatString(jobFlowName);
    System.err.println(jobFlowName);

    request.setName(jobFlowName);

    // Configure a single Job Flow Step, i.e. one Single MapReduce Run.
    String stepname = "Step" + System.currentTimeMillis();
    List<StepConfig> steps = new LinkedList<StepConfig>();
    StepConfig stepConfig = new StepConfig();
    stepConfig.setActionOnFailure("CANCEL_AND_WAIT");
    HadoopJarStepConfig jarsetup = new HadoopJarStepConfig();
    List<String> arguments = new LinkedList<String>();
    arguments.add("s3n://b534/inputs/");
    arguments.add("s3n://b534/outputs/" + jobFlowName + "/" + stepname + "/");
    jarsetup.setArgs(arguments);
    jarsetup.setJar("s3n://b534/Hadoopv400.jar");
    jarsetup.setMainClass("edu.indiana.extreme.HadoopRayTracer");
    stepConfig.setHadoopJarStep(jarsetup);

    stepConfig.setName(stepname);
    steps.add(stepConfig);

    request.setSteps(steps);

    invokeRunJobFlow(service, request);

}




Adding a Job Flow Step to an existing Job Flow

The above Job Flow will not shut down once it has finished executing its Job Flow Step; the following line is responsible for that.


conf.setKeepJobFlowAliveWhenNoSteps(true);
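
Conversely, if you only need a single run and would rather have the cluster (and the billing) stop as soon as its steps finish, you can flip this flag; the snippet below is just the same configuration call with the other value, not part of the original sample:

// If you do not plan to add further Job Flow Steps, let the Job Flow
// terminate on its own once the last step completes.
conf.setKeepJobFlowAliveWhenNoSteps(false);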

Now we will add another Job Flow Step to the Job Flow we started earlier. This will be another Single MapReduce Run, because Job Flow Steps correspond to Single MapReduce Runs. Have a look at the AddJobFlowStepsSample.java class; it is used to add a step to an already running Job Flow. Following is the implemented main method in that class, which can be used to add a Job Flow Step to a Job Flow that is already running. Here you have to set the jobflowID and the jobFlowName, apart from the credentials; these identify the already running Job Flow. Once you run it, you can again go to the Amazon Management Console and monitor the progress. The implemented class can be found here.

This is useful for reusing already allocated and booted-up EC2 resources to launch multiple Single MapReduce Runs one after another without incurring the setup cost each time. So if you want to word count a second data file, or, if you are a student in my class, render a second scene, you can use this client to add a new Job Flow Step without having to set up all the machines again and incur the setup cost.



public static void main(String... args) {

    // Set these values: your AWS credentials plus the id and name of the
    // Job Flow that is already running.
    String accessKeyId = "FHJKDFIHKJDF";
    String secretAccessKey = "DFJLDFODF/YES!/YOU/GUESSED/IT/NO/THIS/IS/NOT/MY/ACCESS/KEY/NEITHER";
    String jobflowID = "j-6XL4RL7E5A2";
    String jobFlowName = "Class_job_flowSat_Mar_27_23_12_16_EDT_2010";

    AmazonElasticMapReduce service = new AmazonElasticMapReduceClient(
            accessKeyId, secretAccessKey);
    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest();
    String stepName = "Step" + System.currentTimeMillis();
    System.err.println(stepName);

    // Attach the new step to the existing Job Flow.
    request.setJobFlowId(jobflowID);

    // Configure the new Job Flow Step, i.e. another Single MapReduce Run.
    List<StepConfig> steps = new LinkedList<StepConfig>();
    StepConfig stepConfig = new StepConfig();
    stepConfig.setActionOnFailure("CANCEL_AND_WAIT");
    HadoopJarStepConfig jarsetup = new HadoopJarStepConfig();
    List<String> arguments = new LinkedList<String>();
    arguments.add("s3n://b534/inputs/");
    arguments.add("s3n://b534/outputs/" + jobFlowName + "/" + stepName + "/");
    jarsetup.setArgs(arguments);
    jarsetup.setJar("s3n://b534/Hadoopv400.jar");
    jarsetup.setMainClass("edu.indiana.extreme.HadoopRayTracer");
    stepConfig.setHadoopJarStep(jarsetup);

    stepConfig.setName(stepName);
    steps.add(stepConfig);

    request.setSteps(steps);

    invokeAddJobFlowSteps(service, request);

}




