瀏覽代碼

HDFS-12518. Re-encryption should handle task cancellation and progress better.

(cherry picked from commit 0f5287bb2fe71b35f70f977a06c31094b352512c)
Xiao Chen 7 年之前
父節點
當前提交
a7e34be69a

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java

@@ -380,10 +380,16 @@ final class FSDirEncryptionZoneOp {
   static void saveFileXAttrsForBatch(FSDirectory fsd,
       List<FileEdekInfo> batch) {
     assert fsd.getFSNamesystem().hasWriteLock();
+    assert !fsd.hasWriteLock();
     if (batch != null && !batch.isEmpty()) {
       for (FileEdekInfo entry : batch) {
         final INode inode = fsd.getInode(entry.getInodeId());
-        Preconditions.checkNotNull(inode);
+        // no dir lock, so inode could be removed. no-op if so.
+        if (inode == null) {
+          NameNode.LOG.info("Cannot find inode {}, skip saving xattr for"
+              + " re-encryption", entry.getInodeId());
+          continue;
+        }
         fsd.getEditLog().logSetXAttrs(inode.getFullPathName(),
             inode.getXAttrFeature().getXAttrs(), false);
       }

+ 66 - 40
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java

@@ -45,11 +45,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -112,8 +112,7 @@ public class ReencryptionHandler implements Runnable {
   private ExecutorCompletionService<ReencryptionTask> batchService;
   private BlockingQueue<Runnable> taskQueue;
   // protected by ReencryptionHandler object lock
-  private final Map<Long, ZoneSubmissionTracker> submissions =
-      new ConcurrentHashMap<>();
+  private final Map<Long, ZoneSubmissionTracker> submissions = new HashMap<>();
 
   // The current batch that the handler is working on. Handler is designed to
   // be single-threaded, see class javadoc for more details.
@@ -132,8 +131,10 @@ public class ReencryptionHandler implements Runnable {
    */
   void stopThreads() {
     assert dir.hasWriteLock();
-    for (ZoneSubmissionTracker zst : submissions.values()) {
-      zst.cancelAllTasks();
+    synchronized (this) {
+      for (ZoneSubmissionTracker zst : submissions.values()) {
+        zst.cancelAllTasks();
+      }
     }
     if (updaterExecutor != null) {
       updaterExecutor.shutdownNow();
@@ -269,33 +270,34 @@ public class ReencryptionHandler implements Runnable {
       throw new IOException("Zone " + zoneName + " is not under re-encryption");
     }
     zs.cancel();
-    ZoneSubmissionTracker zst = submissions.get(zoneId);
-    if (zst != null) {
-      zst.cancelAllTasks();
-    }
+    removeZoneTrackerStopTasks(zoneId);
   }
 
   void removeZone(final long zoneId) {
     assert dir.hasWriteLock();
     LOG.info("Removing zone {} from re-encryption.", zoneId);
-    ZoneSubmissionTracker zst = submissions.get(zoneId);
+    removeZoneTrackerStopTasks(zoneId);
+    getReencryptionStatus().removeZone(zoneId);
+  }
+
+  synchronized private void removeZoneTrackerStopTasks(final long zoneId) {
+    final ZoneSubmissionTracker zst = submissions.get(zoneId);
     if (zst != null) {
       zst.cancelAllTasks();
+      submissions.remove(zoneId);
     }
-    submissions.remove(zoneId);
-    getReencryptionStatus().removeZone(zoneId);
   }
 
   ZoneSubmissionTracker getTracker(final long zoneId) {
-    dir.hasReadLock();
+    assert dir.hasReadLock();
     return unprotectedGetTracker(zoneId);
   }
 
   /**
-   * get the tracker without holding the FSDirectory lock. This is only used for
-   * testing, when updater checks about pausing.
+   * Get the tracker without holding the FSDirectory lock.
+   * The submissions object is protected by object lock.
    */
-  ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
+  synchronized ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
     return submissions.get(zoneId);
   }
 
@@ -308,16 +310,19 @@ public class ReencryptionHandler implements Runnable {
    *
    * @param zoneId
    */
-  void addDummyTracker(final long zoneId) {
+  void addDummyTracker(final long zoneId, ZoneSubmissionTracker zst) {
     assert dir.hasReadLock();
-    assert !submissions.containsKey(zoneId);
-    final ZoneSubmissionTracker zst = new ZoneSubmissionTracker();
+    if (zst == null) {
+      zst = new ZoneSubmissionTracker();
+    }
     zst.setSubmissionDone();
 
-    Future future = batchService.submit(
+    final Future future = batchService.submit(
         new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
     zst.addTask(future);
-    submissions.put(zoneId, zst);
+    synchronized (this) {
+      submissions.put(zoneId, zst);
+    }
   }
 
   /**
@@ -351,6 +356,8 @@ public class ReencryptionHandler implements Runnable {
         }
         LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
             zoneId, getReencryptionStatus());
+        getReencryptionStatus().markZoneStarted(zoneId);
+        resetSubmissionTracker(zoneId);
       } finally {
         dir.readUnlock();
       }
@@ -392,7 +399,6 @@ public class ReencryptionHandler implements Runnable {
 
     readLock();
     try {
-      getReencryptionStatus().markZoneStarted(zoneId);
       zoneNode = dir.getInode(zoneId);
       // start re-encrypting the zone from the beginning
       if (zoneNode == null) {
@@ -428,6 +434,20 @@ public class ReencryptionHandler implements Runnable {
     }
   }
 
+  /**
+   * Reset the zone submission tracker for re-encryption.
+   * @param zoneId
+   */
+  synchronized private void resetSubmissionTracker(final long zoneId) {
+    ZoneSubmissionTracker zst = submissions.get(zoneId);
+    if (zst == null) {
+      zst = new ZoneSubmissionTracker();
+      submissions.put(zoneId, zst);
+    } else {
+      zst.reset();
+    }
+  }
+
   List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
     assert dir.hasWriteLock();
     assert dir.getFSNamesystem().hasWriteLock();
@@ -437,8 +457,9 @@ public class ReencryptionHandler implements Runnable {
     LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
             + " failures encountered: {}.", zoneNode.getFullPathName(),
         zs.getFilesReencrypted(), zs.getNumReencryptionFailures());
-    // This also removes the zone from reencryptionStatus
-    submissions.remove(zoneId);
+    synchronized (this) {
+      submissions.remove(zoneId);
+    }
     return FSDirEncryptionZoneOp
         .updateReencryptionFinish(dir, INodesInPath.fromINode(zoneNode), zs);
   }
@@ -562,10 +583,13 @@ public class ReencryptionHandler implements Runnable {
     if (currentBatch.isEmpty()) {
       return;
     }
-    ZoneSubmissionTracker zst = submissions.get(zoneId);
-    if (zst == null) {
-      zst = new ZoneSubmissionTracker();
-      submissions.put(zoneId, zst);
+    ZoneSubmissionTracker zst;
+    synchronized (this) {
+      zst = submissions.get(zoneId);
+      if (zst == null) {
+        zst = new ZoneSubmissionTracker();
+        submissions.put(zoneId, zst);
+      }
     }
     Future future = batchService
         .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
@@ -821,19 +845,13 @@ public class ReencryptionHandler implements Runnable {
     // 2. if tasks are piling up on the updater, don't create new callables
     // until the queue size goes down.
     final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
-    int totalTasks = 0;
-    for (ZoneSubmissionTracker zst : submissions.values()) {
-      totalTasks += zst.getTasks().size();
-    }
-    if (totalTasks >= maxTasksPiled) {
+    int numTasks = numTasksSubmitted();
+    if (numTasks >= maxTasksPiled) {
       LOG.debug("Re-encryption handler throttling because total tasks pending"
-          + " re-encryption updater is {}", totalTasks);
-      while (totalTasks >= maxTasksPiled) {
+          + " re-encryption updater is {}", numTasks);
+      while (numTasks >= maxTasksPiled) {
         Thread.sleep(500);
-        totalTasks = 0;
-        for (ZoneSubmissionTracker zst : submissions.values()) {
-          totalTasks += zst.getTasks().size();
-        }
+        numTasks = numTasksSubmitted();
       }
     }
 
@@ -864,6 +882,14 @@ public class ReencryptionHandler implements Runnable {
     throttleTimerLocked.reset();
   }
 
+  private synchronized int numTasksSubmitted() {
+    int ret = 0;
+    for (ZoneSubmissionTracker zst : submissions.values()) {
+      ret += zst.getTasks().size();
+    }
+    return ret;
+  }
+
   /**
    * Process an Inode for re-encryption. Add to current batch if it's a file,
    * no-op otherwise.
@@ -877,7 +903,7 @@ public class ReencryptionHandler implements Runnable {
    */
   private boolean reencryptINode(final INode inode, final String ezKeyVerName)
       throws IOException, InterruptedException {
-    dir.hasReadLock();
+    assert dir.hasReadLock();
     if (LOG.isTraceEnabled()) {
       LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
     }

+ 22 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java

@@ -93,6 +93,13 @@ public final class ReencryptionUpdater implements Runnable {
       numFutureDone = 0;
     }
 
+    void reset() {
+      submissionDone = false;
+      tasks.clear();
+      numCheckpointed = 0;
+      numFutureDone = 0;
+    }
+
     LinkedList<Future> getTasks() {
       return tasks;
     }
@@ -238,12 +245,12 @@ public final class ReencryptionUpdater implements Runnable {
   void markZoneSubmissionDone(final long zoneId)
       throws IOException, InterruptedException {
     final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
-    if (tracker != null) {
+    if (tracker != null && !tracker.getTasks().isEmpty()) {
       tracker.submissionDone = true;
     } else {
       // Caller thinks submission is done, but no tasks submitted - meaning
       // no files in the EZ need to be re-encrypted. Complete directly.
-      handler.addDummyTracker(zoneId);
+      handler.addDummyTracker(zoneId, tracker);
     }
   }
 
@@ -289,6 +296,7 @@ public final class ReencryptionUpdater implements Runnable {
       LOG.debug(
           "Updating file xattrs for re-encrypting zone {}," + " starting at {}",
           zoneNodePath, task.batch.getFirstFilePath());
+      final int batchSize = task.batch.size();
       for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
            it.hasNext();) {
         FileEdekInfo entry = it.next();
@@ -342,7 +350,7 @@ public final class ReencryptionUpdater implements Runnable {
       }
 
       LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
-              + " starting:{}.", task.numFilesUpdated, task.batch.size(),
+              + " starting:{}.", task.numFilesUpdated, batchSize,
           zoneNodePath, task.batch.getFirstFilePath());
     }
     task.processed = true;
@@ -377,6 +385,9 @@ public final class ReencryptionUpdater implements Runnable {
     ListIterator<Future> iter = tasks.listIterator();
     while (iter.hasNext()) {
       Future<ReencryptionTask> curr = iter.next();
+      if (curr.isCancelled()) {
+        break;
+      }
       if (!curr.isDone() || !curr.get().processed) {
         // still has earlier tasks not completed, skip here.
         break;
@@ -411,12 +422,12 @@ public final class ReencryptionUpdater implements Runnable {
     final Future<ReencryptionTask> completed = batchService.take();
     throttle();
     checkPauseForTesting();
-    ReencryptionTask task = completed.get();
     if (completed.isCancelled()) {
-      LOG.debug("Skipped canceled re-encryption task for zone {}, last: {}",
-          task.zoneId, task.lastFile);
+      // Ignore canceled zones. The cancellation is edit-logged by the handler.
+      LOG.debug("Skipped a canceled re-encryption task");
       return;
     }
+    final ReencryptionTask task = completed.get();
 
     boolean shouldRetry;
     do {
@@ -465,7 +476,11 @@ public final class ReencryptionUpdater implements Runnable {
           task.batch.size(), task.batch.getFirstFilePath());
       final ZoneSubmissionTracker tracker =
           handler.getTracker(zoneNode.getId());
-      Preconditions.checkNotNull(tracker, "zone tracker not found " + zonePath);
+      if (tracker == null) {
+        // re-encryption canceled.
+        LOG.info("Re-encryption was canceled.");
+        return;
+      }
       tracker.numFutureDone++;
       EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
       processTaskEntries(zonePath, task);

+ 55 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java

@@ -1467,8 +1467,8 @@ public class TestReencryption {
     assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
   }
 
-  @Test
-  public void testCancelFuture() throws Exception {
+  private void cancelFutureDuringReencryption(final Path zone)
+      throws Exception {
     final AtomicBoolean callableRunning = new AtomicBoolean(false);
     class MyInjector extends EncryptionFaultInjector {
       private volatile int exceptionCount = 0;
@@ -1498,8 +1498,6 @@ public class TestReencryption {
      * /dir/f
      */
     final int len = 8196;
-    final Path zoneParent = new Path("/zones");
-    final Path zone = new Path(zoneParent, "zone");
     fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
     dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
     for (int i = 0; i < 10; ++i) {
@@ -1548,6 +1546,59 @@ public class TestReencryption {
     assertTrue(getUpdater().isRunning());
   }
 
+  @Test
+  public void testCancelFutureThenReencrypt() throws Exception {
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    cancelFutureDuringReencryption(zone);
+
+    // make sure new re-encryption after cancellation works.
+    getEzManager().resumeReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForZoneCompletes(zone.toString());
+    final RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    final ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertFalse(zs.isCanceled());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(10, zs.getFilesReencrypted());
+  }
+
+  @Test
+  public void testCancelFutureThenRestart() throws Exception {
+    final Path zoneParent = new Path("/zones");
+    final Path zone = new Path(zoneParent, "zone");
+    cancelFutureDuringReencryption(zone);
+
+    // restart, and check status.
+    restartClusterDisableReencrypt();
+    RemoteIterator<ZoneReencryptionStatus> it =
+        dfsAdmin.listReencryptionStatus();
+    ZoneReencryptionStatus zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertTrue(zs.isCanceled());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(0, zs.getFilesReencrypted());
+
+    // verify re-encryption works after restart.
+    getEzManager().resumeReencryptForTesting();
+    dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
+    waitForZoneCompletes(zone.toString());
+    it = dfsAdmin.listReencryptionStatus();
+    zs = it.next();
+    assertEquals(zone.toString(), zs.getZoneName());
+    assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
+    assertFalse(zs.isCanceled());
+    assertTrue(zs.getCompletionTime() > 0);
+    assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
+    assertEquals(10, zs.getFilesReencrypted());
+  }
+
   @Test
   public void testReencryptCancelForUpdater() throws Exception {
     /* Setup test dir: