Ver Fonte

commit 0d5da097f9483c784e5c88d13f13b91aa9eba625
Author: Arun C Murthy <acmurthy@apache.org>
Date: Fri Dec 11 09:13:53 2009 +0530

MAPREDUCE-353. Allow shuffle read and connection timeouts to be configurable. Contributed by Amareshwari Sriramadasu.

From https://issues.apache.org/jira/secure/attachment/12427566/patch-353-ydist.txt

+++ b/YAHOO-CHANGES.txt
+
+ MAPREDUCE-353. Allow shuffle read and connection timeouts to be
+ configurable. (Amareshwari Sriramadasu via acmurthy)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077072 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley há 14 anos atrás
pai
commit
d5f4150090

+ 17 - 0
src/mapred/mapred-default.xml

@@ -296,6 +296,23 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.reduce.shuffle.connect.timeout</name>
+  <value>180000</value>
+  <description>Expert: The maximum amount of time (in milli seconds) a reduce
+  task spends in trying to connect to a tasktracker for getting map output.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.shuffle.read.timeout</name>
+  <value>180000</value>
+  <description>Expert: The maximum amount of time (in milli seconds) a reduce
+  task waits for map output data to be available for reading after obtaining
+  connection.
+  </description>
+</property>
+
 <property>
   <name>mapred.task.timeout</name>
   <value>600000</value>

+ 11 - 4
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -1134,6 +1134,8 @@ class ReduceTask extends Task {
       private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
       // default read timeout (in milliseconds)
       private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
+      private final int shuffleConnectionTimeout;
+      private final int shuffleReadTimeout;
 
       private MapOutputLocation currentLocation = null;
       private int id = nextMapOutputCopierId++;
@@ -1149,6 +1151,11 @@ class ReduceTask extends Task {
         LOG.debug(getName() + " created");
         this.reporter = reporter;
         
+        shuffleConnectionTimeout =
+          job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
+        shuffleReadTimeout =
+          job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
+        
         if (job.getCompressMapOutput()) {
           Class<? extends CompressionCodec> codecClass =
             job.getMapOutputCompressorClass(DefaultCodec.class);
@@ -1370,8 +1377,8 @@ class ReduceTask extends Task {
         // Connect
         URLConnection connection = 
           mapOutputLoc.getOutputLocation().openConnection();
-        InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
-                                           DEFAULT_READ_TIMEOUT); 
+        InputStream input = getInputStream(connection, shuffleConnectionTimeout,
+                                           shuffleReadTimeout); 
         
         // Validate header from map output
         TaskAttemptID mapId = null;
@@ -1510,8 +1517,8 @@ class ReduceTask extends Task {
           // Reconnect
           try {
             connection = mapOutputLoc.getOutputLocation().openConnection();
-            input = getInputStream(connection, STALLED_COPY_TIMEOUT, 
-                                   DEFAULT_READ_TIMEOUT);
+            input = getInputStream(connection, shuffleConnectionTimeout, 
+                                   shuffleReadTimeout);
           } catch (IOException ioe) {
             LOG.info("Failed reopen connection to fetch map-output from " + 
                      mapOutputLoc.getHost());