
HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl lock for a very long time (sinago via cmccabe)

(cherry picked from commit 28bebc81db8bb6d1bc2574de7564fe4c595cfe09)
(cherry picked from commit a827089905524e10638c783ba908a895d621911d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

(cherry picked from commit c3a3092c37926eca75ea149c4c061742f6599b40)
Colin Patrick Mccabe 10 years ago
parent
commit
c3f5ea11ec

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -112,6 +112,9 @@ Release 2.6.1 - UNRELEASED
     HDFS-7742. Favoring decommissioning node for replication can cause a block
     to stay underreplicated for long periods (Nathan Roberts via kihwal)
 
+    HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl
+    lock for a very long time (sinago via cmccabe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 44 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1180,30 +1180,51 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
-      ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
-    if (replicaInfo != null) {
-      if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
-          && replicaInfo instanceof ReplicaInPipeline) {
-        // Stop the previous writer
-        ((ReplicaInPipeline)replicaInfo)
-                      .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
-        invalidate(b.getBlockPoolId(), new Block[]{replicaInfo});
-      } else {
-        throw new ReplicaAlreadyExistsException("Block " + b +
-            " already exists in state " + replicaInfo.getState() +
-            " and thus cannot be created.");
+  public ReplicaInPipeline createTemporary(
+      StorageType storageType, ExtendedBlock b) throws IOException {
+    long startTimeMs = Time.monotonicNow();
+    long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
+    ReplicaInfo lastFoundReplicaInfo = null;
+    do {
+      synchronized (this) {
+        ReplicaInfo currentReplicaInfo =
+            volumeMap.get(b.getBlockPoolId(), b.getBlockId());
+        if (currentReplicaInfo == lastFoundReplicaInfo) {
+          if (lastFoundReplicaInfo != null) {
+            invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
+          }
+          FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
+          // create a temporary file to hold block in the designated volume
+          File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+          ReplicaInPipeline newReplicaInfo =
+              new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
+                  f.getParentFile(), 0);
+          volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+          return newReplicaInfo;
+        } else {
+          if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
+                && currentReplicaInfo instanceof ReplicaInPipeline)) {
+            throw new ReplicaAlreadyExistsException("Block " + b
+                + " already exists in state " + currentReplicaInfo.getState()
+                + " and thus cannot be created.");
+          }
+          lastFoundReplicaInfo = currentReplicaInfo;
+        }
       }
-    }
-    
-    FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
-    // create a temporary file to hold block in the designated volume
-    File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
-    ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
-        b.getGenerationStamp(), v, f.getParentFile(), 0);
-    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    return newReplicaInfo;
+
+      // Waited too long to stop the previous writer; bail out. This is not
+      // supposed to happen.
+      long writerStopMs = Time.monotonicNow() - startTimeMs;
+      if (writerStopMs > writerStopTimeoutMs) {
+        LOG.warn("Unable to stop existing writer for block " + b + " after "
+            + writerStopMs + " milliseconds.");
+        throw new IOException("Unable to stop existing writer for block " + b
+            + " after " + writerStopMs + " milliseconds.");
+      }
+
+      // Stop the previous writer
+      ((ReplicaInPipeline) lastFoundReplicaInfo)
+          .stopWriter(writerStopTimeoutMs);
+    } while (true);
   }
 
   /**
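
Note on the locking pattern (not part of the commit): the patch narrows the synchronized section of createTemporary so that the potentially slow ReplicaInPipeline#stopWriter call no longer runs while the FsDatasetImpl lock is held. The volume map is re-checked under the lock on every iteration, and the loop bails out once the xceiver stop timeout has elapsed. The stand-alone Java sketch below illustrates the same lock-narrowing retry pattern in isolation; the names (SlotRegistry, Writer, claimSlot) are hypothetical, and the class is a simplified illustration, not the Hadoop implementation.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the lock-narrowing retry pattern used by the patch above:
 * shared state is inspected and mutated only inside the synchronized
 * block, while the potentially slow "stop the previous writer" step runs
 * with the lock released, retried until the state settles or a timeout
 * expires. All names here are hypothetical.
 */
public class SlotRegistry {

  /** Stand-in for ReplicaInPipeline: stopping it may block for a long time. */
  public interface Writer {
    void stop(long timeoutMs) throws IOException;
  }

  private final Map<Long, Writer> slots = new HashMap<>();

  public Writer claimSlot(long id, Writer replacement, long timeoutMs)
      throws IOException {
    final long startMs = System.nanoTime() / 1_000_000L;
    Writer lastSeen = null;
    while (true) {
      synchronized (this) {
        Writer current = slots.get(id);
        // Identity comparison, as in the patch: either the slot was free, or
        // the writer we already stopped is still the one registered. In both
        // cases it is safe to take over while still holding the lock.
        if (current == lastSeen) {
          slots.put(id, replacement);
          return replacement;
        }
        // A different writer is registered; remember it and stop it after
        // releasing the lock.
        lastSeen = current;
      }

      long elapsedMs = System.nanoTime() / 1_000_000L - startMs;
      if (elapsedMs > timeoutMs) {
        throw new IOException("Unable to stop existing writer for slot " + id
            + " after " + elapsedMs + " milliseconds.");
      }

      // The slow call runs without the lock, so threads that only need the
      // map are not blocked behind it.
      lastSeen.stop(timeoutMs);
    }
  }
}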