Просмотр исходного кода

HADOOP-3863. Use a thread-local string encoder rather than a static one
that is protected by a lock. (acmurthy via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@681238 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 лет назад
Родитель
Сommit
ee3d969270
3 измененных файлов с 80 добавлено и 34 удалено
  1. 3 0
      CHANGES.txt
  2. 41 34
      src/core/org/apache/hadoop/io/Text.java
  3. 36 0
      src/test/org/apache/hadoop/io/TestText.java

+ 3 - 0
CHANGES.txt

@@ -137,6 +137,9 @@ Trunk (unreleased changes)
     datanode in the pipeline needs to verify the checksum. Saves around
     30% CPU on intermediate datanodes. (rangadi)
 
+    HADOOP-3863. Use a thread-local string encoder rather than a static one
+    that is protected by a lock. (acmurthy via omalley)
+
   BUG FIXES
 
     HADOOP-3563.  Refactor the distributed upgrade code so that it is 

+ 41 - 34
src/core/org/apache/hadoop/io/Text.java

@@ -47,15 +47,24 @@ import org.apache.commons.logging.LogFactory;
 public class Text implements WritableComparable {
   private static final Log LOG= LogFactory.getLog("org.apache.hadoop.io.Text");
   
-  private static final CharsetDecoder DECODER = 
-    Charset.forName("UTF-8").newDecoder().
-    onMalformedInput(CodingErrorAction.REPORT).
-    onUnmappableCharacter(CodingErrorAction.REPORT);
-  private static final CharsetEncoder ENCODER = 
-    Charset.forName("UTF-8").newEncoder().
-    onMalformedInput(CodingErrorAction.REPORT).
-    onUnmappableCharacter(CodingErrorAction.REPORT);
-
+  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
+    new ThreadLocal<CharsetEncoder>() {
+      protected CharsetEncoder initialValue() {
+        return Charset.forName("UTF-8").newEncoder().
+               onMalformedInput(CodingErrorAction.REPORT).
+               onUnmappableCharacter(CodingErrorAction.REPORT);
+    }
+  };
+  
+  private static ThreadLocal<CharsetDecoder> DECODER_FACTORY =
+    new ThreadLocal<CharsetDecoder>() {
+    protected CharsetDecoder initialValue() {
+      return Charset.forName("UTF-8").newDecoder().
+             onMalformedInput(CodingErrorAction.REPORT).
+             onUnmappableCharacter(CodingErrorAction.REPORT);
+    }
+  };
+  
   private static final byte [] EMPTY_BYTES = new byte[0];
   
   private byte[] bytes;
@@ -349,21 +358,19 @@ public class Text implements WritableComparable {
   
   private static String decode(ByteBuffer utf8, boolean replace) 
     throws CharacterCodingException {
-    synchronized(DECODER) {
-      if (replace) {
-        DECODER.onMalformedInput(
-                                 java.nio.charset.CodingErrorAction.REPLACE);
-        DECODER.onUnmappableCharacter(CodingErrorAction.REPLACE);
-      }
-      String str = DECODER.decode(utf8).toString();
-      // set decoder back to its default value: REPORT
-      if (replace) {
-        DECODER.onMalformedInput(CodingErrorAction.REPORT);
-        DECODER.onUnmappableCharacter(CodingErrorAction.REPORT);
-      }
-      return str;
+    CharsetDecoder decoder = DECODER_FACTORY.get();
+    if (replace) {
+      decoder.onMalformedInput(
+          java.nio.charset.CodingErrorAction.REPLACE);
+      decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
     }
-
+    String str = decoder.decode(utf8).toString();
+    // set decoder back to its default value: REPORT
+    if (replace) {
+      decoder.onMalformedInput(CodingErrorAction.REPORT);
+      decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+    }
+    return str;
   }
 
   /**
@@ -390,18 +397,18 @@ public class Text implements WritableComparable {
    */
   public static ByteBuffer encode(String string, boolean replace)
     throws CharacterCodingException {
-    synchronized(ENCODER) {
-      if (replace) {
-        ENCODER.onMalformedInput(CodingErrorAction.REPLACE);
-        ENCODER.onUnmappableCharacter(CodingErrorAction.REPLACE);
-      }
-      ByteBuffer bytes=ENCODER.encode(CharBuffer.wrap(string.toCharArray()));
-      if (replace) {
-        ENCODER.onMalformedInput(CodingErrorAction.REPORT);
-        ENCODER.onUnmappableCharacter(CodingErrorAction.REPORT);
-      }
-      return bytes;
+    CharsetEncoder encoder = ENCODER_FACTORY.get();
+    if (replace) {
+      encoder.onMalformedInput(CodingErrorAction.REPLACE);
+      encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+    }
+    ByteBuffer bytes = 
+      encoder.encode(CharBuffer.wrap(string.toCharArray()));
+    if (replace) {
+      encoder.onMalformedInput(CodingErrorAction.REPORT);
+      encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
     }
+    return bytes;
   }
 
   /** Read a UTF8 encoded string from in

+ 36 - 0
src/test/org/apache/hadoop/io/TestText.java

@@ -216,6 +216,42 @@ public class TestText extends TestCase {
     assertEquals("modified aliased string", "abc", b.toString());
     assertEquals("appended string incorrectly", "abcdefg", a.toString());
   }
+  
+  private class ConcurrentEncodeDecodeThread extends Thread {
+    public ConcurrentEncodeDecodeThread(String name) {
+      super(name);
+    }
+
+    public void run() {
+      String name = this.getName();
+      DataOutputBuffer out = new DataOutputBuffer();
+      DataInputBuffer in = new DataInputBuffer();
+      for (int i=0; i < 1000; ++i) {
+        try {
+          out.reset();
+          WritableUtils.writeString(out, name);
+          
+          in.reset(out.getData(), out.getLength());
+          String s = WritableUtils.readString(in);
+          
+          assertEquals(name, s);
+        } catch (Exception ioe) {
+          throw new RuntimeException(ioe);
+        }
+      }
+    }
+  }
+  
+  public void testConcurrentEncodeDecode() throws Exception{
+    Thread thread1 = new ConcurrentEncodeDecodeThread("apache");
+    Thread thread2 = new ConcurrentEncodeDecodeThread("hadoop");
+    
+    thread1.start();
+    thread2.start();
+    
+    thread2.join();
+    thread2.join();
+  }
 
   public static void main(String[] args)  throws Exception
   {