Procházet zdrojové kódy

HADOOP-9401. CodecPool: Add counters for number of (de)compressors leased out. (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1464219 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur před 12 roky
rodič
revize
da0e779e4b

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -536,6 +536,9 @@ Release 2.0.5-beta - UNRELEASED
 
     HADOOP-9358. "Auth failed" log should include exception string (todd)
 
+    HADOOP-9401. CodecPool: Add counters for number of (de)compressors 
+    leased out. (kkambatl via tucu)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

+ 65 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * A global compressor/decompressor pool used to save and reuse 
  * (possibly native) compression/decompression codecs.
@@ -52,6 +57,29 @@ public class CodecPool {
   private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
     new HashMap<Class<Decompressor>, List<Decompressor>>();
 
+  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
+      Class<T> klass) {
+    return CacheBuilder.newBuilder().build(
+        new CacheLoader<Class<T>, AtomicInteger>() {
+          @Override
+          public AtomicInteger load(Class<T> key) throws Exception {
+            return new AtomicInteger();
+          }
+        });
+  }
+
+  /**
+   * Map to track the number of leased compressors
+   */
+  private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts =
+      createCache(Compressor.class);
+
+   /**
+   * Map to tracks the number of leased decompressors
+   */
+  private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
+      createCache(Decompressor.class);
+
   private static <T> T borrow(Map<Class<T>, List<T>> pool,
                              Class<? extends T> codecClass) {
     T codec = null;
@@ -90,6 +118,21 @@ public class CodecPool {
     }
   }
   
+  @SuppressWarnings("unchecked")
+  private static <T> int getLeaseCount(
+      LoadingCache<Class<T>, AtomicInteger> usageCounts,
+      Class<? extends T> codecClass) {
+    return usageCounts.getUnchecked((Class<T>) codecClass).get();
+  }
+
+  private static <T> void updateLeaseCount(
+      LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
+    if (codec != null) {
+      Class<T> codecClass = ReflectionUtils.getClass(codec);
+      usageCounts.getUnchecked(codecClass).addAndGet(delta);
+    }
+  }
+
   /**
    * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
    * pool or a new one.
@@ -111,6 +154,7 @@ public class CodecPool {
         LOG.debug("Got recycled compressor");
       }
     }
+    updateLeaseCount(compressorCounts, compressor, 1);
     return compressor;
   }
   
@@ -137,6 +181,7 @@ public class CodecPool {
         LOG.debug("Got recycled decompressor");
       }
     }
+    updateLeaseCount(decompressorCounts, decompressor, 1);
     return decompressor;
   }
   
@@ -155,6 +200,7 @@ public class CodecPool {
     }
     compressor.reset();
     payback(compressorPool, compressor);
+    updateLeaseCount(compressorCounts, compressor, -1);
   }
   
   /**
@@ -173,5 +219,24 @@ public class CodecPool {
     }
     decompressor.reset();
     payback(decompressorPool, decompressor);
+    updateLeaseCount(decompressorCounts, decompressor, -1);
+  }
+
+  /**
+   * Return the number of leased {@link Compressor}s for this
+   * {@link CompressionCodec}
+   */
+  public static int getLeasedCompressorsCount(CompressionCodec codec) {
+    return (codec == null) ? 0 : getLeaseCount(compressorCounts,
+        codec.getCompressorType());
+  }
+
+  /**
+   * Return the number of leased {@link Decompressor}s for this
+   * {@link CompressionCodec}
+   */
+  public static int getLeasedDecompressorsCount(CompressionCodec codec) {
+    return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
+        codec.getDecompressorType());
   }
 }

+ 70 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCodecPool {
+  private final String LEASE_COUNT_ERR =
+      "Incorrect number of leased (de)compressors";
+  DefaultCodec codec;
+
+  @Before
+  public void setup() {
+    this.codec = new DefaultCodec();
+    this.codec.setConf(new Configuration());
+  }
+
+  @Test(timeout = 1000)
+  public void testCompressorPoolCounts() {
+    // Get two compressors and return them
+    Compressor comp1 = CodecPool.getCompressor(codec);
+    Compressor comp2 = CodecPool.getCompressor(codec);
+    assertEquals(LEASE_COUNT_ERR, 2,
+        CodecPool.getLeasedCompressorsCount(codec));
+
+    CodecPool.returnCompressor(comp2);
+    assertEquals(LEASE_COUNT_ERR, 1,
+        CodecPool.getLeasedCompressorsCount(codec));
+
+    CodecPool.returnCompressor(comp1);
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedCompressorsCount(codec));
+  }
+
+  @Test(timeout = 1000)
+  public void testDecompressorPoolCounts() {
+    // Get two decompressors and return them
+    Decompressor decomp1 = CodecPool.getDecompressor(codec);
+    Decompressor decomp2 = CodecPool.getDecompressor(codec);
+    assertEquals(LEASE_COUNT_ERR, 2,
+        CodecPool.getLeasedDecompressorsCount(codec));
+
+    CodecPool.returnDecompressor(decomp2);
+    assertEquals(LEASE_COUNT_ERR, 1,
+        CodecPool.getLeasedDecompressorsCount(codec));
+
+    CodecPool.returnDecompressor(decomp1);
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedDecompressorsCount(codec));
+  }
+}