Forráskód Böngészése

MAPREDUCE-5028. Maps fail when io.sort.mb is set to high value. (kkambatl via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1458380 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 12 éve
szülő
commit
bc29eda1a5

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -106,6 +106,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5053. java.lang.InternalError from decompression codec cause
     reducer to fail (Robert Parker via jeagles)
 
+    MAPREDUCE-5028. Maps fail when io.sort.mb is set to high value. 
+    (kkambatl via tgraves)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 7 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

@@ -1122,8 +1122,9 @@ class MapTask extends Task {
       equator = pos;
       // set index prior to first entry, aligned at meta boundary
       final int aligned = pos - (pos % METASIZE);
-      kvindex =
-        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+      // Cast one of the operands to long to ensure large values don't cause int
+      // overflow
+      kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
       if (LOG.isInfoEnabled()) {
         LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
             "(" + (kvindex * 4) + ")");
@@ -1140,8 +1141,9 @@ class MapTask extends Task {
       bufstart = bufend = e;
       final int aligned = e - (e % METASIZE);
       // set start/end to point to first meta record
-      kvstart = kvend =
-        ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+      // Cast one of the operands to long to ensure large values don't cause int
+      // overflow
+      kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
       if (LOG.isInfoEnabled()) {
         LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
           (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
@@ -1705,7 +1707,7 @@ class MapTask extends Task {
           this.start = 0;
         }
 
-        super.reset(this.buffer, this.start, this.length);
+        super.reset(this.buffer, this.start, this.length - this.start);
       }
     }
 

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java

@@ -205,7 +205,8 @@ public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
           if (backupStore.hasNext()) {
             backupStore.next();
             DataInputBuffer next = backupStore.nextValue();
-            buffer.reset(next.getData(), next.getPosition(), next.getLength());
+            buffer.reset(next.getData(), next.getPosition(), next.getLength()
+                - next.getPosition());
             value = valueDeserializer.deserialize(value);
             return value;
           } else {

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java

@@ -49,14 +49,14 @@ public class InMemoryReader<K, V> extends Reader<K, V> {
 
     buffer = data;
     bufferSize = (int)fileLength;
-    memDataIn.reset(buffer, start, length);
+    memDataIn.reset(buffer, start, length - start);
     this.start = start;
     this.length = length;
   }
 
   @Override
   public void reset(int offset) {
-    memDataIn.reset(buffer, start + offset, length);
+    memDataIn.reset(buffer, start + offset, length - start - offset);
     bytesRead = offset;
     eof = false;
   }