Parcourir la source

HADOOP-11969. ThreadLocal initialization in several classes is not thread safe (Sean Busbey via Colin P. McCabe)

(cherry picked from commit 7dba7005b79994106321b0f86bc8f4ea51a3c185)
Colin Patrick Mccabe il y a 10 ans
Parent
commit
3dec58dd78
20 fichiers modifiés avec 66 ajouts et 49 suppressions
  1. 2 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
  2. 2 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java
  3. 4 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/BinaryRecordInput.java
  4. 9 8
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/BinaryRecordOutput.java
  5. 3 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
  6. 1 1
      hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSMDCFilter.java
  7. 2 1
      hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/servlet/ServerWebApp.java
  8. 1 1
      hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestDirHelper.java
  9. 2 2
      hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestHdfsHelper.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestJettyHelper.java
  11. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/Chain.java
  12. 3 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java
  13. 4 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
  14. 1 1
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
  15. 5 3
      hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
  16. 5 3
      hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
  17. 5 3
      hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
  18. 5 3
      hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
  19. 5 3
      hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
  20. 5 3
      hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java

@@ -35,7 +35,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class MD5Hash implements WritableComparable<MD5Hash> {
   public static final int MD5_LEN = 16;
 
-  private static ThreadLocal<MessageDigest> DIGESTER_FACTORY = new ThreadLocal<MessageDigest>() {
+  private static final ThreadLocal<MessageDigest> DIGESTER_FACTORY =
+      new ThreadLocal<MessageDigest>() {
     @Override
     protected MessageDigest initialValue() {
       try {

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java

@@ -53,7 +53,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class Text extends BinaryComparable
     implements WritableComparable<BinaryComparable> {
   
-  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
+  private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
     new ThreadLocal<CharsetEncoder>() {
       @Override
       protected CharsetEncoder initialValue() {
@@ -63,7 +63,7 @@ public class Text extends BinaryComparable
     }
   };
   
-  private static ThreadLocal<CharsetDecoder> DECODER_FACTORY =
+  private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY =
     new ThreadLocal<CharsetDecoder>() {
     @Override
     protected CharsetDecoder initialValue() {

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/BinaryRecordInput.java

@@ -57,9 +57,10 @@ public class BinaryRecordInput implements RecordInput {
     this.in = inp;
   }
     
-  private static ThreadLocal bIn = new ThreadLocal() {
+  private static final ThreadLocal<BinaryRecordInput> B_IN =
+      new ThreadLocal<BinaryRecordInput>() {
       @Override
-      protected synchronized Object initialValue() {
+      protected BinaryRecordInput initialValue() {
         return new BinaryRecordInput();
       }
     };
@@ -70,7 +71,7 @@ public class BinaryRecordInput implements RecordInput {
    * @return binary record input corresponding to the supplied DataInput.
    */
   public static BinaryRecordInput get(DataInput inp) {
-    BinaryRecordInput bin = (BinaryRecordInput) bIn.get();
+    BinaryRecordInput bin = B_IN.get();
     bin.setDataInput(inp);
     return bin;
   }

+ 9 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/record/BinaryRecordOutput.java

@@ -44,20 +44,21 @@ public class BinaryRecordOutput implements RecordOutput {
     this.out = out;
   }
     
-  private static ThreadLocal bOut = new ThreadLocal() {
-      @Override
-      protected synchronized Object initialValue() {
-        return new BinaryRecordOutput();
-      }
-    };
-    
+  private static final ThreadLocal<BinaryRecordOutput> B_OUT =
+      new ThreadLocal<BinaryRecordOutput>() {
+    @Override
+    protected BinaryRecordOutput initialValue() {
+      return new BinaryRecordOutput();
+    }
+  };
+
   /**
    * Get a thread-local record output for the supplied DataOutput.
    * @param out data output stream
    * @return binary record output corresponding to the supplied DataOutput.
    */
   public static BinaryRecordOutput get(DataOutput out) {
-    BinaryRecordOutput bout = (BinaryRecordOutput) bOut.get();
+    BinaryRecordOutput bout = B_OUT.get();
     bout.setDataOutput(out);
     return bout;
   }

+ 3 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java

@@ -264,7 +264,7 @@ public class ReflectionUtils {
   /**
    * Allocate a buffer for each thread that tries to clone objects.
    */
-  private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
+  private static final ThreadLocal<CopyInCopyOutBuffer> CLONE_BUFFERS
       = new ThreadLocal<CopyInCopyOutBuffer>() {
       @Override
       protected synchronized CopyInCopyOutBuffer initialValue() {
@@ -289,7 +289,7 @@ public class ReflectionUtils {
   @SuppressWarnings("unchecked")
   public static <T> T copy(Configuration conf, 
                                 T src, T dst) throws IOException {
-    CopyInCopyOutBuffer buffer = cloneBuffers.get();
+    CopyInCopyOutBuffer buffer = CLONE_BUFFERS.get();
     buffer.outBuffer.reset();
     SerializationFactory factory = getFactory(conf);
     Class<T> cls = (Class<T>) src.getClass();
@@ -306,7 +306,7 @@ public class ReflectionUtils {
   @Deprecated
   public static void cloneWritableInto(Writable dst, 
                                        Writable src) throws IOException {
-    CopyInCopyOutBuffer buffer = cloneBuffers.get();
+    CopyInCopyOutBuffer buffer = CLONE_BUFFERS.get();
     buffer.outBuffer.reset();
     src.write(buffer.outBuffer);
     buffer.moveData();

+ 1 - 1
hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSMDCFilter.java

@@ -49,7 +49,7 @@ public class KMSMDCFilter implements Filter {
     }
   }
 
-  private static ThreadLocal<Data> DATA_TL = new ThreadLocal<Data>();
+  private static final ThreadLocal<Data> DATA_TL = new ThreadLocal<Data>();
 
   public static UserGroupInformation getUgi() {
     return DATA_TL.get().ugi;

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/servlet/ServerWebApp.java

@@ -46,7 +46,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
   private static final String HTTP_PORT = ".http.port";
   public static final String SSL_ENABLED = ".ssl.enabled";
 
-  private static ThreadLocal<String> HOME_DIR_TL = new ThreadLocal<String>();
+  private static final ThreadLocal<String> HOME_DIR_TL =
+      new ThreadLocal<String>();
 
   private InetSocketAddress authority;
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestDirHelper.java

@@ -89,7 +89,7 @@ public class TestDirHelper implements MethodRule {
     }
   }
 
-  private static ThreadLocal<File> TEST_DIR_TL = new InheritableThreadLocal<File>();
+  private static final ThreadLocal<File> TEST_DIR_TL = new InheritableThreadLocal<File>();
 
   @Override
   public Statement apply(final Statement statement, final FrameworkMethod frameworkMethod, final Object o) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestHdfsHelper.java

@@ -39,9 +39,9 @@ public class TestHdfsHelper extends TestDirHelper {
 
   public static final String HADOOP_MINI_HDFS = "test.hadoop.hdfs";
 
-  private static ThreadLocal<Configuration> HDFS_CONF_TL = new InheritableThreadLocal<Configuration>();
+  private static final ThreadLocal<Configuration> HDFS_CONF_TL = new InheritableThreadLocal<Configuration>();
 
-  private static ThreadLocal<Path> HDFS_TEST_DIR_TL = new InheritableThreadLocal<Path>();
+  private static final ThreadLocal<Path> HDFS_TEST_DIR_TL = new InheritableThreadLocal<Path>();
 
   @Override
   public Statement apply(Statement statement, FrameworkMethod frameworkMethod, Object o) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/test/TestJettyHelper.java

@@ -52,7 +52,7 @@ public class TestJettyHelper implements MethodRule {
     this.keyStorePassword = keyStorePassword;
   }
 
-  private static ThreadLocal<TestJettyHelper> TEST_JETTY_TL =
+  private static final ThreadLocal<TestJettyHelper> TEST_JETTY_TL =
       new InheritableThreadLocal<TestJettyHelper>();
 
   @Override

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/Chain.java

@@ -290,7 +290,7 @@ class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
   // using a ThreadLocal to reuse the ByteArrayOutputStream used for ser/deser
   // it has to be a thread local because if not it would break if used from a
   // MultiThreadedMapRunner.
-  private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
+  private final ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
     new ThreadLocal<DataOutputBuffer>() {
       protected DataOutputBuffer initialValue() {
         return new DataOutputBuffer(1024);

+ 3 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java

@@ -32,7 +32,7 @@ class PipesPartitioner<K extends WritableComparable,
                        V extends Writable>
   implements Partitioner<K, V> {
   
-  private static ThreadLocal<Integer> cache = new ThreadLocal<Integer>();
+  private static final ThreadLocal<Integer> CACHE = new ThreadLocal<Integer>();
   private Partitioner<K, V> part = null;
   
   @SuppressWarnings("unchecked")
@@ -46,7 +46,7 @@ class PipesPartitioner<K extends WritableComparable,
    * @param newValue the next partition value
    */
   static void setNextPartition(int newValue) {
-    cache.set(newValue);
+    CACHE.set(newValue);
   }
 
   /**
@@ -58,7 +58,7 @@ class PipesPartitioner<K extends WritableComparable,
    */
   public int getPartition(K key, V value, 
                           int numPartitions) {
-    Integer result = cache.get();
+    Integer result = CACHE.get();
     if (result == null) {
       return part.getPartition(key, value, numPartitions);
     } else {

+ 4 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

@@ -54,7 +54,8 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
-  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
+  private static final ThreadLocal<Long> SHUFFLE_START =
+      new ThreadLocal<Long>() {
     protected Long initialValue() {
       return 0L;
     }
@@ -423,7 +424,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
       LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
                " to " + Thread.currentThread().getName());
-      shuffleStart.set(Time.monotonicNow());
+      SHUFFLE_START.set(Time.monotonicNow());
 
       return host;
   }
@@ -464,7 +465,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       }
     }
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
-             (Time.monotonicNow()-shuffleStart.get()) + "ms");
+             (Time.monotonicNow()-SHUFFLE_START.get()) + "ms");
   }
 
   public synchronized void resetKnownMaps() {

+ 1 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java

@@ -406,7 +406,7 @@ public class DistCpUtils {
   /**
    * String utility to convert a number-of-bytes to human readable format.
    */
-  private static ThreadLocal<DecimalFormat> FORMATTER
+  private static final ThreadLocal<DecimalFormat> FORMATTER
                         = new ThreadLocal<DecimalFormat>() {
     @Override
     protected DecimalFormat initialValue() {

+ 5 - 3
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesInput.java

@@ -41,8 +41,10 @@ public class TypedBytesInput {
     this.in = in;
   }
 
-  private static ThreadLocal tbIn = new ThreadLocal() {
-    protected synchronized Object initialValue() {
+  private static final ThreadLocal<TypedBytesInput> TB_IN =
+      new ThreadLocal<TypedBytesInput>() {
+    @Override
+    protected TypedBytesInput initialValue() {
       return new TypedBytesInput();
     }
   };
@@ -53,7 +55,7 @@ public class TypedBytesInput {
    * @return typed bytes input corresponding to the supplied {@link DataInput}.
    */
   public static TypedBytesInput get(DataInput in) {
-    TypedBytesInput bin = (TypedBytesInput) tbIn.get();
+    TypedBytesInput bin = TB_IN.get();
     bin.setDataInput(in);
     return bin;
   }

+ 5 - 3
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java

@@ -42,8 +42,10 @@ public class TypedBytesOutput {
     this.out = out;
   }
 
-  private static ThreadLocal tbOut = new ThreadLocal() {
-    protected synchronized Object initialValue() {
+  private static final ThreadLocal<TypedBytesOutput> TB_OUT =
+      new ThreadLocal<TypedBytesOutput>() {
+    @Override
+    protected TypedBytesOutput initialValue() {
       return new TypedBytesOutput();
     }
   };
@@ -56,7 +58,7 @@ public class TypedBytesOutput {
    * {@link DataOutput}.
    */
   public static TypedBytesOutput get(DataOutput out) {
-    TypedBytesOutput bout = (TypedBytesOutput) tbOut.get();
+    TypedBytesOutput bout = TB_OUT.get();
     bout.setDataOutput(out);
     return bout;
   }

+ 5 - 3
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java

@@ -38,8 +38,10 @@ public class TypedBytesRecordInput implements RecordInput {
     this.in = in;
   }
 
-  private static ThreadLocal tbIn = new ThreadLocal() {
-    protected synchronized Object initialValue() {
+  private static final ThreadLocal<TypedBytesRecordInput> TB_IN =
+      new ThreadLocal<TypedBytesRecordInput>() {
+    @Override
+    protected TypedBytesRecordInput initialValue() {
       return new TypedBytesRecordInput();
     }
   };
@@ -53,7 +55,7 @@ public class TypedBytesRecordInput implements RecordInput {
    *         {@link TypedBytesInput}.
    */
   public static TypedBytesRecordInput get(TypedBytesInput in) {
-    TypedBytesRecordInput bin = (TypedBytesRecordInput) tbIn.get();
+    TypedBytesRecordInput bin = TB_IN.get();
     bin.setTypedBytesInput(in);
     return bin;
   }

+ 5 - 3
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java

@@ -40,8 +40,10 @@ public class TypedBytesRecordOutput implements RecordOutput {
     this.out = out;
   }
 
-  private static ThreadLocal tbOut = new ThreadLocal() {
-    protected synchronized Object initialValue() {
+  private static final ThreadLocal<TypedBytesRecordOutput> TB_OUT =
+      new ThreadLocal<TypedBytesRecordOutput>() {
+    @Override
+    protected TypedBytesRecordOutput initialValue() {
       return new TypedBytesRecordOutput();
     }
   };
@@ -55,7 +57,7 @@ public class TypedBytesRecordOutput implements RecordOutput {
    *         {@link TypedBytesOutput}.
    */
   public static TypedBytesRecordOutput get(TypedBytesOutput out) {
-    TypedBytesRecordOutput bout = (TypedBytesRecordOutput) tbOut.get();
+    TypedBytesRecordOutput bout = TB_OUT.get();
     bout.setTypedBytesOutput(out);
     return bout;
   }

+ 5 - 3
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java

@@ -61,8 +61,10 @@ public class TypedBytesWritableInput implements Configurable {
     this.in = in;
   }
 
-  private static ThreadLocal tbIn = new ThreadLocal() {
-    protected synchronized Object initialValue() {
+  private static final ThreadLocal<TypedBytesWritableInput> TB_IN =
+      new ThreadLocal<TypedBytesWritableInput>() {
+    @Override
+    protected TypedBytesWritableInput initialValue() {
       return new TypedBytesWritableInput();
     }
   };
@@ -76,7 +78,7 @@ public class TypedBytesWritableInput implements Configurable {
    *         {@link TypedBytesInput}.
    */
   public static TypedBytesWritableInput get(TypedBytesInput in) {
-    TypedBytesWritableInput bin = (TypedBytesWritableInput) tbIn.get();
+    TypedBytesWritableInput bin = TB_IN.get();
     bin.setTypedBytesInput(in);
     return bin;
   }

+ 5 - 3
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java

@@ -58,8 +58,10 @@ public class TypedBytesWritableOutput {
     this.out = out;
   }
 
-  private static ThreadLocal tbOut = new ThreadLocal() {
-    protected synchronized Object initialValue() {
+  private static final ThreadLocal<TypedBytesWritableOutput> TB_OUT =
+      new ThreadLocal<TypedBytesWritableOutput>() {
+    @Override
+    protected TypedBytesWritableOutput initialValue() {
       return new TypedBytesWritableOutput();
     }
   };
@@ -73,7 +75,7 @@ public class TypedBytesWritableOutput {
    *         {@link TypedBytesOutput}.
    */
   public static TypedBytesWritableOutput get(TypedBytesOutput out) {
-    TypedBytesWritableOutput bout = (TypedBytesWritableOutput) tbOut.get();
+    TypedBytesWritableOutput bout = TB_OUT.get();
     bout.setTypedBytesOutput(out);
     return bout;
   }