|
@@ -29,18 +29,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
|
|
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
|
|
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
|
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
|
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
|
|
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
|
|
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
|
|
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
|
|
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
|
|
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
|
|
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
|
|
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
|
|
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
|
|
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
|
|
|
import org.apache.hadoop.ipc.RetriableException;
|
|
import org.apache.hadoop.ipc.RetriableException;
|
|
import org.apache.hadoop.util.Daemon;
|
|
import org.apache.hadoop.util.Daemon;
|
|
import org.apache.hadoop.util.StopWatch;
|
|
import org.apache.hadoop.util.StopWatch;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.io.FileNotFoundException;
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.security.GeneralSecurityException;
|
|
import java.security.GeneralSecurityException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
@@ -117,6 +115,8 @@ public class ReencryptionHandler implements Runnable {
|
|
// be single-threaded, see class javadoc for more details.
|
|
// be single-threaded, see class javadoc for more details.
|
|
private ReencryptionBatch currentBatch;
|
|
private ReencryptionBatch currentBatch;
|
|
|
|
|
|
|
|
+ private final ReencryptionPendingInodeIdCollector traverser;
|
|
|
|
+
|
|
private final ReencryptionUpdater reencryptionUpdater;
|
|
private final ReencryptionUpdater reencryptionUpdater;
|
|
private ExecutorService updaterExecutor;
|
|
private ExecutorService updaterExecutor;
|
|
|
|
|
|
@@ -185,16 +185,6 @@ public class ReencryptionHandler implements Runnable {
|
|
reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
|
|
reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void checkPauseForTesting() throws InterruptedException {
|
|
|
|
- assert !dir.hasReadLock();
|
|
|
|
- assert !dir.getFSNamesystem().hasReadLock();
|
|
|
|
- while (shouldPauseForTesting) {
|
|
|
|
- LOG.info("Sleeping in the re-encrypt handler for unit test.");
|
|
|
|
- wait();
|
|
|
|
- LOG.info("Continuing re-encrypt handler after pausing.");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
ReencryptionHandler(final EncryptionZoneManager ezMgr,
|
|
ReencryptionHandler(final EncryptionZoneManager ezMgr,
|
|
final Configuration conf) {
|
|
final Configuration conf) {
|
|
this.ezManager = ezMgr;
|
|
this.ezManager = ezMgr;
|
|
@@ -255,6 +245,7 @@ public class ReencryptionHandler implements Runnable {
|
|
reencryptionUpdater =
|
|
reencryptionUpdater =
|
|
new ReencryptionUpdater(dir, batchService, this, conf);
|
|
new ReencryptionUpdater(dir, batchService, this, conf);
|
|
currentBatch = new ReencryptionBatch(reencryptBatchSize);
|
|
currentBatch = new ReencryptionBatch(reencryptBatchSize);
|
|
|
|
+ traverser = new ReencryptionPendingInodeIdCollector(dir, this, conf);
|
|
}
|
|
}
|
|
|
|
|
|
ReencryptionStatus getReencryptionStatus() {
|
|
ReencryptionStatus getReencryptionStatus() {
|
|
@@ -338,7 +329,7 @@ public class ReencryptionHandler implements Runnable {
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
wait(interval);
|
|
wait(interval);
|
|
}
|
|
}
|
|
- checkPauseForTesting();
|
|
|
|
|
|
+ traverser.checkPauseForTesting();
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
LOG.info("Re-encrypt handler interrupted. Exiting");
|
|
LOG.info("Re-encrypt handler interrupted. Exiting");
|
|
Thread.currentThread().interrupt();
|
|
Thread.currentThread().interrupt();
|
|
@@ -396,7 +387,7 @@ public class ReencryptionHandler implements Runnable {
|
|
final INode zoneNode;
|
|
final INode zoneNode;
|
|
final ZoneReencryptionStatus zs;
|
|
final ZoneReencryptionStatus zs;
|
|
|
|
|
|
- readLock();
|
|
|
|
|
|
+ traverser.readLock();
|
|
try {
|
|
try {
|
|
zoneNode = dir.getInode(zoneId);
|
|
zoneNode = dir.getInode(zoneId);
|
|
// start re-encrypting the zone from the beginning
|
|
// start re-encrypting the zone from the beginning
|
|
@@ -418,18 +409,19 @@ public class ReencryptionHandler implements Runnable {
|
|
zoneId);
|
|
zoneId);
|
|
if (zs.getLastCheckpointFile() == null) {
|
|
if (zs.getLastCheckpointFile() == null) {
|
|
// new re-encryption
|
|
// new re-encryption
|
|
- reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
|
|
|
|
- zs.getEzKeyVersionName());
|
|
|
|
|
|
+ traverser.traverseDir(zoneNode.asDirectory(), zoneId,
|
|
|
|
+ HdfsFileStatus.EMPTY_NAME,
|
|
|
|
+ new ZoneTraverseInfo(zs.getEzKeyVersionName()));
|
|
} else {
|
|
} else {
|
|
// resuming from a past re-encryption
|
|
// resuming from a past re-encryption
|
|
restoreFromLastProcessedFile(zoneId, zs);
|
|
restoreFromLastProcessedFile(zoneId, zs);
|
|
}
|
|
}
|
|
// save the last batch and mark complete
|
|
// save the last batch and mark complete
|
|
- submitCurrentBatch(zoneId);
|
|
|
|
|
|
+ traverser.submitCurrentBatch(zoneId);
|
|
LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
|
|
LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
|
|
reencryptionUpdater.markZoneSubmissionDone(zoneId);
|
|
reencryptionUpdater.markZoneSubmissionDone(zoneId);
|
|
} finally {
|
|
} finally {
|
|
- readUnlock();
|
|
|
|
|
|
+ traverser.readUnlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -478,131 +470,8 @@ public class ReencryptionHandler implements Runnable {
|
|
dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
|
|
dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
|
|
parent = lpfIIP.getLastINode().getParent();
|
|
parent = lpfIIP.getLastINode().getParent();
|
|
startAfter = lpfIIP.getLastINode().getLocalNameBytes();
|
|
startAfter = lpfIIP.getLastINode().getLocalNameBytes();
|
|
- reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Iterate through all files directly inside parent, and recurse down
|
|
|
|
- * directories. The listing is done in batch, and can optionally start after
|
|
|
|
- * a position.
|
|
|
|
- * <p>
|
|
|
|
- * Each batch is then send to the threadpool, where KMS will be contacted and
|
|
|
|
- * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
|
|
|
|
- * from the threadpool.
|
|
|
|
- * <p>
|
|
|
|
- * The iteration of the inode tree is done in a depth-first fashion. But
|
|
|
|
- * instead of holding all INodeDirectory's in memory on the fly, only the
|
|
|
|
- * path components to the current inode is held. This is to reduce memory
|
|
|
|
- * consumption.
|
|
|
|
- *
|
|
|
|
- * @param parent The inode id of parent directory
|
|
|
|
- * @param zoneId Id of the EZ inode
|
|
|
|
- * @param startAfter Full path of a file the re-encrypt should start after.
|
|
|
|
- * @throws IOException
|
|
|
|
- * @throws InterruptedException
|
|
|
|
- */
|
|
|
|
- private void reencryptDir(final INodeDirectory parent, final long zoneId,
|
|
|
|
- byte[] startAfter, final String ezKeyVerName)
|
|
|
|
- throws IOException, InterruptedException {
|
|
|
|
- List<byte[]> startAfters = new ArrayList<>();
|
|
|
|
- if (parent == null) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- INode curr = parent;
|
|
|
|
- // construct startAfters all the way up to the zone inode.
|
|
|
|
- startAfters.add(startAfter);
|
|
|
|
- while (curr.getId() != zoneId) {
|
|
|
|
- startAfters.add(0, curr.getLocalNameBytes());
|
|
|
|
- curr = curr.getParent();
|
|
|
|
- }
|
|
|
|
- curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
|
|
|
|
- while (!startAfters.isEmpty()) {
|
|
|
|
- if (curr == null) {
|
|
|
|
- // lock was reacquired, re-resolve path.
|
|
|
|
- curr = resolvePaths(zoneId, startAfters);
|
|
|
|
- }
|
|
|
|
- curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Resolve the cursor of re-encryption to an inode.
|
|
|
|
- * <p>
|
|
|
|
- * The parent of the lowest level startAfter is returned. If somewhere in the
|
|
|
|
- * middle of startAfters changed, the parent of the lowest unchanged level is
|
|
|
|
- * returned.
|
|
|
|
- *
|
|
|
|
- * @param zoneId Id of the EZ inode.
|
|
|
|
- * @param startAfters the cursor, represented by a list of path bytes.
|
|
|
|
- * @return the parent inode corresponding to the startAfters, or null if
|
|
|
|
- * the EZ node (furthest parent) is deleted.
|
|
|
|
- */
|
|
|
|
- private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
|
|
|
|
- throws IOException {
|
|
|
|
- // If the readlock was reacquired, we need to resolve the paths again
|
|
|
|
- // in case things have changed. If our cursor file/dir is changed,
|
|
|
|
- // continue from the next one.
|
|
|
|
- INode zoneNode = dir.getInode(zoneId);
|
|
|
|
- if (zoneNode == null) {
|
|
|
|
- throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
|
|
|
|
- }
|
|
|
|
- INodeDirectory parent = zoneNode.asDirectory();
|
|
|
|
- for (int i = 0; i < startAfters.size(); ++i) {
|
|
|
|
- if (i == startAfters.size() - 1) {
|
|
|
|
- // last startAfter does not need to be resolved, since search for
|
|
|
|
- // nextChild will cover that automatically.
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- INode curr =
|
|
|
|
- parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
|
|
|
|
- if (curr == null) {
|
|
|
|
- // inode at this level has changed. Update startAfters to point to
|
|
|
|
- // the next dir at the parent level (and dropping any startAfters
|
|
|
|
- // at lower levels).
|
|
|
|
- for (; i < startAfters.size(); ++i) {
|
|
|
|
- startAfters.remove(startAfters.size() - 1);
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- parent = curr.asDirectory();
|
|
|
|
- }
|
|
|
|
- return parent;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Submit the current batch to the thread pool.
|
|
|
|
- *
|
|
|
|
- * @param zoneId Id of the EZ INode
|
|
|
|
- * @throws IOException
|
|
|
|
- * @throws InterruptedException
|
|
|
|
- */
|
|
|
|
- private void submitCurrentBatch(final long zoneId)
|
|
|
|
- throws IOException, InterruptedException {
|
|
|
|
- assert dir.hasReadLock();
|
|
|
|
- if (currentBatch.isEmpty()) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- ZoneSubmissionTracker zst;
|
|
|
|
- synchronized (this) {
|
|
|
|
- zst = submissions.get(zoneId);
|
|
|
|
- if (zst == null) {
|
|
|
|
- zst = new ZoneSubmissionTracker();
|
|
|
|
- submissions.put(zoneId, zst);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Future future = batchService
|
|
|
|
- .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
|
|
|
|
- zst.addTask(future);
|
|
|
|
- LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
|
|
|
|
- currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
|
|
|
|
- currentBatch = new ReencryptionBatch(reencryptBatchSize);
|
|
|
|
- // flip the pause flag if this is nth submission.
|
|
|
|
- // The actual pause need to happen outside of the lock.
|
|
|
|
- if (pauseAfterNthSubmission > 0) {
|
|
|
|
- if (--pauseAfterNthSubmission == 0) {
|
|
|
|
- shouldPauseForTesting = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ traverser.traverseDir(parent, zoneId, startAfter,
|
|
|
|
+ new ZoneTraverseInfo(zs.getEzKeyVersionName()));
|
|
}
|
|
}
|
|
|
|
|
|
final class ReencryptionBatch {
|
|
final class ReencryptionBatch {
|
|
@@ -710,256 +579,268 @@ public class ReencryptionHandler implements Runnable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
- * Iterates the parent directory, and add direct children files to
|
|
|
|
- * current batch. If batch size meets configured threshold, a Callable
|
|
|
|
- * is created and sent to the thread pool, which will communicate to the KMS
|
|
|
|
- * to get new edeks.
|
|
|
|
- * <p>
|
|
|
|
- * Locks could be released and reacquired when a Callable is created.
|
|
|
|
- *
|
|
|
|
- * @param zoneId Id of the EZ INode
|
|
|
|
- * @return The inode which was just processed, if lock is held in the entire
|
|
|
|
- * process. Null if lock is released.
|
|
|
|
- * @throws IOException
|
|
|
|
- * @throws InterruptedException
|
|
|
|
|
|
+ * Called when a new zone is submitted for re-encryption. This will interrupt
|
|
|
|
+ * the background thread if it's waiting for the next
|
|
|
|
+ * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
|
|
*/
|
|
*/
|
|
- private INode reencryptDirInt(final long zoneId, INode curr,
|
|
|
|
- List<byte[]> startAfters, final String ezKeyVerName)
|
|
|
|
- throws IOException, InterruptedException {
|
|
|
|
- assert dir.hasReadLock();
|
|
|
|
- assert dir.getFSNamesystem().hasReadLock();
|
|
|
|
- Preconditions.checkNotNull(curr, "Current inode can't be null");
|
|
|
|
- checkZoneReady(zoneId);
|
|
|
|
- final INodeDirectory parent =
|
|
|
|
- curr.isDirectory() ? curr.asDirectory() : curr.getParent();
|
|
|
|
- ReadOnlyList<INode> children =
|
|
|
|
- parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- final byte[] startAfter = startAfters.get(startAfters.size() - 1);
|
|
|
|
- boolean lockReleased = false;
|
|
|
|
- for (int i = INodeDirectory.nextChild(children, startAfter);
|
|
|
|
- i < children.size(); ++i) {
|
|
|
|
- final INode inode = children.get(i);
|
|
|
|
- if (!reencryptINode(inode, ezKeyVerName)) {
|
|
|
|
- // inode wasn't added for re-encryption. Recurse down if it's a dir,
|
|
|
|
- // skip otherwise.
|
|
|
|
- if (!inode.isDirectory()) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
|
|
|
|
- // nested EZ, ignore.
|
|
|
|
- LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
|
|
|
|
- inode.getFullPathName(), inode.getId());
|
|
|
|
- continue;
|
|
|
|
|
|
+ synchronized void notifyNewSubmission() {
|
|
|
|
+ LOG.debug("Notifying handler for new re-encryption command.");
|
|
|
|
+ this.notify();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public ReencryptionPendingInodeIdCollector getTraverser() {
|
|
|
|
+ return traverser;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * ReencryptionPendingInodeIdCollector which throttles based on configured
|
|
|
|
+ * throttle ratio.
|
|
|
|
+ */
|
|
|
|
+ class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {
|
|
|
|
+
|
|
|
|
+ private final ReencryptionHandler reencryptionHandler;
|
|
|
|
+
|
|
|
|
+ ReencryptionPendingInodeIdCollector(FSDirectory dir,
|
|
|
|
+ ReencryptionHandler rHandler, Configuration conf) {
|
|
|
|
+ super(dir, conf);
|
|
|
|
+ this.reencryptionHandler = rHandler;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void checkPauseForTesting()
|
|
|
|
+ throws InterruptedException {
|
|
|
|
+ assert !dir.hasReadLock();
|
|
|
|
+ assert !dir.getFSNamesystem().hasReadLock();
|
|
|
|
+ while (shouldPauseForTesting) {
|
|
|
|
+ LOG.info("Sleeping in the re-encrypt handler for unit test.");
|
|
|
|
+ synchronized (reencryptionHandler) {
|
|
|
|
+ reencryptionHandler.wait(30000);
|
|
}
|
|
}
|
|
- // add 1 level to the depth-first search.
|
|
|
|
- curr = inode;
|
|
|
|
- if (!startAfters.isEmpty()) {
|
|
|
|
- startAfters.remove(startAfters.size() - 1);
|
|
|
|
- startAfters.add(curr.getLocalNameBytes());
|
|
|
|
|
|
+ LOG.info("Continuing re-encrypt handler after pausing.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Process an Inode for re-encryption. Add to current batch if it's a file,
|
|
|
|
+ * no-op otherwise.
|
|
|
|
+ *
|
|
|
|
+ * @param inode
|
|
|
|
+ * the inode
|
|
|
|
+ * @return true if inode is added to currentBatch and should be
|
|
|
|
+ * re-encrypted. false otherwise: could be inode is not a file, or
|
|
|
|
+ * inode's edek's key version is not changed.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ assert dir.hasReadLock();
|
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
|
+ LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
|
|
|
|
+ }
|
|
|
|
+ if (!inode.isFile()) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
|
|
|
|
+ dir, INodesInPath.fromINode(inode));
|
|
|
|
+ if (feInfo == null) {
|
|
|
|
+ LOG.warn("File {} skipped re-encryption because it is not encrypted! "
|
|
|
|
+ + "This is very likely a bug.", inode.getId());
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ if (traverseInfo instanceof ZoneTraverseInfo
|
|
|
|
+ && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals(
|
|
|
|
+ feInfo.getEzKeyVersionName())) {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("File {} skipped re-encryption because edek's key version"
|
|
|
|
+ + " name is not changed.", inode.getFullPathName());
|
|
}
|
|
}
|
|
- startAfters.add(HdfsFileStatus.EMPTY_NAME);
|
|
|
|
- return lockReleased ? null : curr;
|
|
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ currentBatch.add(inode.asFile());
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Check whether zone is ready for re-encryption. Throws IOE if it's not:
|
|
|
|
+ * 1. if EZ is deleted; 2. if the re-encryption is canceled; 3. if NN is
|
|
|
|
+ * not active or is in safe mode.
|
|
|
|
+ *
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * if zone does not exist / is cancelled, or if NN is not ready
|
|
|
|
+ * for write.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ protected void checkINodeReady(long zoneId) throws IOException {
|
|
|
|
+ final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(
|
|
|
|
+ zoneId);
|
|
|
|
+ if (zs == null) {
|
|
|
|
+ throw new IOException("Zone " + zoneId + " status cannot be found.");
|
|
|
|
+ }
|
|
|
|
+ if (zs.isCanceled()) {
|
|
|
|
+ throw new IOException("Re-encryption is canceled for zone " + zoneId);
|
|
}
|
|
}
|
|
- if (currentBatch.size() >= reencryptBatchSize) {
|
|
|
|
- final byte[] currentStartAfter = inode.getLocalNameBytes();
|
|
|
|
- final String parentPath = parent.getFullPathName();
|
|
|
|
- submitCurrentBatch(zoneId);
|
|
|
|
- lockReleased = true;
|
|
|
|
- readUnlock();
|
|
|
|
- try {
|
|
|
|
- throttle();
|
|
|
|
- checkPauseForTesting();
|
|
|
|
- } finally {
|
|
|
|
- readLock();
|
|
|
|
|
|
+ dir.getFSNamesystem().checkNameNodeSafeMode(
|
|
|
|
+ "NN is in safe mode, cannot re-encrypt.");
|
|
|
|
+ // re-encryption should be cancelled when NN goes to standby. Just
|
|
|
|
+ // double checking for sanity.
|
|
|
|
+ dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Submit the current batch to the thread pool.
|
|
|
|
+ *
|
|
|
|
+ * @param zoneId
|
|
|
|
+ * Id of the EZ INode
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ protected void submitCurrentBatch(final long zoneId) throws IOException,
|
|
|
|
+ InterruptedException {
|
|
|
|
+ if (currentBatch.isEmpty()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ ZoneSubmissionTracker zst;
|
|
|
|
+ synchronized (ReencryptionHandler.this) {
|
|
|
|
+ zst = submissions.get(zoneId);
|
|
|
|
+ if (zst == null) {
|
|
|
|
+ zst = new ZoneSubmissionTracker();
|
|
|
|
+ submissions.put(zoneId, zst);
|
|
}
|
|
}
|
|
- checkZoneReady(zoneId);
|
|
|
|
-
|
|
|
|
- // Things could have changed when the lock was released.
|
|
|
|
- // Re-resolve the parent inode.
|
|
|
|
- FSPermissionChecker pc = dir.getPermissionChecker();
|
|
|
|
- INode newParent =
|
|
|
|
- dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
|
|
|
|
- .getLastINode();
|
|
|
|
- if (newParent == null || !newParent.equals(parent)) {
|
|
|
|
- // parent dir is deleted or recreated. We're done.
|
|
|
|
- return null;
|
|
|
|
|
|
+ }
|
|
|
|
+ Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
|
|
|
|
+ currentBatch, reencryptionHandler));
|
|
|
|
+ zst.addTask(future);
|
|
|
|
+ LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
|
|
|
|
+ currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
|
|
|
|
+ currentBatch = new ReencryptionBatch(reencryptBatchSize);
|
|
|
|
+ // flip the pause flag if this is nth submission.
|
|
|
|
+ // The actual pause needs to happen outside of the lock.
|
|
|
|
+ if (pauseAfterNthSubmission > 0) {
|
|
|
|
+ if (--pauseAfterNthSubmission == 0) {
|
|
|
|
+ shouldPauseForTesting = true;
|
|
}
|
|
}
|
|
- children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
|
|
|
|
- // -1 to counter the ++ on the for loop
|
|
|
|
- i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // Successfully finished this dir, adjust pointers to 1 level up, and
|
|
|
|
- // startAfter this dir.
|
|
|
|
- startAfters.remove(startAfters.size() - 1);
|
|
|
|
- if (!startAfters.isEmpty()) {
|
|
|
|
- startAfters.remove(startAfters.size() - 1);
|
|
|
|
- startAfters.add(curr.getLocalNameBytes());
|
|
|
|
- }
|
|
|
|
- curr = curr.getParent();
|
|
|
|
- return lockReleased ? null : curr;
|
|
|
|
- }
|
|
|
|
|
|
|
|
- private void readLock() {
|
|
|
|
- dir.getFSNamesystem().readLock();
|
|
|
|
- dir.readLock();
|
|
|
|
- throttleTimerLocked.start();
|
|
|
|
- }
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Throttles the ReencryptionHandler in 3 aspects:
|
|
|
|
+ * 1. Prevents generating more Callables than the CPU could possibly
|
|
|
|
+ * handle.
|
|
|
|
+ * 2. Prevents generating more Callables than the ReencryptionUpdater
|
|
|
|
+ * can handle, under its own throttling.
|
|
|
|
+ * 3. Prevents contending FSN/FSD read locks. This is done based
|
|
|
|
+ * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
|
|
|
|
+ * <p>
|
|
|
|
+ * Item 1 and 2 are to control NN heap usage.
|
|
|
|
+ *
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ @Override
|
|
|
|
+ protected void throttle() throws InterruptedException {
|
|
|
|
+ assert !dir.hasReadLock();
|
|
|
|
+ assert !dir.getFSNamesystem().hasReadLock();
|
|
|
|
+ final int numCores = Runtime.getRuntime().availableProcessors();
|
|
|
|
+ if (taskQueue.size() >= numCores) {
|
|
|
|
+ LOG.debug("Re-encryption handler throttling because queue size {} is"
|
|
|
|
+ + "larger than number of cores {}", taskQueue.size(), numCores);
|
|
|
|
+ while (taskQueue.size() >= numCores) {
|
|
|
|
+ Thread.sleep(100);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- private void readUnlock() {
|
|
|
|
- dir.readUnlock();
|
|
|
|
- dir.getFSNamesystem().readUnlock("reencryptHandler");
|
|
|
|
- throttleTimerLocked.stop();
|
|
|
|
- }
|
|
|
|
|
|
+ // 2. if tasks are piling up on the updater, don't create new callables
|
|
|
|
+ // until the queue size goes down.
|
|
|
|
+ final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
|
|
|
|
+ int numTasks = numTasksSubmitted();
|
|
|
|
+ if (numTasks >= maxTasksPiled) {
|
|
|
|
+ LOG.debug("Re-encryption handler throttling because total tasks pending"
|
|
|
|
+ + " re-encryption updater is {}", numTasks);
|
|
|
|
+ while (numTasks >= maxTasksPiled) {
|
|
|
|
+ Thread.sleep(500);
|
|
|
|
+ numTasks = numTasksSubmitted();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
|
- * Throttles the ReencryptionHandler in 3 aspects:
|
|
|
|
- * 1. Prevents generating more Callables than the CPU could possibly handle.
|
|
|
|
- * 2. Prevents generating more Callables than the ReencryptionUpdater can
|
|
|
|
- * handle, under its own throttling
|
|
|
|
- * 3. Prevents contending FSN/FSD read locks. This is done based on the
|
|
|
|
- * DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
|
|
|
|
- * <p>
|
|
|
|
- * Item 1 and 2 are to control NN heap usage.
|
|
|
|
- *
|
|
|
|
- * @throws InterruptedException
|
|
|
|
- */
|
|
|
|
- @VisibleForTesting
|
|
|
|
- void throttle() throws InterruptedException {
|
|
|
|
- // 1.
|
|
|
|
- final int numCores = Runtime.getRuntime().availableProcessors();
|
|
|
|
- if (taskQueue.size() >= numCores) {
|
|
|
|
- LOG.debug("Re-encryption handler throttling because queue size {} is"
|
|
|
|
- + "larger than number of cores {}", taskQueue.size(), numCores);
|
|
|
|
- while (taskQueue.size() >= numCores) {
|
|
|
|
- Thread.sleep(100);
|
|
|
|
|
|
+ // 3.
|
|
|
|
+ if (throttleLimitHandlerRatio >= 1.0) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
|
|
|
|
+ * throttleLimitHandlerRatio);
|
|
|
|
+ final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
|
|
|
|
+ + " throttleTimerAll:{}", expect, actual,
|
|
|
|
+ throttleTimerAll.now(TimeUnit.MILLISECONDS));
|
|
}
|
|
}
|
|
|
|
+ if (expect - actual < 0) {
|
|
|
|
+ // in case throttleLimitHandlerRatio is very small, expect will be 0.
|
|
|
|
+ // so sleepMs should not be calculated from expect, to really meet the
|
|
|
|
+ // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
|
|
|
|
+ // should be 1000 - throttleTimerAll.now()
|
|
|
|
+ final long sleepMs = (long) (actual / throttleLimitHandlerRatio)
|
|
|
|
+ - throttleTimerAll.now(TimeUnit.MILLISECONDS);
|
|
|
|
+ LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
|
|
|
|
+ Thread.sleep(sleepMs);
|
|
|
|
+ }
|
|
|
|
+ throttleTimerAll.reset().start();
|
|
|
|
+ throttleTimerLocked.reset();
|
|
}
|
|
}
|
|
|
|
|
|
- // 2. if tasks are piling up on the updater, don't create new callables
|
|
|
|
- // until the queue size goes down.
|
|
|
|
- final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
|
|
|
|
- int numTasks = numTasksSubmitted();
|
|
|
|
- if (numTasks >= maxTasksPiled) {
|
|
|
|
- LOG.debug("Re-encryption handler throttling because total tasks pending"
|
|
|
|
- + " re-encryption updater is {}", numTasks);
|
|
|
|
- while (numTasks >= maxTasksPiled) {
|
|
|
|
- Thread.sleep(500);
|
|
|
|
- numTasks = numTasksSubmitted();
|
|
|
|
|
|
+ private int numTasksSubmitted() {
|
|
|
|
+ int ret = 0;
|
|
|
|
+ synchronized (ReencryptionHandler.this) {
|
|
|
|
+ for (ZoneSubmissionTracker zst : submissions.values()) {
|
|
|
|
+ ret += zst.getTasks().size();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ return ret;
|
|
}
|
|
}
|
|
|
|
|
|
- // 3.
|
|
|
|
- if (throttleLimitHandlerRatio >= 1.0) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
|
|
|
|
- * throttleLimitHandlerRatio);
|
|
|
|
- final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
|
|
|
|
- + " throttleTimerAll:{}", expect, actual,
|
|
|
|
- throttleTimerAll.now(TimeUnit.MILLISECONDS));
|
|
|
|
- }
|
|
|
|
- if (expect - actual < 0) {
|
|
|
|
- // in case throttleLimitHandlerRatio is very small, expect will be 0.
|
|
|
|
- // so sleepMs should not be calculated from expect, to really meet the
|
|
|
|
- // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
|
|
|
|
- // should be 1000 - throttleTimerAll.now()
|
|
|
|
- final long sleepMs =
|
|
|
|
- (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
|
|
|
|
- .now(TimeUnit.MILLISECONDS);
|
|
|
|
- LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
|
|
|
|
- Thread.sleep(sleepMs);
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public boolean shouldSubmitCurrentBatch() {
|
|
|
|
+ return currentBatch.size() >= reencryptBatchSize;
|
|
}
|
|
}
|
|
- throttleTimerAll.reset().start();
|
|
|
|
- throttleTimerLocked.reset();
|
|
|
|
- }
|
|
|
|
|
|
|
|
- private synchronized int numTasksSubmitted() {
|
|
|
|
- int ret = 0;
|
|
|
|
- for (ZoneSubmissionTracker zst : submissions.values()) {
|
|
|
|
- ret += zst.getTasks().size();
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public boolean canTraverseDir(INode inode) throws IOException {
|
|
|
|
+ if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
|
|
|
|
+ // nested EZ, ignore.
|
|
|
|
+ LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
|
|
|
|
+ inode.getFullPathName(), inode.getId());
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
}
|
|
}
|
|
- return ret;
|
|
|
|
- }
|
|
|
|
|
|
|
|
- /**
|
|
|
|
- * Process an Inode for re-encryption. Add to current batch if it's a file,
|
|
|
|
- * no-op otherwise.
|
|
|
|
- *
|
|
|
|
- * @param inode the inode
|
|
|
|
- * @return true if inode is added to currentBatch and should be re-encrypted.
|
|
|
|
- * false otherwise: could be inode is not a file, or inode's edek's
|
|
|
|
- * key version is not changed.
|
|
|
|
- * @throws IOException
|
|
|
|
- * @throws InterruptedException
|
|
|
|
- */
|
|
|
|
- private boolean reencryptINode(final INode inode, final String ezKeyVerName)
|
|
|
|
- throws IOException, InterruptedException {
|
|
|
|
- assert dir.hasReadLock();
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
|
|
|
|
- }
|
|
|
|
- if (!inode.isFile()) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
|
|
|
|
- .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
|
|
|
|
- if (feInfo == null) {
|
|
|
|
- LOG.warn("File {} skipped re-encryption because it is not encrypted! "
|
|
|
|
- + "This is very likely a bug.", inode.getId());
|
|
|
|
- return false;
|
|
|
|
|
|
+ @Override
|
|
|
|
+ protected void readLock() {
|
|
|
|
+ super.readLock();
|
|
|
|
+ throttleTimerLocked.start();
|
|
}
|
|
}
|
|
- if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("File {} skipped re-encryption because edek's key version"
|
|
|
|
- + " name is not changed.", inode.getFullPathName());
|
|
|
|
- }
|
|
|
|
- return false;
|
|
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void readUnlock() {
|
|
|
|
+ super.readUnlock();
|
|
|
|
+ throttleTimerLocked.stop();
|
|
}
|
|
}
|
|
- currentBatch.add(inode.asFile());
|
|
|
|
- return true;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Check whether zone is ready for re-encryption. Throws IOE if it's not.
|
|
|
|
- * 1. If EZ is deleted.
|
|
|
|
- * 2. if the re-encryption is canceled.
|
|
|
|
- * 3. If NN is not active or is in safe mode.
|
|
|
|
- *
|
|
|
|
- * @throws IOException if zone does not exist / is cancelled, or if NN is not
|
|
|
|
- * ready for write.
|
|
|
|
- */
|
|
|
|
- void checkZoneReady(final long zoneId)
|
|
|
|
- throws RetriableException, SafeModeException, IOException {
|
|
|
|
- final ZoneReencryptionStatus zs =
|
|
|
|
- getReencryptionStatus().getZoneStatus(zoneId);
|
|
|
|
- if (zs == null) {
|
|
|
|
- throw new IOException("Zone " + zoneId + " status cannot be found.");
|
|
|
|
- }
|
|
|
|
- if (zs.isCanceled()) {
|
|
|
|
- throw new IOException("Re-encryption is canceled for zone " + zoneId);
|
|
|
|
|
|
+ private class ZoneTraverseInfo extends TraverseInfo {
|
|
|
|
+ private String ezKeyVerName;
|
|
|
|
+
|
|
|
|
+ ZoneTraverseInfo(String ezKeyVerName) {
|
|
|
|
+ this.ezKeyVerName = ezKeyVerName;
|
|
}
|
|
}
|
|
- dir.getFSNamesystem()
|
|
|
|
- .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
|
|
|
|
- // re-encryption should be cancelled when NN goes to standby. Just
|
|
|
|
- // double checking for sanity.
|
|
|
|
- dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
|
|
|
|
- }
|
|
|
|
|
|
|
|
- /**
|
|
|
|
- * Called when a new zone is submitted for re-encryption. This will interrupt
|
|
|
|
- * the background thread if it's waiting for the next
|
|
|
|
- * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
|
|
|
|
- */
|
|
|
|
- synchronized void notifyNewSubmission() {
|
|
|
|
- LOG.debug("Notifying handler for new re-encryption command.");
|
|
|
|
- this.notify();
|
|
|
|
|
|
+ public String getEzKeyVerName() {
|
|
|
|
+ return ezKeyVerName;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|