
HADOOP-3604. Work around a JVM synchronization problem observed while
retrieving the address of direct buffers from compression code by obtaining
a lock during this call. Contributed by Arun C Murthy.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@673254 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
eaff8a4a3f
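
The fix serializes calls to GetDirectBufferAddress by entering the monitor of a shared Class object from JNI before the lookup and exiting it afterwards. A minimal Java sketch of the same idea, assuming a hypothetical getDirectBufferAddress stand-in (the real lookup happens in native code, and the real lock is taken via MonitorEnter/MonitorExit on the `clazz` field each codec class gains below):

    import java.nio.ByteBuffer;

    public class DirectBufferLockSketch {
      // Mirrors the static `clazz` fields this commit adds to each codec class.
      private static final Class<?> clazz = DirectBufferLockSketch.class;

      // Hypothetical stand-in for JNI's GetDirectBufferAddress, the call that
      // exposed the JVM synchronization problem under concurrent access.
      static long getDirectBufferAddress(ByteBuffer buf) {
        return buf.isDirect() ? 1L : 0L; // placeholder; the real address comes from JNI
      }

      static long lockedAddress(ByteBuffer buf) {
        // The native LOCK_CLASS/UNLOCK_CLASS macros enter and exit this same
        // monitor via MonitorEnter/MonitorExit on the Class object.
        synchronized (clazz) {
          return getDirectBufferAddress(buf);
        }
      }

      public static void main(String[] args) {
        System.out.println(lockedAddress(ByteBuffer.allocateDirect(16)));
      }
    }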

+ 4 - 0
CHANGES.txt

@@ -728,6 +728,10 @@ Release 0.18.0 - Unreleased
     HADOOP-3649. Fix bug in removing blocks from the corrupted block map.
     (Lohit Vijayarenu via shv)
 
+    HADOOP-3604. Work around a JVM synchronization problem observed while
+    retrieving the address of direct buffers from compression code by obtaining
+    a lock during this call. (Arun C Murthy via cdouglas)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 3 - 0
src/core/org/apache/hadoop/io/compress/lzo/LzoCompressor.java

@@ -36,6 +36,9 @@ public class LzoCompressor implements Compressor {
   private static final Log LOG = 
     LogFactory.getLog(LzoCompressor.class.getName());
 
+  // HACK - Use this as a global lock in the JNI layer
+  private static Class clazz = LzoCompressor.class;
+  
   private int directBufferSize;
   private byte[] userBuf = null;
   private int userBufOff = 0, userBufLen = 0;

+ 3 - 0
src/core/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java

@@ -35,6 +35,9 @@ import org.apache.hadoop.util.NativeCodeLoader;
 public class LzoDecompressor implements Decompressor {
   private static final Log LOG = 
     LogFactory.getLog(LzoDecompressor.class.getName());
+
+  // HACK - Use this as a global lock in the JNI layer
+  private static Class clazz = LzoDecompressor.class;
   
   private int directBufferSize;
   private Buffer compressedDirectBuf = null;

+ 4 - 1
src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java

@@ -33,7 +33,10 @@ import org.apache.hadoop.util.NativeCodeLoader;
  */
 public class ZlibCompressor implements Compressor {
   private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
-  
+
+  // HACK - Use this as a global lock in the JNI layer
+  private static Class clazz = ZlibCompressor.class;
+
   private long stream;
   private CompressionLevel level;
   private CompressionStrategy strategy;

+ 3 - 0
src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java

@@ -34,6 +34,9 @@ import org.apache.hadoop.util.NativeCodeLoader;
 public class ZlibDecompressor implements Decompressor {
   private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
   
+  // HACK - Use this as a global lock in the JNI layer
+  private static Class clazz = ZlibDecompressor.class;
+  
   private long stream;
   private CompressionHeader header;
   private int directBufferSize;

+ 225 - 129
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
@@ -1048,145 +1049,50 @@ class ReduceTask extends Task {
        * local fs) from the remote server.
        * We use the file system so that we generate checksum files on the data.
        * @param mapOutputLoc map-output to be fetched
-       * @param localFilename the filename to write the data into
+       * @param filename the filename to write the data into
        * @param connectionTimeout number of milliseconds for connection timeout
        * @param readTimeout number of milliseconds for read timeout
        * @return the path of the file that got created
        * @throws IOException when something goes wrong
        */
       private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
-                                     Path localFilename)
+                                     Path filename)
       throws IOException, InterruptedException {
-        boolean good = false;
-        OutputStream output = null;
+        // Connect
+        URLConnection connection = 
+          mapOutputLoc.getOutputLocation().openConnection();
+        InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
+                                           STALLED_COPY_TIMEOUT);
+
+        //We will put a file in memory if it meets certain criteria:
+        //1. The size of the (decompressed) file should be less than 25% of 
+        //    the total inmem fs
+        //2. There is space available in the inmem fs
+
+        long decompressedLength = 
+          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
+        long compressedLength = 
+          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+
+        // Check if this map-output can be saved in-memory
+        boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
+
+        // Shuffle
         MapOutput mapOutput = null;
-        
-        try {
-          URLConnection connection = 
-            mapOutputLoc.getOutputLocation().openConnection();
-          InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
-                                             STALLED_COPY_TIMEOUT);
-          //We will put a file in memory if it meets certain criteria:
-          //1. The size of the (decompressed) file should be less than 25% of 
-          //    the total inmem fs
-          //2. There is space available in the inmem fs
-          
-          long decompressedLength = 
-            Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
-          long compressedLength = 
-            Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-          
-          // Check if this map-output can be saved in-memory
-          boolean canFitInMemory = 
-            ramManager.canFitInMemory(decompressedLength); 
-
-          if (canFitInMemory) {
-            int requestedSize = (int)decompressedLength;
-            // Check if we have enough buffer-space to keep map-output in-memory
-            boolean createdNow = 
-              ramManager.reserve(requestedSize, input);
-
-            LOG.info("Shuffling " + requestedSize + " bytes (" + 
-                     compressedLength + " raw bytes) " + 
-                     "into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
-            
-            if (!createdNow) {
-              // Reconnect
-              try {
-                connection = mapOutputLoc.getOutputLocation().openConnection();
-                input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
-                                       STALLED_COPY_TIMEOUT);
-              } catch (Throwable t) {
-                // Cleanup
-                ramManager.closeInMemoryFile(requestedSize);
-                ramManager.unreserve(requestedSize);
-                
-                IOException ioe = new IOException("Failed to re-open " +
-                		                              "connection to " + 
-                		                              mapOutputLoc.getHost());
-                ioe.initCause(t);
-                throw ioe;
-              }
-            }
-            
-            // Are map-outputs compressed?
-            if (codec != null) {
-              decompressor.reset();
-              input = codec.createInputStream(input, decompressor);
-            }
-
-            output = new DataOutputBuffer((int)decompressedLength);
-          }
-          else {
-            // Find out a suitable location for the output on local-filesystem
-            localFilename = lDirAlloc.getLocalPathForWrite(
-                localFilename.toUri().getPath(), decompressedLength, conf);
-            LOG.info("Shuffling " + decompressedLength + " bytes (" + 
-                     compressedLength + " raw bytes) " + 
-                     "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
-            output = localFileSys.create(localFilename);
-          }
-          
-          long bytesRead = 0;
-          try {  
-            try {
-              byte[] buf = new byte[64 * 1024];
+        if (shuffleInMemory) { 
+          LOG.info("Shuffling " + decompressedLength + " bytes (" + 
+              compressedLength + " raw bytes) " + 
+              "into RAM from " + mapOutputLoc.getTaskAttemptId());
 
-              int n = input.read(buf, 0, buf.length);
-              while (n > 0) {
-                bytesRead += n;
-                shuffleClientMetrics.inputBytes(n);
-                output.write(buf, 0, n);
+          mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength);
+        } else {
+          LOG.info("Shuffling " + decompressedLength + " bytes (" + 
+              compressedLength + " raw bytes) " + 
+              "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
 
-                // indicate we're making progress
-                reporter.progress();
-                n = input.read(buf, 0, buf.length);
-              }
-              
-              LOG.info("Read " + bytesRead + " bytes from map-output " +
-                       "for " + mapOutputLoc.getTaskAttemptId());
-              
-              if (canFitInMemory) {
-                byte[] shuffleData = ((DataOutputBuffer)output).getData();
-                mapOutput = new MapOutput(mapOutputLoc.getTaskId(), 
-                                          ((DataOutputBuffer)output).getData());
-                ramManager.closeInMemoryFile(shuffleData.length);
-              } else {
-              mapOutput = 
-                new MapOutput(mapOutputLoc.getTaskId(), conf, 
-                              localFileSys.makeQualified(localFilename), 
-                              compressedLength);
-              }
-            } finally {
-              output.close();
-            }
-          } finally {
-            input.close();
-          }
-          
-          // Sanity check
-          good = (canFitInMemory) ? (bytesRead == decompressedLength) : 
-                                           (bytesRead == compressedLength);
-          if (!good) {
-            throw new IOException("Incomplete map output received for " +
-                                  mapOutputLoc.getTaskAttemptId() + " from " +
-                                  mapOutputLoc.getOutputLocation() + " (" + 
-                                  bytesRead + " instead of " + 
-                                  decompressedLength + ")"
-                                 );
-          }
-        } finally {
-          if (!good) {
-            try {
-              if (mapOutput != null) {
-                mapOutput.discard();
-              }
-            } catch (Throwable th) {
-              // IGNORED because we are cleaning up
-            }
-          }
+          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
         }
-        
+            
         return mapOutput;
       }
 
@@ -1235,7 +1141,197 @@ class ReduceTask extends Task {
         }
       }
 
-    }
+      private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
+                                        URLConnection connection, 
+                                        InputStream input,
+                                        int mapOutputLength) 
+      throws IOException, InterruptedException {
+        // Reserve ram for the map-output
+        boolean createdNow = ramManager.reserve(mapOutputLength, input);
+      
+        // Reconnect if we need to
+        if (!createdNow) {
+          // Reconnect
+          try {
+            connection = mapOutputLoc.getOutputLocation().openConnection();
+            input = getInputStream(connection, DEFAULT_READ_TIMEOUT, 
+                STALLED_COPY_TIMEOUT);
+          } catch (IOException ioe) {
+            LOG.info("Failed reopen connection to fetch map-output from " + 
+                     mapOutputLoc.getHost());
+            
+            // Inform the ram-manager
+            ramManager.closeInMemoryFile(mapOutputLength);
+            ramManager.unreserve(mapOutputLength);
+            
+            throw ioe;
+          }
+        }
+      
+        // Are map-outputs compressed?
+        if (codec != null) {
+          decompressor.reset();
+          input = codec.createInputStream(input, decompressor);
+        }
+      
+        // Copy map-output into an in-memory buffer
+        byte[] shuffleData = new byte[mapOutputLength];
+        MapOutput mapOutput = 
+          new MapOutput(mapOutputLoc.getTaskId(), shuffleData);
+        
+        int bytesRead = 0;
+        try {
+          int n = input.read(shuffleData, 0, shuffleData.length);
+          while (n > 0) {
+            bytesRead += n;
+            shuffleClientMetrics.inputBytes(n);
+
+            // indicate we're making progress
+            reporter.progress();
+            n = input.read(shuffleData, bytesRead, 
+                           (shuffleData.length-bytesRead));
+          }
+
+          LOG.info("Read " + bytesRead + " bytes from map-output for " +
+                   mapOutputLoc.getTaskAttemptId());
+
+          input.close();
+        } catch (IOException ioe) {
+          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
+                   ioe);
+
+          // Inform the ram-manager
+          ramManager.closeInMemoryFile(mapOutputLength);
+          ramManager.unreserve(mapOutputLength);
+          
+          // Discard the map-output
+          try {
+            mapOutput.discard();
+          } catch (IOException ignored) {
+            LOG.info("Failed to discard map-output from " + 
+                     mapOutputLoc.getTaskAttemptId(), ignored);
+          }
+          mapOutput = null;
+          
+          // Close the streams
+          IOUtils.cleanup(LOG, input);
+
+          // Re-throw
+          throw ioe;
+        }
+
+        // Close the in-memory file
+        ramManager.closeInMemoryFile(mapOutputLength);
+
+        // Sanity check
+        if (bytesRead != mapOutputLength) {
+          // Inform the ram-manager
+          ramManager.unreserve(mapOutputLength);
+          
+          // Discard the map-output
+          try {
+            mapOutput.discard();
+          } catch (IOException ignored) {
+            // IGNORED because we are cleaning up
+            LOG.info("Failed to discard map-output from " + 
+                     mapOutputLoc.getTaskAttemptId(), ignored);
+          }
+          mapOutput = null;
+
+          throw new IOException("Incomplete map output received for " +
+                                mapOutputLoc.getTaskAttemptId() + " from " +
+                                mapOutputLoc.getOutputLocation() + " (" + 
+                                bytesRead + " instead of " + 
+                                mapOutputLength + ")"
+          );
+        }
+
+        return mapOutput;
+      }
+      
+      private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
+                                      InputStream input,
+                                      Path filename,
+                                      long mapOutputLength) 
+      throws IOException {
+        // Find out a suitable location for the output on local-filesystem
+        Path localFilename = 
+          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
+                                         mapOutputLength, conf);
+
+        MapOutput mapOutput = 
+          new MapOutput(mapOutputLoc.getTaskId(), conf, 
+                        localFileSys.makeQualified(localFilename), 
+                        mapOutputLength);
+
+
+        // Copy data to local-disk
+        OutputStream output = null;
+        long bytesRead = 0;
+        try {
+          output = localFileSys.create(localFilename);
+          
+          byte[] buf = new byte[64 * 1024];
+          int n = input.read(buf, 0, buf.length);
+          while (n > 0) {
+            bytesRead += n;
+            shuffleClientMetrics.inputBytes(n);
+            output.write(buf, 0, n);
+
+            // indicate we're making progress
+            reporter.progress();
+            n = input.read(buf, 0, buf.length);
+          }
+
+          LOG.info("Read " + bytesRead + " bytes from map-output for " +
+              mapOutputLoc.getTaskAttemptId());
+
+          output.close();
+          input.close();
+        } catch (IOException ioe) {
+          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
+                   ioe);
+
+          // Discard the map-output
+          try {
+            mapOutput.discard();
+          } catch (IOException ignored) {
+            LOG.info("Failed to discard map-output from " + 
+                mapOutputLoc.getTaskAttemptId(), ignored);
+          }
+          mapOutput = null;
+
+          // Close the streams
+          IOUtils.cleanup(LOG, input, output);
+
+          // Re-throw
+          throw ioe;
+        }
+
+        // Sanity check
+        if (bytesRead != mapOutputLength) {
+          try {
+            mapOutput.discard();
+          } catch (Throwable th) {
+            // IGNORED because we are cleaning up
+            LOG.info("Failed to discard map-output from " + 
+                mapOutputLoc.getTaskAttemptId(), th);
+          }
+          mapOutput = null;
+
+          throw new IOException("Incomplete map output received for " +
+                                mapOutputLoc.getTaskAttemptId() + " from " +
+                                mapOutputLoc.getOutputLocation() + " (" + 
+                                bytesRead + " instead of " + 
+                                mapOutputLength + ")"
+          );
+        }
+
+        return mapOutput;
+
+      }
+      
+    } // MapOutputCopier
     
     private void configureClasspath(JobConf conf)
       throws IOException {
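
The new shuffleInMemory copies the map-output into a fixed-size array, advancing the read offset on each iteration until the stream drains or the buffer fills, then checks that exactly mapOutputLength bytes arrived. A self-contained sketch of that loop, with hypothetical names:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class FixedLengthReadSketch {
      // Reads exactly expectedLength bytes, as shuffleInMemory does: each read
      // starts at offset bytesRead, so earlier data is never overwritten, and
      // read() returns 0 once the remaining length hits zero, ending the loop.
      static byte[] readFully(InputStream in, int expectedLength) throws IOException {
        byte[] data = new byte[expectedLength];
        int bytesRead = 0;
        int n = in.read(data, 0, data.length);
        while (n > 0) {
          bytesRead += n;
          n = in.read(data, bytesRead, data.length - bytesRead);
        }
        // Same sanity check the patch performs before handing back the MapOutput.
        if (bytesRead != expectedLength) {
          throw new IOException("Incomplete read: " + bytesRead
                                + " instead of " + expectedLength);
        }
        return data;
      }

      public static void main(String[] args) throws IOException {
        byte[] src = {1, 2, 3, 4, 5};
        System.out.println(readFully(new ByteArrayInputStream(src), src.length).length);
      }
    }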

+ 18 - 2
src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c

@@ -118,6 +118,7 @@ typedef int
                                   lzo_bytep dst, lzo_uintp dst_len,
                                   lzo_voidp wrkmem, int compression_level );
 
+static jfieldID LzoCompressor_clazz;
 static jfieldID LzoCompressor_finish;
 static jfieldID LzoCompressor_finished;
 static jfieldID LzoCompressor_uncompressedDirectBuf;
@@ -139,6 +140,8 @@ Java_org_apache_hadoop_io_compress_lzo_LzoCompressor_initIDs(
 	  return;
 	}
     
+  LzoCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+                                                 "Ljava/lang/Class;");
   LzoCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
   LzoCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
   LzoCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, 
@@ -215,6 +218,8 @@ Java_org_apache_hadoop_io_compress_lzo_LzoCompressor_compressBytesDirect(
   const char *lzo_compressor_function = lzo_compressors[compressor].function;
 
 	// Get members of LzoCompressor
+    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+                                                 LzoCompressor_clazz);
 	jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, 
 									                    LzoCompressor_uncompressedDirectBuf);
 	lzo_uint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, 
@@ -231,20 +236,31 @@ Java_org_apache_hadoop_io_compress_lzo_LzoCompressor_compressBytesDirect(
   jlong lzo_compressor_funcptr = (*env)->GetLongField(env, this,
                   LzoCompressor_lzoCompressor);
 
-  // Get direct buffers
+    // Get the input direct buffer
+    LOCK_CLASS(env, clazz, "LzoCompressor");
 	lzo_bytep uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
                                             uncompressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "LzoCompressor");
+    
   if (uncompressed_bytes == 0) {
     	return (jint)0;
 	}
 	
+    // Get the output direct buffer
+    LOCK_CLASS(env, clazz, "LzoCompressor");
 	lzo_bytep compressed_bytes = (*env)->GetDirectBufferAddress(env, 
                                             compressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "LzoCompressor");
+    
   if (compressed_bytes == 0) {
 		return (jint)0;
 	}
 	
-  lzo_voidp workmem = (*env)->GetDirectBufferAddress(env, working_memory_buf);
+    // Get the working-memory direct buffer
+    LOCK_CLASS(env, clazz, "LzoCompressor");
+    lzo_voidp workmem = (*env)->GetDirectBufferAddress(env, working_memory_buf);
+    UNLOCK_CLASS(env, clazz, "LzoCompressor");
+    
   if (workmem == 0) {
     return (jint)0;
   }

+ 13 - 1
src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c

@@ -88,6 +88,7 @@ static char* lzo_decompressors[] = {
   /* 27 */  "lzo2a_decompress_safe"
 };
 
+static jfieldID LzoDecompressor_clazz;
 static jfieldID LzoDecompressor_finished;
 static jfieldID LzoDecompressor_compressedDirectBuf;
 static jfieldID LzoDecompressor_compressedDirectBufLen;
@@ -106,6 +107,8 @@ Java_org_apache_hadoop_io_compress_lzo_LzoDecompressor_initIDs(
 	  return;
 	}
     
+  LzoDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+                                                   "Ljava/lang/Class;");
   LzoDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
   LzoDecompressor_compressedDirectBuf = (*env)->GetFieldID(env, class, 
                                                 "compressedDirectBuf", 
@@ -173,6 +176,8 @@ Java_org_apache_hadoop_io_compress_lzo_LzoDecompressor_decompressBytesDirect(
   const char *lzo_decompressor_function = lzo_decompressors[decompressor];
 
 	// Get members of LzoDecompressor
+	jobject clazz = (*env)->GetStaticObjectField(env, this, 
+	                                             LzoDecompressor_clazz);
 	jobject compressed_direct_buf = (*env)->GetObjectField(env, this,
                                               LzoDecompressor_compressedDirectBuf);
 	lzo_uint compressed_direct_buf_len = (*env)->GetIntField(env, this, 
@@ -186,15 +191,22 @@ Java_org_apache_hadoop_io_compress_lzo_LzoDecompressor_decompressBytesDirect(
   jlong lzo_decompressor_funcptr = (*env)->GetLongField(env, this,
                                               LzoDecompressor_lzoDecompressor);
 
-  // Get direct buffers
+    // Get the input direct buffer
+    LOCK_CLASS(env, clazz, "LzoDecompressor");
 	lzo_bytep uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
 											                    uncompressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "LzoDecompressor");
+    
  	if (uncompressed_bytes == 0) {
     return (jint)0;
 	}
 	
+    // Get the output direct buffer
+    LOCK_CLASS(env, clazz, "LzoDecompressor");
 	lzo_bytep compressed_bytes = (*env)->GetDirectBufferAddress(env, 
 										                    compressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "LzoDecompressor");
+
   if (compressed_bytes == 0) {
 		return (jint)0;
 	}

+ 14 - 0
src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c

@@ -47,6 +47,7 @@
 #include "org_apache_hadoop_io_compress_zlib.h"
 #include "org_apache_hadoop_io_compress_zlib_ZlibCompressor.h"
 
+static jfieldID ZlibCompressor_clazz;
 static jfieldID ZlibCompressor_stream;
 static jfieldID ZlibCompressor_uncompressedDirectBuf;
 static jfieldID ZlibCompressor_uncompressedDirectBufOff;
@@ -82,6 +83,8 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_initIDs(
 	LOAD_DYNAMIC_SYMBOL(dlsym_deflateEnd, env, libz, "deflateEnd");
 
 	// Initialize the requisite fieldIds
+    ZlibCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+                                                      "Ljava/lang/Class;");
     ZlibCompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
     ZlibCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
     ZlibCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
@@ -186,6 +189,9 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect(
 		return (jint)0;
     } 
 
+    // Get members of ZlibCompressor
+    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+                                                 ZlibCompressor_clazz);
 	jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, 
 									ZlibCompressor_uncompressedDirectBuf);
 	jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this, 
@@ -200,14 +206,22 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect(
 
 	jboolean finish = (*env)->GetBooleanField(env, this, ZlibCompressor_finish);
 
+    // Get the input direct buffer
+    LOCK_CLASS(env, clazz, "ZlibCompressor");
 	Bytef* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
 											uncompressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "ZlibCompressor");
+    
   	if (uncompressed_bytes == 0) {
     	return (jint)0;
 	}
 	
+    // Get the output direct buffer
+    LOCK_CLASS(env, clazz, "ZlibCompressor");
 	Bytef* compressed_bytes = (*env)->GetDirectBufferAddress(env, 
 										compressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "ZlibCompressor");
+
   	if (compressed_bytes == 0) {
 		return (jint)0;
 	}

+ 14 - 0
src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c

@@ -47,6 +47,7 @@
 #include "org_apache_hadoop_io_compress_zlib.h"
 #include "org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h"
 
+static jfieldID ZlibDecompressor_clazz;
 static jfieldID ZlibDecompressor_stream;
 static jfieldID ZlibDecompressor_compressedDirectBuf;
 static jfieldID ZlibDecompressor_compressedDirectBufOff;
@@ -82,6 +83,8 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_initIDs(
 	LOAD_DYNAMIC_SYMBOL(dlsym_inflateEnd, env, libz, "inflateEnd");
 
 	// Initialize the requisite fieldIds
+    ZlibDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", 
+                                                      "Ljava/lang/Class;");
     ZlibDecompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
     ZlibDecompressor_needDict = (*env)->GetFieldID(env, class, "needDict", "Z");
     ZlibDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
@@ -181,6 +184,9 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect(
 		return (jint)0;
     } 
 
+    // Get members of ZlibDecompressor
+    jobject clazz = (*env)->GetStaticObjectField(env, this, 
+                                                 ZlibDecompressor_clazz);
 	jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this, 
 											ZlibDecompressor_compressedDirectBuf);
 	jint compressed_direct_buf_off = (*env)->GetIntField(env, this, 
@@ -193,14 +199,22 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect(
 	jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, 
 										ZlibDecompressor_directBufferSize);
 
+    // Get the input direct buffer
+    LOCK_CLASS(env, clazz, "ZlibDecompressor");
 	Bytef *compressed_bytes = (*env)->GetDirectBufferAddress(env, 
 										compressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
+    
 	if (!compressed_bytes) {
 	    return (jint)0;
 	}
 	
+    // Get the output direct buffer
+    LOCK_CLASS(env, clazz, "ZlibDecompressor");
 	Bytef *uncompressed_bytes = (*env)->GetDirectBufferAddress(env, 
 											uncompressed_direct_buf);
+    UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
+
 	if (!uncompressed_bytes) {
 	    return (jint)0;
 	}

+ 13 - 0
src/native/src/org_apache_hadoop.h

@@ -79,6 +79,19 @@ static void *do_dlsym(JNIEnv *env, void *handle, const char *symbol) {
     return; \
   }
 
+#define LOCK_CLASS(env, clazz, classname) \
+  if ((*env)->MonitorEnter(env, clazz) != 0) { \
+    char exception_msg[128]; \
+    snprintf(exception_msg, 128, "Failed to lock %s", classname); \
+    THROW(env, "java/lang/InternalError", exception_msg); \
+  }
+
+#define UNLOCK_CLASS(env, clazz, classname) \
+  if ((*env)->MonitorExit(env, clazz) != 0) { \
+    char exception_msg[128]; \
+    snprintf(exception_msg, 128, "Failed to unlock %s", classname); \
+    THROW(env, "java/lang/InternalError", exception_msg); \
+  }
 
 #endif
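
LOCK_CLASS and UNLOCK_CLASS wrap JNI MonitorEnter/MonitorExit on a java.lang.Class object, so they take the very monitor a Java synchronized block on that class uses; note that on failure they post an InternalError but do not return, so the enclosing function continues with the exception pending. A small sketch (hypothetical class name) of how a Java thread holding that monitor blocks such a native-style critical section:

    public class MonitorContentionSketch {
      // Stand-in for a native section bracketed by LOCK_CLASS/UNLOCK_CLASS:
      // MonitorEnter on the Class object acquires exactly this monitor.
      static void criticalSection() {
        synchronized (MonitorContentionSketch.class) {
          System.out.println("entered critical section");
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Runnable() {
          public void run() { criticalSection(); }
        });
        synchronized (MonitorContentionSketch.class) {
          t.start();
          Thread.sleep(100); // t blocks on the monitor, as the JNI code would
        }
        t.join(); // completes once main releases the monitor
      }
    }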