HDFS-15875. Check whether file is being truncated before truncate (#2746)

(cherry picked from commit 6a55baeee46f61686bd7fd8b62d141399e9af4dc)
Hui Fei 4 years ago
parent commit 01f9d73dad
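
This change closes a race around concurrent truncates: a truncate that
shortens a file mid-block schedules block recovery, and before this patch a
second truncate issued while that recovery was still pending could be
accepted. The NameNode now rejects it. A minimal sketch of the
client-visible behavior (the path and lengths are illustrative, not from
the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/dir/file");
    // Returns false when the new length falls mid-block: the last block
    // goes into recovery instead of being truncated immediately.
    boolean done = fs.truncate(p, 100);
    try {
      fs.truncate(p, 50); // recovery from the first truncate still pending
    } catch (IOException e) {
      // RemoteException wrapping AlreadyBeingCreatedException:
      // "... because /dir/file is being truncated."
    }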

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java

@@ -119,6 +119,7 @@ public class BlockRecoveryWorker {
      List<BlockRecord> syncList = new ArrayList<>(locs.length);
      int errorCount = 0;
      int candidateReplicaCnt = 0;
+      DataNodeFaultInjector.get().delay();
 
      // Check generation stamps, replica size and state. Replica must satisfy
      // the following criteria to be included in syncList for recovery:

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -122,4 +122,9 @@ public class DataNodeFaultInjector {
   * Used as a hook to inject intercept when BPOfferService hold lock.
   */
  public void delayWhenOfferServiceHoldLock() {}
+
+  /**
+   * Just delay a while.
+   */
+  public void delay() {}
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -111,6 +112,10 @@ final class FSDirTruncateOp {
              + truncatedBlock.getNumBytes();
          if (newLength == truncateLength) {
            return new TruncateResult(false, fsd.getAuditFileInfo(iip));
+          } else {
+            throw new AlreadyBeingCreatedException(
+                RecoverLeaseOp.TRUNCATE_FILE.getExceptionMessage(src,
+                    clientName, clientMachine, src + " is being truncated."));
          }
        }
      }
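
With this check in place, a truncate that collides with an in-flight one
surfaces on the client as a RemoteException wrapping
AlreadyBeingCreatedException. A minimal retry sketch, assuming the caller
just wants to wait out the pending recovery (truncateWithRetry is a
hypothetical helper, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
    import org.apache.hadoop.ipc.RemoteException;

    static boolean truncateWithRetry(FileSystem fs, Path p, long newLength)
        throws IOException, InterruptedException {
      while (true) {
        try {
          return fs.truncate(p, newLength);
        } catch (RemoteException re) {
          if (!(re.unwrapRemoteException()
              instanceof AlreadyBeingCreatedException)) {
            throw re;
          }
          // Previous truncate's block recovery is still pending; back off.
          Thread.sleep(1000);
        }
      }
    }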

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2629,7 +2629,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    TRUNCATE_FILE,
    RECOVER_LEASE;

-    private String getExceptionMessage(String src, String holder,
+    public String getExceptionMessage(String src, String holder,
        String clientMachine, String reason) {
      return "Failed to " + this + " " + src + " for " + holder +
          " on " + clientMachine + " because " + reason;

+ 68 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java

@@ -33,6 +33,9 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -218,6 +221,70 @@ public class TestFileTruncate {
     fs.delete(dir, true);
   }
 
+  /**
+   * Test truncate twice together on a file.
+   */
+  @Test(timeout=90000)
+  public void testTruncateTwiceTogether() throws Exception {
+
+    Path dir = new Path("/testTruncateTwiceTogether");
+    fs.mkdirs(dir);
+    final Path p = new Path(dir, "file");
+    final byte[] data = new byte[100 * BLOCK_SIZE];
+    ThreadLocalRandom.current().nextBytes(data);
+    writeContents(data, data.length, p);
+
+    DataNodeFaultInjector originInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+      @Override
+      public void delay() {
+        try {
+          // Bigger than soft lease period.
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          // Ignore
+        }
+      }
+    };
+    // Delay block recovery on the DataNode.
+    DataNodeFaultInjector.set(injector);
+
+    // Truncate by using different client name.
+    Thread t = new Thread(() -> {
+      String hdfsCacheDisableKey = "fs.hdfs.impl.disable.cache";
+      boolean originCacheDisable =
+          conf.getBoolean(hdfsCacheDisableKey, false);
+      try {
+        conf.setBoolean(hdfsCacheDisableKey, true);
+        FileSystem fs1 = FileSystem.get(conf);
+        fs1.truncate(p, data.length - 1);
+      } catch (IOException e) {
+        // ignore
+      } finally {
+        conf.setBoolean(hdfsCacheDisableKey, originCacheDisable);
+      }
+    });
+    t.start();
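+    // The truncate RPC returns once recovery is scheduled, so join() does
+    // not wait out the block recovery that the injected delay holds open.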
+    t.join();
+    NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
+        .setLeasePeriod(LOW_SOFTLIMIT, LOW_HARDLIMIT);
+
+    LambdaTestUtils.intercept(RemoteException.class,
+        "/testTruncateTwiceTogether/file is being truncated",
+        () -> fs.truncate(p, data.length - 2));
+
+    // wait for block recovery
+    checkBlockRecovery(p);
+    assertFileLength(p, data.length - 1);
+
+    DataNodeFaultInjector.set(originInjector);
+    NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
+        .setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
+            conf.getLong(DFSConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
+                DFSConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000);
+    fs.delete(dir, true);
+  }
+
   /**
    * Truncate files and then run other operations such as
    * rename, set replication, set permission, etc.
@@ -631,7 +698,7 @@ public class TestFileTruncate {
     {
       try {
         fs.truncate(p, 0);
-        fail("Truncate must fail since a trancate is already in pregress.");
+        fail("Truncate must fail since a truncate is already in progress.");
       } catch (IOException expected) {
         GenericTestUtils.assertExceptionContains(
             "Failed to TRUNCATE_FILE", expected);