
HDFS-7957. Truncate should verify quota before making changes. Contributed by Jing Zhao.

(cherry picked from commit d368d3647a858644b9fcd3be33d9fea2a6962f69)
(cherry picked from commit 6be52e42a9eb2069cedb12a8311bc36131a3804f)
Jing Zhao 10 years ago
parent
commit
03fb9b4ce0

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -899,6 +899,8 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7930. commitBlockSynchronization() does not remove locations. (yliu)
 
+    HDFS-7957. Truncate should verify quota before making changes. (jing9)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 36 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
@@ -677,7 +678,7 @@ public class FSDirectory implements Closeable {
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-   void updateCount(INodesInPath iip, int numOfINodes,
+  void updateCount(INodesInPath iip, int numOfINodes,
                     QuotaCounts counts, boolean checkQuota)
                     throws QuotaExceededException {
     assert hasWriteLock();
@@ -1050,7 +1051,7 @@ public class FSDirectory implements Closeable {
     INodeFile file = iip.getLastINode().asFile();
     BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     boolean onBlockBoundary =
-        unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
+        unprotectedTruncate(iip, newLength, collectedBlocks, mtime, null);
 
     if(! onBlockBoundary) {
       BlockInfoContiguous oldBlock = file.getLastBlock();
@@ -1073,11 +1074,11 @@ public class FSDirectory implements Closeable {
 
   boolean truncate(INodesInPath iip, long newLength,
                    BlocksMapUpdateInfo collectedBlocks,
-                   long mtime)
+                   long mtime, QuotaCounts delta)
       throws IOException {
     writeLock();
     try {
-      return unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
+      return unprotectedTruncate(iip, newLength, collectedBlocks, mtime, delta);
     } finally {
       writeUnlock();
     }
@@ -1097,22 +1098,49 @@ public class FSDirectory implements Closeable {
    */
   boolean unprotectedTruncate(INodesInPath iip, long newLength,
                               BlocksMapUpdateInfo collectedBlocks,
-                              long mtime) throws IOException {
+                              long mtime, QuotaCounts delta) throws IOException {
     assert hasWriteLock();
     INodeFile file = iip.getLastINode().asFile();
     int latestSnapshot = iip.getLatestSnapshotId();
     file.recordModification(latestSnapshot, true);
-    long oldDiskspaceNoRep = file.storagespaceConsumedNoReplication();
+
+    verifyQuotaForTruncate(iip, file, newLength, delta);
+
     long remainingLength =
         file.collectBlocksBeyondMax(newLength, collectedBlocks);
     file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
     file.setModificationTime(mtime);
-    updateCount(iip, 0, file.storagespaceConsumedNoReplication() - oldDiskspaceNoRep,
-      file.getBlockReplication(), true);
     // return whether on a block boundary
     return (remainingLength - newLength) == 0;
   }
 
+  private void verifyQuotaForTruncate(INodesInPath iip, INodeFile file,
+      long newLength, QuotaCounts delta) throws QuotaExceededException {
+    if (!getFSNamesystem().isImageLoaded() || shouldSkipQuotaChecks()) {
+      // Do not check quota if edit log is still being processed
+      return;
+    }
+    final long diff = file.computeQuotaDeltaForTruncate(newLength);
+    final short repl = file.getBlockReplication();
+    delta.addStorageSpace(diff * repl);
+    final BlockStoragePolicy policy = getBlockStoragePolicySuite()
+        .getPolicy(file.getStoragePolicyID());
+    List<StorageType> types = policy.chooseStorageTypes(repl);
+    for (StorageType t : types) {
+      if (t.supportTypeQuota()) {
+        delta.addTypeSpace(t, diff);
+      }
+    }
+    if (diff > 0) {
+      readLock();
+      try {
+        verifyQuota(iip, iip.length() - 1, delta, null);
+      } finally {
+        readUnlock();
+      }
+    }
+  }
+
   /**
    * This method is always called with writeLock of FSDirectory held.
    */

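The arithmetic in the new verifyQuotaForTruncate is easiest to follow with concrete numbers. The sketch below is illustrative only, not part of the patch; the block size, replication factor, and storage policy are assumed values borrowed from the tests further down. It walks a 2.5-block file truncated to 1.5 blocks with no snapshot:

    // Illustrative only; mirrors verifyQuotaForTruncate above. Assumed values:
    // 1024-byte blocks, a 2.5-block file, no snapshot, replication 3, and the
    // HOT storage policy (every replica on DISK).
    public class TruncateQuotaDeltaExample {
      public static void main(String[] args) {
        final long blockSize = 1024;
        final short repl = 3;
        // computeQuotaDeltaForTruncate(1.5 blocks): the last 1.5 blocks are
        // dropped and a new last block is charged at the preferred block size:
        // 1024 - (1024 + 512) = -512 bytes, before replication.
        final long diff = blockSize - (blockSize + blockSize / 2);
        // delta.addStorageSpace(diff * repl): the replicated storage-space delta.
        System.out.println("storage space delta = " + diff * repl); // -1536
        // delta.addTypeSpace(DISK, diff) then runs once per replica the policy
        // places on DISK, so the DISK type-space delta is diff * 3 as well.
        // Because diff <= 0 the patch skips verifyQuota(): shrinking a file
        // cannot violate a quota, so only growth (diff > 0) is verified.
      }
    }
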
+ 15 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2050,16 +2050,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
               ", truncate size: " + newLength + ".");
     }
     // Perform INodeFile truncation.
-    boolean onBlockBoundary = dir.truncate(iip, newLength,
-        toRemoveBlocks, mtime);
+    final QuotaCounts delta = new QuotaCounts.Builder().build();
+    boolean onBlockBoundary = dir.truncate(iip, newLength, toRemoveBlocks,
+        mtime, delta);
     Block truncateBlock = null;
-    if(! onBlockBoundary) {
+    if(!onBlockBoundary) {
       // Open file for write, but don't log into edits
       long lastBlockDelta = file.computeFileSize() - newLength;
       assert lastBlockDelta > 0 : "delta is 0 only if on block boundary";
       truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine,
           lastBlockDelta, null);
     }
+
+    // update the quota: use the preferred block size for UC block
+    dir.writeLock();
+    try {
+      dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
+    } finally {
+      dir.writeUnlock();
+    }
+
     getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime,
         truncateBlock);
     return onBlockBoundary;
@@ -2128,13 +2138,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           + truncatedBlockUC.getTruncateBlock().getNumBytes()
           + " block=" + truncatedBlockUC);
     }
-    if(shouldRecoverNow)
+    if (shouldRecoverNow) {
       truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
+    }
 
-    // update the quota: use the preferred block size for UC block
-    final long diff =
-        file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes();
-    dir.updateSpaceConsumed(iip, 0, diff, file.getBlockReplication());
     return newBlock;
   }
 

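The FSNamesystem side of the change is an ordering fix: the quota is now verified inside dir.truncate() before any INode is touched, and the precomputed delta is applied afterwards without re-checking, replacing the updateSpaceConsumed call that prepareFileForTruncate used to make after the file had already been reopened for write. A schematic model of that invariant (plain Java, not HDFS code; the usage and quota figures are assumptions):

    // Schematic model of "verify before mutate"; not HDFS code.
    public class VerifyThenApply {
      private long used = 5 * 1024;         // assumed current usage
      private final long quota = 6 * 1024;  // assumed quota limit

      void applyTruncateDelta(long delta) {
        // 1. Verify first: growth past the quota fails before any mutation,
        //    so nothing is left under construction and no lease is taken.
        if (delta > 0 && used + delta > quota) {
          throw new IllegalStateException("quota exceeded; nothing changed");
        }
        // 2. Mutate (truncate blocks, convert the last block to UC) knowing
        //    the quota check can no longer abort the operation.
        // 3. Apply the same delta without re-checking, the role played by
        //    updateCountNoQuotaCheck under the FSDirectory write lock.
        used += delta;
      }

      public static void main(String[] args) {
        VerifyThenApply q = new VerifyThenApply();
        q.applyTruncateDelta(-512);  // shrink: always allowed
        q.applyTruncateDelta(2048);  // grow past quota: throws up front
      }
    }
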
+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -804,6 +805,43 @@ public class INodeFile extends INodeWithAdditionalFields
     return size;
   }
 
+  /**
+   * Compute the quota usage change for a truncate operation.
+   * @param newLength the length for truncation
+   * @return the quota usage delta (not considering replication factor)
+   */
+  long computeQuotaDeltaForTruncate(final long newLength) {
+    final BlockInfoContiguous[] blocks = getBlocks();
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+
+    int n = 0;
+    long size = 0;
+    for (; n < blocks.length && newLength > size; n++) {
+      size += blocks[n].getNumBytes();
+    }
+    final boolean onBoundary = size == newLength;
+
+    long truncateSize = 0;
+    for (int i = (onBoundary ? n : n - 1); i < blocks.length; i++) {
+      truncateSize += blocks[i].getNumBytes();
+    }
+
+    FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
+    if (sf != null) {
+      FileDiff diff = sf.getDiffs().getLast();
+      BlockInfoContiguous[] sblocks = diff != null ? diff.getBlocks() : null;
+      if (sblocks != null) {
+        for (int i = (onBoundary ? n : n - 1); i < blocks.length
+            && i < sblocks.length && blocks[i].equals(sblocks[i]); i++) {
+          truncateSize -= blocks[i].getNumBytes();
+        }
+      }
+    }
+    return onBoundary ? -truncateSize : (getPreferredBlockSize() - truncateSize);
+  }
+
   void truncateBlocksTo(int n) {
     final BlockInfoContiguous[] newBlocks;
     if (n == 0) {

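computeQuotaDeltaForTruncate carries most of the logic, so a worked example is useful. The following standalone sketch re-implements it over plain long[] block sizes; it is an illustration, not the patch itself: the real code compares BlockInfoContiguous identity against the latest snapshot file diff, and size equality only stands in for identity in the two cases shown.

    // Minimal re-implementation for illustration; sizes stand in for block
    // identity, which happens to be valid for the two cases exercised below.
    public class QuotaDeltaForTruncate {
      static long delta(long[] blocks, long[] snapshotBlocks,
                        long newLength, long preferredBlockSize) {
        int n = 0;
        long size = 0;
        for (; n < blocks.length && newLength > size; n++) {
          size += blocks[n];
        }
        final boolean onBoundary = size == newLength;

        long truncateSize = 0;
        for (int i = onBoundary ? n : n - 1; i < blocks.length; i++) {
          truncateSize += blocks[i];
        }
        // Blocks still referenced by the latest snapshot are not reclaimed,
        // so they do not reduce the quota usage.
        if (snapshotBlocks != null) {
          for (int i = onBoundary ? n : n - 1; i < blocks.length
              && i < snapshotBlocks.length
              && blocks[i] == snapshotBlocks[i]; i++) {
            truncateSize -= blocks[i];
          }
        }
        // An off-boundary truncate keeps a new last block, charged at the
        // preferred block size while it is under construction.
        return onBoundary ? -truncateSize : preferredBlockSize - truncateSize;
      }

      public static void main(String[] args) {
        final long B = 1024;
        // Case 1 of TestTruncateQuotaUpdate: 2.5 blocks -> 1.5, no snapshot.
        System.out.println(
            delta(new long[]{B, B, B / 2}, null, B + B / 2, B));    // -512
        // Case 4: same truncate, but all blocks are in the snapshot.
        System.out.println(
            delta(new long[]{B, B, B / 2},
                  new long[]{B, B, B / 2}, B + B / 2, B));          // 1024
      }
    }
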
+ 41 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java

@@ -216,8 +216,7 @@ public class TestDiskspaceQuotaUpdate {
     INodeFile inode = fsdir.getINode(file.toString()).asFile();
     Assert.assertNotNull(inode);
     Assert.assertFalse("should not be UC", inode.isUnderConstruction());
-    Assert.assertNull("should not have a lease", cluster.getNamesystem()
-        .getLeaseManager().getLeaseByPath(file.toString()));
+    Assert.assertNull("should not have a lease", cluster.getNamesystem().getLeaseManager().getLeaseByPath(file.toString()));
     // make sure the quota usage is unchanged
     final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
         .getSpaceConsumed().getStorageSpace();
@@ -269,4 +268,44 @@ public class TestDiskspaceQuotaUpdate {
     dfs.recoverLease(file);
     cluster.restartNameNodes();
   }
+
+  /**
+   * Test that truncate over quota does not mark the file as UC or create a lease
+   */
+  @Test (timeout=60000)
+  public void testTruncateOverQuota() throws Exception {
+    final Path dir = new Path("/TestTruncateOverquota");
+    final Path file = new Path(dir, "file");
+
+    // create partial block file
+    dfs.mkdirs(dir);
+    DFSTestUtil.createFile(dfs, file, BLOCKSIZE / 2, REPLICATION, seed);
+
+    // lower quota to cause an exception when truncating the partial block
+    dfs.setQuota(dir, Long.MAX_VALUE - 1, 1);
+    final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
+        .asDirectory();
+    final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getStorageSpace();
+    try {
+      dfs.truncate(file, BLOCKSIZE / 2 - 1);
+      Assert.fail("truncate didn't fail");
+    } catch (RemoteException e) {
+      assertTrue(e.getClassName().contains("DSQuotaExceededException"));
+    }
+
+    // check that the file exists, isn't UC, and has no dangling lease
+    INodeFile inode = fsdir.getINode(file.toString()).asFile();
+    Assert.assertNotNull(inode);
+    Assert.assertFalse("should not be UC", inode.isUnderConstruction());
+    Assert.assertNull("should not have a lease", cluster.getNamesystem()
+        .getLeaseManager().getLeaseByPath(file.toString()));
+    // make sure the quota usage is unchanged
+    final long newSpaceUsed = dirNode.getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getStorageSpace();
+    assertEquals(spaceUsed, newSpaceUsed);
+    // make sure edits aren't corrupted
+    dfs.recoverLease(file);
+    cluster.restartNameNodes();
+  }
 }

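From the client's point of view, the behavior the new test pins down looks roughly like the fragment below (a hedged illustration, not code from the patch; the helper name is made up and the caller supplies the FileSystem handle):

    // Hypothetical helper for illustration; truncateCheckingQuota is not a
    // real HDFS API. FileSystem#truncate and RemoteException#getClassName are.
    static void truncateCheckingQuota(org.apache.hadoop.fs.FileSystem fs,
        org.apache.hadoop.fs.Path file, long newLength)
        throws java.io.IOException {
      try {
        fs.truncate(file, newLength);
      } catch (org.apache.hadoop.ipc.RemoteException e) {
        if (e.getClassName().contains("DSQuotaExceededException")) {
          // With this fix the truncate fails before any namespace change:
          // the file is not under construction and holds no dangling lease.
        } else {
          throw e;
        }
      }
    }
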
+ 248 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTruncateQuotaUpdate.java

@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Make sure we correctly update the quota usage for truncate.
+ * We need to cover the following cases:
+ * 1. No snapshot, truncate to 0
+ * 2. No snapshot, truncate at block boundary
+ * 3. No snapshot, not on block boundary
+ * 4~6. With snapshot, all the current blocks are included in the latest
+ *      snapshot, repeat 1~3
+ * 7~9. With snapshot, blocks in the latest snapshot and blocks in the current
+ *      file diverged, repeat 1~3
+ */
+public class TestTruncateQuotaUpdate {
+  private static final int BLOCKSIZE = 1024;
+  private static final short REPLICATION = 4;
+  private static final long DISKQUOTA = BLOCKSIZE * 20;
+  static final long seed = 0L;
+  private static final Path dir = new Path("/TestTruncateQuotaUpdate");
+  private static final Path file = new Path(dir, "file");
+
+  private MiniDFSCluster cluster;
+  private FSDirectory fsdir;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
+        .build();
+    cluster.waitActive();
+
+    fsdir = cluster.getNamesystem().getFSDirectory();
+    dfs = cluster.getFileSystem();
+
+    dfs.mkdirs(dir);
+    dfs.setQuota(dir, Long.MAX_VALUE - 1, DISKQUOTA);
+    dfs.setQuotaByStorageType(dir, StorageType.DISK, DISKQUOTA);
+    dfs.setStoragePolicy(dir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testTruncateQuotaUpdate() throws Exception {
+
+  }
+
+  public interface TruncateCase {
+    public void prepare() throws Exception;
+    public void run() throws Exception;
+  }
+
+  private void testTruncate(long newLength, long expectedDiff,
+      long expectedUsage) throws Exception {
+    // before doing the real truncation, make sure the computation is correct
+    final INodesInPath iip = fsdir.getINodesInPath4Write(file.toString());
+    final INodeFile fileNode = iip.getLastINode().asFile();
+    fileNode.recordModification(iip.getLatestSnapshotId(), true);
+    final long diff = fileNode.computeQuotaDeltaForTruncate(newLength);
+    Assert.assertEquals(expectedDiff, diff);
+
+    // do the real truncation
+    dfs.truncate(file, newLength);
+    // wait for truncate to finish
+    TestFileTruncate.checkBlockRecovery(file, dfs);
+    final INodeDirectory dirNode = fsdir.getINode4Write(dir.toString())
+        .asDirectory();
+    final long spaceUsed = dirNode.getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getStorageSpace();
+    final long diskUsed = dirNode.getDirectoryWithQuotaFeature()
+        .getSpaceConsumed().getTypeSpaces().get(StorageType.DISK);
+    Assert.assertEquals(expectedUsage, spaceUsed);
+    Assert.assertEquals(expectedUsage, diskUsed);
+  }
+
+  /**
+   * case 1~3
+   */
+  private class TruncateWithoutSnapshot implements TruncateCase {
+    @Override
+    public void prepare() throws Exception {
+      // original file size: 2.5 blocks
+      DFSTestUtil.createFile(dfs, file, BLOCKSIZE * 2 + BLOCKSIZE / 2,
+          REPLICATION, 0L);
+    }
+
+    @Override
+    public void run() throws Exception {
+      // case 1: first truncate to 1.5 blocks
+      long newLength = BLOCKSIZE + BLOCKSIZE / 2;
+      // we truncate 1 block, but not on the boundary, thus the diff should
+      // be -block + (block - 0.5 block) = -0.5 block
+      long diff = -BLOCKSIZE / 2;
+      // the new quota usage should be BLOCKSIZE * 1.5 * replication
+      long usage = (BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION;
+      testTruncate(newLength, diff, usage);
+
+      // case 2: truncate to 1 block
+      newLength = BLOCKSIZE;
+      // the diff should be -0.5 block since this is not on boundary
+      diff = -BLOCKSIZE / 2;
+      // after truncation the quota usage should be BLOCKSIZE * replication
+      usage = BLOCKSIZE * REPLICATION;
+      testTruncate(newLength, diff, usage);
+
+      // case 3: truncate to 0
+      testTruncate(0, -BLOCKSIZE, 0);
+    }
+  }
+
+  /**
+   * case 4~6
+   */
+  private class TruncateWithSnapshot implements TruncateCase {
+    @Override
+    public void prepare() throws Exception {
+      DFSTestUtil.createFile(dfs, file, BLOCKSIZE * 2 + BLOCKSIZE / 2,
+          REPLICATION, 0L);
+      SnapshotTestHelper.createSnapshot(dfs, dir, "s1");
+    }
+
+    @Override
+    public void run() throws Exception {
+      // case 4: truncate to 1.5 blocks
+      long newLength = BLOCKSIZE + BLOCKSIZE / 2;
+      // all the blocks are in the snapshot. truncate needs to allocate a new block
+      // diff should be +BLOCKSIZE
+      long diff = BLOCKSIZE;
+      // the new quota usage should be BLOCKSIZE * 3 * replication
+      long usage = BLOCKSIZE * 3 * REPLICATION;
+      testTruncate(newLength, diff, usage);
+
+      // case 5: truncate to 1 block
+      newLength = BLOCKSIZE;
+      // the block for truncation is not in snapshot, diff should be -0.5 block
+      diff = -BLOCKSIZE / 2;
+      // after truncation the quota usage should be 2.5 block * repl
+      usage = (BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION;
+      testTruncate(newLength, diff, usage);
+
+      // case 6: truncate to 0
+      testTruncate(0, 0, usage);
+    }
+  }
+
+  /**
+   * case 7~9
+   */
+  private class TruncateWithSnapshot2 implements TruncateCase {
+    @Override
+    public void prepare() throws Exception {
+      // original size: 2.5 blocks
+      DFSTestUtil.createFile(dfs, file, BLOCKSIZE * 2 + BLOCKSIZE / 2,
+          REPLICATION, 0L);
+      SnapshotTestHelper.createSnapshot(dfs, dir, "s1");
+
+      // truncate to 1.5 blocks
+      dfs.truncate(file, BLOCKSIZE + BLOCKSIZE / 2);
+      TestFileTruncate.checkBlockRecovery(file, dfs);
+
+      // append another 1 BLOCK
+      DFSTestUtil.appendFile(dfs, file, BLOCKSIZE);
+    }
+
+    @Override
+    public void run() throws Exception {
+      // case 8: truncate to 2 blocks
+      long newLength = BLOCKSIZE * 2;
+      // the original 2.5 blocks are in snapshot. the block truncated is not
+      // in snapshot. diff should be -0.5 block
+      long diff = -BLOCKSIZE / 2;
+      // the new quota usage should be BLOCKSIZE * 3.5 * replication
+      long usage = (BLOCKSIZE * 3 + BLOCKSIZE / 2) * REPLICATION;
+      testTruncate(newLength, diff, usage);
+
+      // case 7: truncate to 1.5 blocks
+      newLength = BLOCKSIZE + BLOCKSIZE / 2;
+      // the block for truncation is not in snapshot, diff should be
+      // -0.5 block + (block - 0.5block) = 0
+      diff = 0;
+      // after truncation the quota usage should be 3 block * repl
+      usage = (BLOCKSIZE * 3) * REPLICATION;
+      testTruncate(newLength, diff, usage);
+
+      // case 9: truncate to 0
+      testTruncate(0, -BLOCKSIZE / 2,
+          (BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION);
+    }
+  }
+
+  private void testTruncateQuotaUpdate(TruncateCase t) throws Exception {
+    t.prepare();
+    t.run();
+  }
+
+  @Test
+  public void testQuotaNoSnapshot() throws Exception {
+    testTruncateQuotaUpdate(new TruncateWithoutSnapshot());
+  }
+
+  @Test
+  public void testQuotaWithSnapshot() throws Exception {
+    testTruncateQuotaUpdate(new TruncateWithSnapshot());
+  }
+
+  @Test
+  public void testQuotaWithSnapshot2() throws Exception {
+    testTruncateQuotaUpdate(new TruncateWithSnapshot2());
+  }
+}