Chathura Herath's Blog


Sunday, March 28, 2010

Programmatically Launch Elastic Map Reduce applications

I am writing this mainly to help the students I am teaching this semester in Prof. Beth Plale's B534 class.


Short Background

Amazon Elastic MapReduce provides an API to run Hadoop MapReduce jobs on Amazon EC2. If you have a Hadoop application, you can use the Amazon Console to deploy and run it on EC2. A great tutorial on how to use the Amazon Console to run your Hadoop application can be found here.
If you do not have a Hadoop application, have a look at the famous word count example, which can be found here.


Amazon's abstractions for MapReduce

Amazon has introduced two abstractions for its Elastic MapReduce framework:

1) Job Flow
2) Job Flow Step

It is important to understand these abstractions before using the API to launch MapReduce applications. Following are the definitions provided by Amazon for Job Flow and Job Flow Step, but it is much more intuitive to understand them in the context of an application.

Q: What is an Amazon Elastic MapReduce Job Flow?

A Job Flow is a collection of processing steps that Amazon Elastic MapReduce runs on a specified dataset using a set of Amazon EC2 instances. A Job Flow consists of one or more steps, each of which must complete in sequence successfully, for the Job Flow to finish.

Q: What is a Job Flow Step?

A Job Flow Step is a user-defined unit of processing, mapping roughly to one algorithm that manipulates the data. A step is a Hadoop MapReduce application implemented as a Java jar or a streaming program written in Java, Ruby, Perl, Python, PHP, R, or C++. For example, to count the frequency with which words appear in a document, and output them sorted by the count, the first step would be a MapReduce application which counts the occurrences of each word, and the second step would be a MapReduce application which sorts the output from the first step based on the counts.


Terminology: Single MapReduce Run

When you run a MapReduce application, the Hadoop framework reads the data files and divides them into, say, N segments; your map function is called N times in parallel, and once the map tasks complete your reduce function is called. When the reduce function finishes executing, your outputs are written to the output folder. We shall call this a Single MapReduce Run for the purpose of identification. So if you word count a particular log file within one Single MapReduce Run, and during or after that run you receive another log file that you want to word count, you would launch another Single MapReduce Run.
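To make the idea concrete, here is a minimal sketch of the map and reduce functions for the word count case, written against the Hadoop 0.20 mapreduce API. The class names are mine, for illustration; this is not the Amazon sample code or the course code.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: Hadoop calls this once per record in each of the N input segments.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);   // emit (word, 1)
        }
    }
}

// Reduce: called once per distinct word, with all the 1s emitted for that word.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));   // emit (word, total count)
    }
}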

For the purpose of the class, a rendering of a given scene using the modified ray tracing library will be a Single MapReduce Run. In this Single MapReduce Run you will split the scene into sections and ray trace these sections/subviews in different Map tasks, then combine the sections/subviews in the Reduce task and write the result to the output. So if you have a second scene to render, it will be another Single MapReduce Run.


Single MapReduce Run as a Job Flow Step

The Amazon Job Flow corresponds to a running application in Amazon Elastic MapReduce and can contain one or more Single MapReduce Runs. So the Single MapReduce Run described above corresponds to a Job Flow Step in Amazon Elastic MapReduce. Job Flow and Job Flow Step have something like a parent-child relationship, and it is one-to-many.

The Job Flow corresponds to the setup of the infrastructure: the EC2 reservation and, I am guessing, accounting. Within that Job Flow one may run Hadoop MapReduce applications, and each Single MapReduce Run will be a Job Flow Step inside that Job Flow.

EC2 machine reservations are done at the Job Flow level, and once reserved the set of machines is fixed for all the Job Flow Steps in that particular Job Flow. This will become much clearer when we get to the API calls.


Launching Elastic MapReduce jobs programmatically using Java

Amazon provides an API client that enables Amazon Elastic MapReduce users to launch Hadoop MapReduce jobs programmatically. Again, I assume that by this time you have already launched and tested your application manually using the Amazon MapReduce Console. You can download the client from here.

Once you download it, set it up in your IDE and find the com.amazonaws.elasticmapreduce.samples package. Inside this package you will find a few Java files that allow you to create Job Flows, add Job Flow Steps, terminate Job Flows and query Job Flow status. In this discussion we will focus on the first two (RunJobFlowSample.java and AddJobFlowStepsSample.java).

If you open the RunJobFlowSample class from Amazon you may notice that its main method is pretty empty, but the API they provide is very easy: you fill your configuration into the bean objects they provide and set them on the request. Following is the code to set up a Job Flow with a single Job Flow Step. In other words, it will set up the Hadoop framework using 11 machines and launch a Single MapReduce Run. You can download the implementation class here. You will obviously have to change the Amazon credentials to your own.

Once you launch the Job Flow, you may go to the Amazon console and monitor the status of your Job Flow.



public static void main(String[] args) {

    // Your AWS credentials (these are placeholders, not real keys).
    String accessKeyId = "FHJKDFIHKJDF";
    String secretAccessKey = "DFJLDFODF/AND/NO/THIS/IS/NOT/MY/ACCESS/KEY";

    AmazonElasticMapReduceConfig config = new AmazonElasticMapReduceConfig();
    config.setSignatureVersion("0");
    AmazonElasticMapReduce service = new AmazonElasticMapReduceClient(
            accessKeyId, secretAccessKey, config);

    // Job Flow level configuration: this is where the EC2 reservation happens.
    RunJobFlowRequest request = new RunJobFlowRequest();
    JobFlowInstancesConfig conf = new JobFlowInstancesConfig();
    conf.setEc2KeyName("class");
    conf.setInstanceCount(11);                  // total instances, master included
    conf.setKeepJobFlowAliveWhenNoSteps(true);  // keep machines up after steps finish
    conf.setMasterInstanceType("m1.small");
    conf.setPlacement(new PlacementType("us-east-1a"));
    conf.setSlaveInstanceType("m1.small");

    request.setInstances(conf);
    request.setLogUri("s3n://b534/logs");

    // Give the Job Flow a unique, readable name.
    String jobFlowName = "Class-job-flow" + new Date().toString();
    jobFlowName = Utils.formatString(jobFlowName);
    System.err.println(jobFlowName);
    request.setName(jobFlowName);

    // Job Flow Step level configuration: one Single MapReduce Run.
    String stepname = "Step" + System.currentTimeMillis();
    List<StepConfig> steps = new LinkedList<StepConfig>();
    StepConfig stepConfig = new StepConfig();
    stepConfig.setActionOnFailure("CANCEL_AND_WAIT");
    HadoopJarStepConfig jarsetup = new HadoopJarStepConfig();
    List<String> arguments = new LinkedList<String>();
    arguments.add("s3n://b534/inputs/");        // input folder in S3
    arguments.add("s3n://b534/outputs/" + jobFlowName + "/" + stepname + "/");
    jarsetup.setArgs(arguments);
    jarsetup.setJar("s3n://b534/Hadoopv400.jar");   // your Hadoop application jar in S3
    jarsetup.setMainClass("edu.indiana.extreme.HadoopRayTracer");
    stepConfig.setHadoopJarStep(jarsetup);
    stepConfig.setName(stepname);
    steps.add(stepConfig);

    request.setSteps(steps);

    // Helper from RunJobFlowSample that calls service.runJobFlow(request).
    invokeRunJobFlow(service, request);
}
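Besides the console, you can also check the status from code: as mentioned above, the samples package includes a class for querying Job Flow status. A minimal sketch of what that could look like, reusing the service object from the snippet above; I am assuming the DescribeJobFlows request and response beans follow the same pattern as the other samples, so treat the exact names as illustrative:

    try {
        // Query the state of one Job Flow by its ID (the ID below is made up).
        DescribeJobFlowsRequest status = new DescribeJobFlowsRequest();
        status.setJobFlowIds(java.util.Arrays.asList("j-XXXXXXXXXXXX"));
        DescribeJobFlowsResponse response = service.describeJobFlows(status);
        for (JobFlowDetail detail : response.getDescribeJobFlowsResult().getJobFlows()) {
            // The state moves through STARTING, RUNNING, WAITING, TERMINATED, etc.
            System.out.println(detail.getJobFlowId() + " : "
                    + detail.getExecutionStatusDetail().getState());
        }
    } catch (AmazonElasticMapReduceException ex) {
        System.err.println("DescribeJobFlows failed: " + ex.getMessage());
    }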




Adding a Job Flow Step to an existing Job Flow

The above Job Flow will not shut down once it finishes executing its Job Flow Step; the following line is responsible for that.


conf.setKeepJobFlowAliveWhenNoSteps(true);

Now we will add another Job Flow Step to the Job Flow we started earlier. This will be another Single MapReduce Run, because Job Flow Steps correspond to Single MapReduce Runs. Have a look at the AddJobFlowStepsSample.java class, which is used to add a step to an already running Job Flow. Following is the implemented main method in that class, which can be used to add a Job Flow Step to a Job Flow that is already running. Here you will have to set the jobflowID and the jobFlowName, apart from the credentials; these identify the already running Job Flow. Once you run it you can again go to the Amazon Management Console and monitor the progress. The implemented class can be found here.

This is useful for reusing already allocated and booted-up EC2 resources, launching multiple Single MapReduce Runs one after another without incurring the setup cost each time. So if you want to word count a second data file, or, if you are a student in my class, render a second scene, you can use this client to add a new Job Flow Step without having to set up all the machines again.



public static void main(String... args) {

    // Set these values: credentials plus the ID and name of the running Job Flow.
    String accessKeyId = "FHJKDFIHKJDF";
    String secretAccessKey = "DFJLDFODF/YES!/YOU/GUESSED/IT/NO/THIS/IS/NOT/MY/ACCESS/KEY/NEITHER";
    String jobflowID = "j-6XL4RL7E5A2";
    String jobFlowName = "Class_job_flowSat_Mar_27_23_12_16_EDT_2010";

    AmazonElasticMapReduce service = new AmazonElasticMapReduceClient(
            accessKeyId, secretAccessKey);
    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest();
    String stepName = "Step" + System.currentTimeMillis();
    System.err.println(stepName);

    // Attach the new step to the already running Job Flow.
    request.setJobFlowId(jobflowID);

    // Step configuration: identical in shape to the one in RunJobFlowRequest.
    List<StepConfig> steps = new LinkedList<StepConfig>();
    StepConfig stepConfig = new StepConfig();
    stepConfig.setActionOnFailure("CANCEL_AND_WAIT");
    HadoopJarStepConfig jarsetup = new HadoopJarStepConfig();
    List<String> arguments = new LinkedList<String>();
    arguments.add("s3n://b534/inputs/");
    arguments.add("s3n://b534/outputs/" + jobFlowName + "/" + stepName + "/");
    jarsetup.setArgs(arguments);
    jarsetup.setJar("s3n://b534/Hadoopv400.jar");
    jarsetup.setMainClass("edu.indiana.extreme.HadoopRayTracer");
    stepConfig.setHadoopJarStep(jarsetup);
    stepConfig.setName(stepName);
    steps.add(stepConfig);

    request.setSteps(steps);

    // Helper from AddJobFlowStepsSample that calls service.addJobFlowSteps(request).
    invokeAddJobFlowSteps(service, request);
}
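Finally, remember that because of setKeepJobFlowAliveWhenNoSteps(true) the Job Flow, and its EC2 billing, keeps running until you shut it down yourself, either from the console or with the terminate sample mentioned earlier. A minimal sketch, reusing the variables from the snippet above and assuming the terminate sample exposes an invokeTerminateJobFlows helper like the invoke methods we already used:

    // Shut down the whole Job Flow; all of its EC2 instances are released together.
    TerminateJobFlowsRequest terminate = new TerminateJobFlowsRequest();
    terminate.setJobFlowIds(java.util.Arrays.asList(jobflowID));
    invokeTerminateJobFlows(service, terminate);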




