
HDFS-13994. Improve DataNode BlockSender waitForMinLength. Contributed by BELUGA BEHR.

Inigo Goiri, 6 years ago
Commit
8b64fbab1a

+ 2 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -30,6 +30,7 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
@@ -258,7 +259,7 @@ class BlockSender implements java.io.Closeable {
       }
       if (replica.getState() == ReplicaState.RBW) {
         final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
-        waitForMinLength(rbw, startOffset + length);
+        rbw.waitForMinLength(startOffset + length, 3, TimeUnit.SECONDS);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
       if (replica instanceof FinalizedReplica) {
@@ -493,30 +494,6 @@ class BlockSender implements java.io.Closeable {
     }
     return replica;
   }
-  
-  /**
-   * Wait for rbw replica to reach the length
-   * @param rbw replica that is being written to
-   * @param len minimum length to reach
-   * @throws IOException on failing to reach the len in given wait time
-   */
-  private static void waitForMinLength(ReplicaInPipeline rbw, long len)
-      throws IOException {
-    // Wait for 3 seconds for rbw replica to reach the minimum length
-    for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
-    }
-    long bytesOnDisk = rbw.getBytesOnDisk();
-    if (bytesOnDisk < len) {
-      throw new IOException(
-          String.format("Need %d bytes, but only %d bytes available", len,
-              bytesOnDisk));
-    }
-  }
 
   /**
  * Converts an IOException (not subclasses) to SocketException.
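The helper removed above waited for the RBW replica to grow by polling getBytesOnDisk() every 100 ms, up to 30 times (3 seconds), so a reader could sit idle for up to a full poll interval after the bytes had already landed on disk. The replacement moves the wait into the replica itself, where the writer signals a condition variable on every checksum/length update and releases waiters immediately. Below is a minimal, self-contained sketch of that idiom; the class and method names are illustrative, not Hadoop APIs.

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MinLengthDemo {
  private final Lock lock = new ReentrantLock();
  private final Condition changed = lock.newCondition();
  private long bytesOnDisk;

  /** Writer side: publish the new length and wake all waiters. */
  void setBytesOnDisk(long n) {
    lock.lock();
    try {
      bytesOnDisk = n;
      changed.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Reader side: block until minLength bytes are visible or time runs out. */
  void waitForMinLength(long minLength, long time, TimeUnit unit)
      throws IOException {
    long nanos = unit.toNanos(time);
    lock.lock();
    try {
      while (bytesOnDisk < minLength) {
        if (nanos <= 0L) {
          throw new IOException(String.format(
              "Need %d bytes, but only %d bytes available",
              minLength, bytesOnDisk));
        }
        // awaitNanos returns the unused portion of the budget, so a
        // spurious wakeup re-checks the predicate and keeps waiting
        // with whatever time is left rather than restarting the clock.
        nanos = changed.awaitNanos(nanos);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag before converting to IOException
      // (an editorial choice; the patch itself just wraps the exception).
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    MinLengthDemo demo = new MinLengthDemo();
    new Thread(() -> {
      try {
        Thread.sleep(50L);
      } catch (InterruptedException ignored) {
        return;
      }
      demo.setBytesOnDisk(1024L);  // waiter wakes immediately, not on a poll
    }).start();
    demo.waitForMinLength(1024L, 3L, TimeUnit.SECONDS);
    System.out.println("reached 1024 bytes");
  }
}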

+ 48 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java

@@ -22,7 +22,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -42,7 +46,11 @@ import org.apache.hadoop.util.StringUtils;
  * The base class implements a temporary replica
  */
 public class LocalReplicaInPipeline extends LocalReplica
-                        implements ReplicaInPipeline {
+    implements ReplicaInPipeline {
+
+  private final Lock lock = new ReentrantLock();
+  private final Condition bytesOnDiskChange = lock.newCondition();
+
   private long bytesAcked;
   private long bytesOnDisk;
   private byte[] lastChecksum;
@@ -167,16 +175,47 @@ public class LocalReplicaInPipeline extends LocalReplica
     bytesReserved = 0;
   }
 
-  @Override // ReplicaInPipeline
-  public synchronized void setLastChecksumAndDataLen(long dataLength,
-      byte[] checksum) {
-    this.bytesOnDisk = dataLength;
-    this.lastChecksum = checksum;
+  @Override
+  public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
+    lock.lock();
+    try {
+      this.bytesOnDisk = dataLength;
+      this.lastChecksum = checksum;
+      bytesOnDiskChange.signalAll();
+    } finally {
+      lock.unlock();
+    }
   }
 
-  @Override // ReplicaInPipeline
-  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
-    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+  @Override
+  public ChunkChecksum getLastChecksumAndDataLen() {
+    lock.lock();
+    try {
+      return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void waitForMinLength(long minLength, long time, TimeUnit unit)
+      throws IOException {
+    long nanos = unit.toNanos(time);
+    lock.lock();
+    try {
+      while (bytesOnDisk < minLength) {
+        if (nanos <= 0L) {
+          throw new IOException(
+              String.format("Need %d bytes, but only %d bytes available",
+                  minLength, bytesOnDisk));
+        }
+        nanos = bytesOnDiskChange.awaitNanos(nanos);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      lock.unlock();
+    }
   }
 
   @Override // ReplicaInPipeline
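Note the choice of signalAll() over signal() in setLastChecksumAndDataLen: more than one reader may be waiting on the same replica, each with its own threshold, and every one must wake and re-check its own predicate after a write. A standalone sketch of why that matters (names are illustrative, not Hadoop code):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SignalAllDemo {
  private static final Lock LOCK = new ReentrantLock();
  private static final Condition CHANGED = LOCK.newCondition();
  private static long length;

  static void await(long min) throws InterruptedException {
    LOCK.lock();
    try {
      while (length < min) {
        CHANGED.await();
      }
      System.out.println("waiter for " + min + " released at " + length);
    } finally {
      LOCK.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Two waiters with different thresholds block on the same condition.
    for (long min : new long[] {100L, 200L}) {
      new Thread(() -> {
        try {
          await(min);
        } catch (InterruptedException ignored) {
        }
      }).start();
    }
    Thread.sleep(50L);
    LOCK.lock();
    try {
      length = 250L;
      CHANGED.signalAll();  // signal() would wake only one of the two waiters
    } finally {
      LOCK.unlock();
    }
  }
}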

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.util.DataChecksum;
@@ -104,4 +105,17 @@ public interface ReplicaInPipeline extends Replica {
    * @throws IOException the waiting is interrupted
    */
   void stopWriter(long xceiverStopTimeout) throws IOException;
+
+  /**
+   * Causes the current thread to wait until a minimum length is reached, the
+   * thread is interrupted, or the specified waiting time elapses.
+   *
+   * @param minLength The minimum length to achieve
+   * @param time the maximum time to wait
+   * @param unit the time unit of the time argument
+   * @throws IOException if the current thread is interrupted or the minimum
+   *           length is not achieved within the time allowed.
+   */
+  void waitForMinLength(long minLength, long time, TimeUnit unit)
+      throws IOException;
 }

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -33,6 +33,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -396,6 +397,23 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     public void stopWriter(long xceiverStopTimeout) throws IOException {
     }
+
+    @Override
+    public void waitForMinLength(long minLength, long time, TimeUnit unit)
+        throws IOException {
+      final long deadLine = System.currentTimeMillis() + unit.toMillis(time);
+      do {
+        if (getBytesOnDisk() >= minLength) {
+          return;
+        }
+        try {
+          Thread.sleep(100L);
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      } while (deadLine > System.currentTimeMillis());
+      throw new IOException("Minimum length was not achieved within timeout");
+    }
   }
 
   /**
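The simulated dataset keeps a sleep-and-poll loop, a reasonable trade-off for a test double, but it measures its deadline with System.currentTimeMillis(), so a wall-clock adjustment during the wait can stretch or cut the timeout. An editorial variant, not part of the patch, keyed to the monotonic System.nanoTime() clock:

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Editorial sketch, not part of the patch: the same polling loop keyed
 *  to System.nanoTime(), which is monotonic and therefore immune to
 *  wall-clock adjustments during the wait. */
public final class DeadlinePoll {
  private DeadlinePoll() {
  }

  static void waitForMinLength(LongSupplier bytesOnDisk, long minLength,
      long time, TimeUnit unit) throws IOException {
    final long deadline = System.nanoTime() + unit.toNanos(time);
    while (bytesOnDisk.getAsLong() < minLength) {
      // Subtract before comparing so the check stays correct even if
      // nanoTime() wraps around Long.MAX_VALUE.
      if (System.nanoTime() - deadline >= 0L) {
        throw new IOException(
            "Minimum length was not achieved within timeout");
      }
      try {
        Thread.sleep(100L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException(e);
      }
    }
  }
}

A caller would pass the byte counter as a supplier, e.g. waitForMinLength(replica::getBytesOnDisk, 1024L, 3L, TimeUnit.SECONDS), where replica is any object exposing a hypothetical getBytesOnDisk() accessor.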

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.extdataset;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum;
@@ -129,4 +130,9 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
   @Override
   public void interruptThread() {
   }
+
+  @Override
+  public void waitForMinLength(long minLength, long time, TimeUnit unit)
+      throws IOException {
+  }
 }