|
@@ -30,6 +30,7 @@ import java.util.Set;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
|
|
|
@@ -1086,12 +1087,9 @@ public class TestReencryption {
|
|
|
getEzManager().resumeReencryptForTesting();
|
|
|
|
|
|
Thread.sleep(3000);
|
|
|
- EncryptionZoneManager ezm = getEzManager();
|
|
|
- ReencryptionHandler handler = (ReencryptionHandler) Whitebox
|
|
|
- .getInternalState(ezm, "reencryptionHandler");
|
|
|
Map<Long, ZoneSubmissionTracker> tasks =
|
|
|
(Map<Long, ZoneSubmissionTracker>) Whitebox
|
|
|
- .getInternalState(handler, "submissions");
|
|
|
+ .getInternalState(getHandler(), "submissions");
|
|
|
List<Future> futures = new LinkedList<>();
|
|
|
for (ZoneSubmissionTracker zst : tasks.values()) {
|
|
|
for (Future f : zst.getTasks()) {
|
|
@@ -1493,6 +1491,88 @@ public class TestReencryption {
|
|
|
assertEquals(5, getZoneStatus(zone.toString()).getFilesReencrypted());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testCancelFuture() throws Exception {
|
|
|
+ final AtomicBoolean callableRunning = new AtomicBoolean(false);
|
|
|
+ class MyInjector extends EncryptionFaultInjector {
|
|
|
+ private volatile int exceptionCount = 0;
|
|
|
+
|
|
|
+ MyInjector(int numFailures) {
|
|
|
+ exceptionCount = numFailures;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void reencryptEncryptedKeys() throws IOException {
|
|
|
+ if (exceptionCount > 0) {
|
|
|
+ exceptionCount--;
|
|
|
+ try {
|
|
|
+ callableRunning.set(true);
|
|
|
+ Thread.sleep(Long.MAX_VALUE);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.info("Fault injector interrupted", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ final MyInjector injector = new MyInjector(1);
|
|
|
+ EncryptionFaultInjector.instance = injector;
|
|
|
+
|
|
|
+ /* Setup test dir:
|
|
|
+ * /zones/zone/[0-9]
|
|
|
+ * /dir/f
|
|
|
+ */
|
|
|
+ final int len = 8196;
|
|
|
+ final Path zoneParent = new Path("/zones");
|
|
|
+ final Path zone = new Path(zoneParent, "zone");
|
|
|
+ fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
|
|
|
+ dfsAdmin.createEncryptionZone(zone, TEST_KEY, NO_TRASH);
|
|
|
+ for (int i = 0; i < 10; ++i) {
|
|
|
+ DFSTestUtil
|
|
|
+ .createFile(fs, new Path(zone, Integer.toString(i)), len, (short) 1,
|
|
|
+ 0xFEED);
|
|
|
+ }
|
|
|
+ final Path subdir = new Path("/dir");
|
|
|
+ fsWrapper.mkdir(subdir, FsPermission.getDirDefault(), true);
|
|
|
+ DFSTestUtil.createFile(fs, new Path(subdir, "f"), len, (short) 1, 0xFEED);
|
|
|
+
|
|
|
+ // re-encrypt 10 files, so 2 callables. Hang 1, pause the updater so the
|
|
|
+ // callable is taken from the executor but not processed.
|
|
|
+ fsn.getProvider().rollNewVersion(TEST_KEY);
|
|
|
+ fsn.getProvider().flush();
|
|
|
+ getEzManager().pauseReencryptForTesting();
|
|
|
+ dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
|
|
+ waitForQueuedZones(1);
|
|
|
+ getEzManager().resumeReencryptForTesting();
|
|
|
+
|
|
|
+ LOG.info("Waiting for re-encrypt callables to run");
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return callableRunning.get();
|
|
|
+ }
|
|
|
+ }, 100, 10000);
|
|
|
+
|
|
|
+ getEzManager().pauseReencryptUpdaterForTesting();
|
|
|
+ dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
|
|
|
+
|
|
|
+ // now resume updater and verify status.
|
|
|
+ getEzManager().resumeReencryptUpdaterForTesting();
|
|
|
+ waitForZoneCompletes(zone.toString());
|
|
|
+
|
|
|
+ RemoteIterator<ZoneReencryptionStatus> it =
|
|
|
+ dfsAdmin.listReencryptionStatus();
|
|
|
+ assertTrue(it.hasNext());
|
|
|
+ final ZoneReencryptionStatus zs = it.next();
|
|
|
+ assertEquals(zone.toString(), zs.getZoneName());
|
|
|
+ assertEquals(ZoneReencryptionStatus.State.Completed, zs.getState());
|
|
|
+ assertTrue(zs.isCanceled());
|
|
|
+ assertTrue(zs.getCompletionTime() > 0);
|
|
|
+ assertTrue(zs.getCompletionTime() > zs.getSubmissionTime());
|
|
|
+ assertEquals(0, zs.getFilesReencrypted());
|
|
|
+
|
|
|
+ assertTrue(getUpdater().isRunning());
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testReencryptCancelForUpdater() throws Exception {
|
|
|
/* Setup test dir:
|
|
@@ -1822,12 +1902,7 @@ public class TestReencryption {
|
|
|
fsn.getProvider().rollNewVersion(TEST_KEY);
|
|
|
fsn.getProvider().flush();
|
|
|
|
|
|
- final EncryptionZoneManager ezm = getEzManager();
|
|
|
- final ReencryptionHandler handler = (ReencryptionHandler) Whitebox
|
|
|
- .getInternalState(ezm, "reencryptionHandler");
|
|
|
- final ReencryptionUpdater updater = (ReencryptionUpdater) Whitebox
|
|
|
- .getInternalState(handler, "reencryptionUpdater");
|
|
|
- Whitebox.setInternalState(updater, "faultRetryInterval", 50);
|
|
|
+ Whitebox.setInternalState(getUpdater(), "faultRetryInterval", 50);
|
|
|
dfsAdmin.reencryptEncryptionZone(zone, ReencryptAction.START);
|
|
|
waitForReencryptedZones(1);
|
|
|
assertEquals(0, injector.exceptionCount);
|
|
@@ -1844,4 +1919,14 @@ public class TestReencryption {
|
|
|
assertEquals(10, zs.getFilesReencrypted());
|
|
|
assertEquals(0, zs.getNumReencryptionFailures());
|
|
|
}
|
|
|
+
|
|
|
+ private ReencryptionHandler getHandler() {
|
|
|
+ return (ReencryptionHandler) Whitebox
|
|
|
+ .getInternalState(getEzManager(), "reencryptionHandler");
|
|
|
+ }
|
|
|
+
|
|
|
+ private ReencryptionUpdater getUpdater() {
|
|
|
+ return (ReencryptionUpdater) Whitebox
|
|
|
+ .getInternalState(getHandler(), "reencryptionUpdater");
|
|
|
+ }
|
|
|
}
|