Optimal Map Tasks for Map Reduce Applications Based on Time Complexity?
Some Analysis
One of Hadoop's strong points is its ability to efficiently partition input files using API hooks in HDFS and to invoke a Map task for each of these partitions. It is useful to understand how to choose this task distribution to get the best performance achievable. There are a few factors to consider when trying to pick the right number of map tasks, but the obvious one is to strike a balance between the speedup gained from distribution and the overhead of distribution.
Consider the case where the time complexity of the application is O(n), meaning the running time depends entirely on the size of the input, as in WordCount. We can argue that the partition sizes the system can handle efficiently should be the deciding factor, because the running time grows only linearly with the input and the input has to be read anyway. In such cases, letting Hadoop decide the partition sizes is the right approach, because it will choose partitions based on the optimal block sizes. Why? The following may convince you.
Let's assume Hadoop's distribution overhead is linear in the number of Map tasks; then the total time would look like the equation below. This is an oversimplified model because it leaves out the reduce phase and assumes, without justification, that Hadoop's overhead grows linearly with the number of Map tasks, but the simplicity makes the insight easier to see.
Total time = O(Input Size / Map Tasks) + Overhead * Map Tasks
So if the application has linear time complexity O(n), then:
Total time = (Input Size / Map Tasks) + Overhead * Map Tasks
Taking the derivative with respect to the number of Map tasks and setting it to zero (-Input Size / Map Tasks^2 + Overhead = 0) tells you that the total time is minimized when
Map Tasks = sqrt(Input Size / Overhead)
Since this number depends heavily on the Overhead, which is a property of the framework rather than of your application, it makes sense to let Hadoop do the partitioning and distribution.
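As a quick sanity check of that formula, here is a standalone Java sketch of the simplified cost model (the class name and the input-size and overhead figures are made-up illustrative values, not measurements from any cluster). It evaluates Total time = Input Size / Map Tasks + Overhead * Map Tasks over a range of map-task counts and prints the best count found, which lands right where sqrt(Input Size / Overhead) predicts.

// Sketch of the simplified cost model: total = n/m + c*m.
// The numbers below are arbitrary illustrative values, not measurements.
public class MapTaskModel {
    public static void main(String[] args) {
        double inputSize = 1000000;   // "n": work proportional to the input size
        double overhead  = 10;        // "c": assumed per-map-task overhead

        int bestTasks = 1;
        double bestTime = Double.MAX_VALUE;
        for (int m = 1; m <= 10000; m++) {
            double total = inputSize / m + overhead * m;   // linear O(n) case
            if (total < bestTime) {
                bestTime = total;
                bestTasks = m;
            }
        }
        System.out.println("Best map task count by search: " + bestTasks);
        System.out.println("sqrt(n / c) predicts:          " + Math.sqrt(inputSize / overhead));
    }
}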
Structured input for Map Reduce
Consider the case where the time complexity of the application is not linear but higher; then the O(Input Size/Map Tasks) term dominates the total time. If the time complexity is O(n^2), then:
Total time = (Input Size / Map Tasks)^2 + Overhead * Map Tasks
Here the first term grows much faster with the per-task input size, so minimizing it by increasing the number of Map tasks is essential. (Repeating the derivative exercise on this model gives an optimum near (2 * Input Size^2 / Overhead)^(1/3), which grows faster with the input size than the sqrt(Input Size / Overhead) of the linear case.) So in such cases you cannot let Hadoop come up with the number of Map tasks based on the block sizes it finds optimal; you need to force the number of map tasks upon it.
For example, consider a Java ray tracing application, which has O(n^2) complexity according to Michael. Here you render a scene using ray tracing, and this parallelizes easily because each pixel's calculation is independent of the others. When you parallelize it, the input file describes the different segments of the scene you want to compute, and you need to force Map Reduce to calculate those segments in different map tasks. So you need much more structured input than in the word count example, because both your number of map tasks and your running time depend on it. Letting the partitioning be driven by optimal file sizes in such a compute-intensive application would be unwise and in many cases counterproductive, although it should be noted that Hadoop was designed for data-intensive parallelism rather than compute-intensive parallelism. A sketch of what such structured input might look like is shown below.
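As an illustration (the class name, file name, and line format here are hypothetical, not taken from the ray tracer above), one way to produce such structured input is to write one line per segment, for example a band of scanlines, so that each line can later be handed to its own map task:

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

// Hypothetical generator for a structured input file: one line per scene segment.
// Each line holds "startRow endRow" for a band of the image to be rendered by one map task.
public class SegmentInputWriter {
    public static void main(String[] args) throws IOException {
        int imageHeight = 1080;   // illustrative scene height in pixels
        int mapTasks = 64;        // the number of map tasks we want to force
        int rowsPerSegment = (int) Math.ceil((double) imageHeight / mapTasks);

        PrintWriter out = new PrintWriter(new FileWriter("raytrace-input.txt"));
        try {
            for (int start = 0; start < imageHeight; start += rowsPerSegment) {
                int end = Math.min(start + rowsPerSegment, imageHeight) - 1;
                out.println(start + " " + end);   // one segment per line => one map task per line
            }
        } finally {
            out.close();
        }
    }
}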
Forcing Hadoop to run a map task for each line in the input file
Hadoop provides an input format called NLineInputFormat with which each line in the input file gets mapped to a separate map task, giving you control over the number of map tasks you want to force upon the Hadoop framework. Below is the (incomplete) code to set up the job configuration; the setInputFormat(NLineInputFormat.class) call is what selects the NLineInputFormat. So if you know the optimal number of map tasks you need to run, you can encode those inputs as lines in the input file and Hadoop will create a new map task for each one.
JobConf conf = new JobConf(HadoopRayTracer.class);
conf.setJobName("raytrace");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);

// Each line of the input file becomes its own split, and hence its own map task.
conf.setInputFormat(NLineInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

.......
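To give an idea of what the elided part typically contains, here is a hedged sketch continuing with the conf object from the snippet above (the paths are placeholders and the corresponding org.apache.hadoop.mapred imports are assumed). In the old mapred API, NLineInputFormat defaults to one line per split, and the mapred.line.input.format.linespermap property controls how many lines go to each mapper if you want to group them.

// Sketch only; "raytrace-input.txt" and "raytrace-output" are placeholder paths.
// NLineInputFormat defaults to one line per split, i.e. one map task per input line.
conf.setInt("mapred.line.input.format.linespermap", 1);

FileInputFormat.setInputPaths(conf, new Path("raytrace-input.txt"));
FileOutputFormat.setOutputPath(conf, new Path("raytrace-output"));

JobClient.runJob(conf);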