Просмотр исходного кода

MAPREDUCE-6199. AbstractCounters are not reset completely on deserialization (adhoot via rkanter)

Robert Kanter 10 лет назад
Родитель
Commit
390a7c12f5

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -288,6 +288,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6045. need close the DataInputStream after open it in
     TestMapReduce.java (zxu via rkanter)
 
+    MAPREDUCE-6199. AbstractCounters are not reset completely on
+    deserialization (adhoot via rkanter)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java

@@ -307,6 +307,10 @@ public abstract class AbstractCounters<C extends Counter,
       fgroups.put(group.getName(), group);
     }
     int numGroups = WritableUtils.readVInt(in);
+    if (!groups.isEmpty()) {
+      groups.clear();
+      limits.reset();
+    }
     while (numGroups-- > 0) {
       limits.checkGroups(groups.size() + 1);
       G group = groupFactory.newGenericGroup(

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java

@@ -124,8 +124,15 @@ public class Limits {
     return firstViolation;
   }
 
+  // This allows initialization of global settings and not for an instance
   public static synchronized void reset(Configuration conf) {
     isInited = false;
     init(conf);
   }
+
+  // This allows resetting of an instance to allow reuse
+  public synchronized void reset() {
+    totalCounters = 0;
+    firstViolation = null;
+  }
 }

+ 38 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestCounters.java

@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.mapreduce;
 
+import java.io.IOException;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -70,7 +74,40 @@ public class TestCounters {
       testMaxGroups(new Counters());
     }
   }
-  
+
+  @Test public void testResetOnDeserialize() throws IOException {
+    // Allow only one counterGroup
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.COUNTER_GROUPS_MAX_KEY, 1);
+    Limits.init(conf);
+
+    Counters countersWithOneGroup = new Counters();
+    countersWithOneGroup.findCounter("firstOf1Allowed", "First group");
+    boolean caughtExpectedException = false;
+    try {
+      countersWithOneGroup.findCounter("secondIsTooMany", "Second group");
+    }
+    catch (LimitExceededException _) {
+      caughtExpectedException = true;
+    }
+
+    assertTrue("Did not throw expected exception",
+        caughtExpectedException);
+
+    Counters countersWithZeroGroups = new Counters();
+    DataOutputBuffer out = new DataOutputBuffer();
+    countersWithZeroGroups.write(out);
+
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+
+    countersWithOneGroup.readFields(in);
+
+    // After reset one should be able to add a group
+    countersWithOneGroup.findCounter("firstGroupAfterReset", "After reset " +
+        "limit should be set back to zero");
+  }
+
   @Test
   public void testCountersIncrement() {
     Counters fCounters = new Counters();