
HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream applications. Contributed by Wei-Chiu Chuang.

(cherry picked from commit 3ae652f82110a52bf239f3c1849b48981558eb19)
Wei-Chiu Chuang committed 8 years ago
commit bde787db23
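
In short: during pipeline recovery, DataStreamer.transfer() could let an InvalidEncryptionKeyException escape uncaught once the client's cached data encryption key expired, aborting the write instead of refetching a key and retrying. The patch bounds recovery at two attempts per source node, clearing the stale key between attempts. A condensed sketch of the pattern, simplified for illustration (connectAndTransfer is a hypothetical stand-in for the real stream setup; the full patch follows below):

    // Hedged sketch of the retry loop this commit adds to DataStreamer.transfer().
    // connectAndTransfer() is a hypothetical stand-in for the real stream setup.
    private void transfer(DatanodeInfo src, DatanodeInfo[] targets)
        throws IOException {
      RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
      do {
        try {
          connectAndTransfer(src, targets); // open SASL streams, send TRANSFER_BLOCK
          return;                           // success: stop retrying
        } catch (InvalidEncryptionKeyException e) {
          policy.recordFailure(e);          // count and remember the failure
        }
      } while (policy.continueRetryingOrThrow()); // 2nd failure rethrows;
                                                  // otherwise clear the key, retry
    }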

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1772,6 +1772,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  @VisibleForTesting
+  public DataEncryptionKey getEncryptionKey() {
+    return encryptionKey;
+  }
+
   /**
    * Get the checksum of the whole file of a range of the file. Note that the
    * range always starts from the beginning of the file.

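This @VisibleForTesting accessor exposes the client's cached DataEncryptionKey so the regression test later in this diff can watch it expire. A minimal usage sketch, assuming a DistributedFileSystem fs backed by a MiniDFSCluster:

    // Sketch: read the client's current data encryption key in a test.
    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
    DataEncryptionKey key = client.getEncryptionKey();
    // key.keyId and key.blockPoolId identify this key on the DataNode side.
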
+ 110 - 38
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -124,6 +124,89 @@ import javax.annotation.Nonnull;
 class DataStreamer extends Daemon {
   static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
 
+  private class RefetchEncryptionKeyPolicy {
+    private int fetchEncryptionKeyTimes = 0;
+    private InvalidEncryptionKeyException lastException;
+    private final DatanodeInfo src;
+
+    RefetchEncryptionKeyPolicy(DatanodeInfo src) {
+      this.src = src;
+    }
+    boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
+      if (fetchEncryptionKeyTimes >= 2) {
+        // hit the same exception twice connecting to the node, so
+        // throw the exception and exclude the node.
+        throw lastException;
+      }
+      // Don't exclude this node just yet.
+      // Try again with a new encryption key.
+      LOG.info("Will fetch a new encryption key and retry, "
+          + "encryption key was invalid when connecting to "
+          + this.src + ": ", lastException);
+      // The encryption key used is invalid.
+      dfsClient.clearDataEncryptionKey();
+      return true;
+    }
+
+    /**
+     * Record a connection failure caused by an invalid encryption key.
+     * @param e the InvalidEncryptionKeyException from the failed attempt
+     * @throws InvalidEncryptionKeyException never thrown by this method
+     *           itself; declared to match continueRetryingOrThrow()
+     */
+    void recordFailure(final InvalidEncryptionKeyException e)
+        throws InvalidEncryptionKeyException {
+      fetchEncryptionKeyTimes++;
+      lastException = e;
+    }
+  }
+
+  private class StreamerStreams implements java.io.Closeable {
+    private Socket sock = null;
+    private DataOutputStream out = null;
+    private DataInputStream in = null;
+
+    StreamerStreams(final DatanodeInfo src,
+        final long writeTimeout, final long readTimeout,
+        final Token<BlockTokenIdentifier> blockToken)
+        throws IOException {
+      sock = createSocketForPipeline(src, 2, dfsClient);
+
+      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+      InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
+      IOStreamPair saslStreams = dfsClient.saslClient
+          .socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
+      unbufOut = saslStreams.out;
+      unbufIn = saslStreams.in;
+      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
+      in = new DataInputStream(unbufIn);
+    }
+
+    void sendTransferBlock(final DatanodeInfo[] targets,
+        final StorageType[] targetStorageTypes,
+        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      //send the TRANSFER_BLOCK request
+      new Sender(out)
+          .transferBlock(block, blockToken, dfsClient.clientName, targets,
+              targetStorageTypes);
+      out.flush();
+      //ack
+      BlockOpResponseProto transferResponse = BlockOpResponseProto
+          .parseFrom(PBHelperClient.vintPrefixed(in));
+      if (SUCCESS != transferResponse.getStatus()) {
+        throw new IOException("Failed to add a datanode. Response status: "
+            + transferResponse.getStatus());
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+      IOUtils.closeSocket(sock);
+    }
+  }
+
   /**
    * Create a socket for a write pipeline
    *
@@ -1243,50 +1326,39 @@ class DataStreamer extends Daemon {
         new IOException("Failed to add a node");
   }
 
+  private long computeTransferWriteTimeout() {
+    return dfsClient.getDatanodeWriteTimeout(2);
+  }
+  private long computeTransferReadTimeout() {
+    // transfer timeout multiplier based on the transfer size
+    // One per 200 packets = 12.8MB. Minimum is 2.
+    int multi = 2
+        + (int) (bytesSent / dfsClient.getConf().getWritePacketSize()) / 200;
+    return dfsClient.getDatanodeReadTimeout(multi);
+  }
+
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
                         final StorageType[] targetStorageTypes,
                         final Token<BlockTokenIdentifier> blockToken)
       throws IOException {
     //transfer replica to the new datanode
-    Socket sock = null;
-    DataOutputStream out = null;
-    DataInputStream in = null;
-    try {
-      sock = createSocketForPipeline(src, 2, dfsClient);
-      final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-
-      // transfer timeout multiplier based on the transfer size
-      // One per 200 packets = 12.8MB. Minimum is 2.
-      int multi = 2 + (int)(bytesSent /dfsClient.getConf().getWritePacketSize())
-          / 200;
-      final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
-
-      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-      InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
-      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
-          unbufOut, unbufIn, dfsClient, blockToken, src);
-      unbufOut = saslStreams.out;
-      unbufIn = saslStreams.in;
-      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
-      in = new DataInputStream(unbufIn);
-
-      //send the TRANSFER_BLOCK request
-      new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-          targets, targetStorageTypes);
-      out.flush();
-
-      //ack
-      BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
-      if (SUCCESS != response.getStatus()) {
-        throw new IOException("Failed to add a datanode");
+    RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
+    do {
+      StreamerStreams streams = null;
+      try {
+        final long writeTimeout = computeTransferWriteTimeout();
+        final long readTimeout = computeTransferReadTimeout();
+
+        streams = new StreamerStreams(src, writeTimeout, readTimeout,
+            blockToken);
+        streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+        return;
+      } catch (InvalidEncryptionKeyException e) {
+        policy.recordFailure(e);
+      } finally {
+        IOUtils.closeStream(streams);
       }
-    } finally {
-      IOUtils.closeStream(in);
-      IOUtils.closeStream(out);
-      IOUtils.closeSocket(sock);
-    }
+    } while (policy.continueRetryingOrThrow());
   }
 
   /**

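For reference, the multiplier arithmetic in computeTransferReadTimeout() assumes the default 64 KB write packet size, which is what makes 200 packets equal 12.8 MB. A worked example:

    // Worked example of computeTransferReadTimeout()'s multiplier.
    // Assumes the default dfs.client-write-packet-size of 64 KB (65536 bytes).
    long packetSize = 64 * 1024;
    long bytesSent = 1024L * 1024 * 1024;            // suppose 1 GB sent so far
    int multi = 2 + (int) (bytesSent / packetSize) / 200;
    // 1 GB / 64 KB = 16384 packets; 16384 / 200 = 81; multi = 2 + 81 = 83.
    // Under 12.8 MB the division yields 0 and multi stays at the minimum of 2.
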
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java

@@ -49,7 +49,8 @@ public class BlockPoolTokenSecretManager extends
     map.put(bpid, secretMgr);
   }
 
-  synchronized BlockTokenSecretManager get(String bpid) {
+  @VisibleForTesting
+  public synchronized BlockTokenSecretManager get(String bpid) {
     BlockTokenSecretManager secretMgr = map.get(bpid);
     if (secretMgr == null) {
       throw new IllegalArgumentException("Block pool " + bpid

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java

@@ -436,6 +436,12 @@ public class BlockTokenSecretManager extends
     allKeys.clear();
   }
   
+  @VisibleForTesting
+  public synchronized boolean hasKey(int keyId) {
+    BlockKey key = allKeys.get(keyId);
+    return key != null;
+  }
+
   @VisibleForTesting
   public synchronized int getSerialNoForTesting() {
     return serialNo;

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -2721,6 +2721,11 @@ public class DataNode extends ReconfigurableBase
     return directoryScanner;
   }
 
+  @VisibleForTesting
+  public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() {
+    return blockPoolTokenSecretManager;
+  }
+
   public static void secureMain(String args[], SecureResources resources) {
     int errorCode = 0;
     try {

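Together, the three test hooks added above (DataNode.getBlockPoolTokenSecretManager(), BlockPoolTokenSecretManager.get(bpid), and BlockTokenSecretManager.hasKey(keyId)) let a test poll every DataNode until the client's encryption key has expired, which is what the new regression test below does. A sketch, assuming a running MiniDFSCluster named cluster and the client's DataEncryptionKey in a final variable key:

    // Sketch: wait until no DataNode still holds the client's encryption key.
    for (final DataNode dn : cluster.getDataNodes()) {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          return !dn.getBlockPoolTokenSecretManager()
              .get(key.blockPoolId)     // per-block-pool secret manager
              .hasKey(key.keyId);       // new test-only key lookup
        }
      }, 100, 30 * 1000);               // poll every 100 ms, up to 30 s
    }
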
+ 308 - 434
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java

@@ -18,24 +18,31 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
@@ -45,9 +52,14 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -59,6 +71,9 @@ public class TestEncryptedTransfer {
     LogManager.getLogger(SaslDataTransferServer.class).setLevel(Level.DEBUG);
     LogManager.getLogger(DataTransferSaslUtil.class).setLevel(Level.DEBUG);
   }
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
   
   @Parameters
   public static Collection<Object[]> data() {
@@ -72,8 +87,12 @@ public class TestEncryptedTransfer {
   
   private static final String PLAIN_TEXT = "this is very secret plain text";
   private static final Path TEST_PATH = new Path("/non-encrypted-file");
+
+  private MiniDFSCluster cluster = null;
+  private Configuration conf = null;
+  private FileSystem fs = null;
   
-  private void setEncryptionConfigKeys(Configuration conf) {
+  private void setEncryptionConfigKeys() {
     conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     if (resolverClazz != null){
@@ -96,505 +115,360 @@ public class TestEncryptedTransfer {
     this.resolverClazz = resolverClazz;
   }
 
-  @Test
-  public void testEncryptedRead() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (fs != null) {
       fs.close();
+    }
+    if (cluster != null) {
       cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-      
-      fs.close();
-      
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
-  
-  @Test
-  public void testEncryptedReadWithRC4() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      // It'll use 3DES by default, but we set it to rc4 here.
-      conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
 
-      fs.close();
+  private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster()
+      throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).build();
 
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-  
-  @Test
-  public void testEncryptedReadWithAES() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
-          "AES/CTR/NoPadding");
-      cluster = new MiniDFSCluster.Builder(conf).build();
+    fs = getFileSystem(conf);
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+    fs.close();
+    cluster.shutdown();
 
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
+    setEncryptionConfigKeys();
 
-      setEncryptionConfigKeys(conf);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .manageDataDfsDirs(false)
+        .manageNameDfsDirs(false)
+        .format(false)
+        .startupOption(StartupOption.REGULAR)
+        .build();
 
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
+    fs = getFileSystem(conf);
+    return checksum;
+  }
 
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
+  private void testEncryptedRead(String algorithm, String cipherSuite,
+      boolean matchLog, boolean readAfterRestart)
+      throws IOException {
+    // set encryption algorithm and cipher suites, but don't enable transfer
+    // encryption yet.
+    conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm);
+    conf.set(HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
+        cipherSuite);
 
-      fs.close();
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
 
-      if (resolverClazz == null) {
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(SaslDataTransferServer.class));
+    LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataTransferSaslUtil.class));
+    try {
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+    } finally {
+      logs.stopCapturing();
+      logs1.stopCapturing();
+    }
+
+    if (resolverClazz == null) {
+      if (matchLog) {
         // Test client and server negotiate cipher option
-        GenericTestUtils.assertMatches(logs.getOutput(),
-            "Server using cipher suite");
+        GenericTestUtils
+            .assertMatches(logs.getOutput(), "Server using cipher suite");
         // Check the IOStreamPair
         GenericTestUtils.assertMatches(logs1.getOutput(),
             "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
+      } else {
+        // Test client and server negotiate cipher option
+        GenericTestUtils
+            .assertDoesNotMatch(logs.getOutput(), "Server using cipher suite");
+        // Check the IOStreamPair
+        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
       }
     }
-  }
 
-  @Test
-  public void testEncryptedReadAfterNameNodeRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      fs.close();
-      
+    if (readAfterRestart) {
       cluster.restartNameNode();
       fs = getFileSystem(conf);
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
       assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
-  
+
+  @Test
+  public void testEncryptedReadDefaultAlgorithmCipherSuite()
+      throws IOException {
+    testEncryptedRead("", "", false, false);
+  }
+
+  @Test
+  public void testEncryptedReadWithRC4() throws IOException {
+    testEncryptedRead("rc4", "", false, false);
+  }
+
+  @Test
+  public void testEncryptedReadWithAES() throws IOException {
+    testEncryptedRead("", "AES/CTR/NoPadding", true, false);
+  }
+
+  @Test
+  public void testEncryptedReadAfterNameNodeRestart() throws IOException {
+    testEncryptedRead("", "", false, true);
+  }
+
   @Test
   public void testClientThatDoesNotSupportEncryption() throws IOException {
-    MiniDFSCluster cluster = null;
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+
+    writeUnencryptedAndThenRestartEncryptedCluster();
+
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    Mockito.doReturn(false).when(spyClient).shouldEncryptData();
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataNode.class));
     try {
-      Configuration conf = new Configuration();
-      // Set short retry timeouts so this test runs faster
-      conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      
-      fs = getFileSystem(conf);
-      DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
-      DFSClient spyClient = Mockito.spy(client);
-      Mockito.doReturn(false).when(spyClient).shouldEncryptData();
-      DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
-      
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataNode.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
-          fail("Should not have been able to read without encryption enabled.");
-        }
-      } catch (IOException ioe) {
-        GenericTestUtils.assertExceptionContains("Could not obtain block:",
-            ioe);
-      } finally {
-        logs.stopCapturing();
-      }
-      fs.close();
-      
-      if (resolverClazz == null) {
-        GenericTestUtils.assertMatches(logs.getOutput(),
-        "Failed to read expected encryption handshake from client at");
+      if (resolverClazz != null &&
+          !resolverClazz.endsWith("TestTrustedChannelResolver")){
+        fail("Should not have been able to read without encryption enabled.");
       }
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Could not obtain block:",
+          ioe);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      logs.stopCapturing();
+    }
+
+    if (resolverClazz == null) {
+      GenericTestUtils.assertMatches(logs.getOutput(),
+          "Failed to read expected encryption handshake from client at");
     }
   }
-  
+
   @Test
   public void testLongLivedReadClientAfterRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      // Restart the NN and DN, after which the client's encryption key will no
-      // longer be valid.
-      cluster.restartNameNode();
-      assertTrue(cluster.restartDataNode(0));
-      
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+    // Restart the NN and DN, after which the client's encryption key will no
+    // longer be valid.
+    cluster.restartNameNode();
+    assertTrue(cluster.restartDataNode(0));
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
   }
-  
+
   @Test
   public void testLongLivedWriteClientAfterRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      // Restart the NN and DN, after which the client's encryption key will no
-      // longer be valid.
-      cluster.restartNameNode();
-      assertTrue(cluster.restartDataNodes());
-      cluster.waitActive();
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+
+    fs = getFileSystem(conf);
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    // Restart the NN and DN, after which the client's encryption key will no
+    // longer be valid.
+    cluster.restartNameNode();
+    assertTrue(cluster.restartDataNodes());
+    cluster.waitActive();
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
   
   @Test
   public void testLongLivedClient() throws IOException, InterruptedException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
-          .getBlockTokenSecretManager();
-      btsm.setKeyUpdateIntervalForTesting(2 * 1000);
-      btsm.setTokenLifetime(2 * 1000);
-      btsm.clearAllKeysForTesting();
-      
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      // Sleep for 15 seconds, after which the encryption key will no longer be
-      // valid. It needs to be a few multiples of the block token lifetime,
-      // since several block tokens are valid at any given time (the current
-      // and the last two, by default.)
-      LOG.info("Sleeping so that encryption keys expire...");
-      Thread.sleep(15 * 1000);
-      LOG.info("Done sleeping.");
-      
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+    // Sleep for 15 seconds, after which the encryption key will no longer be
+    // valid. It needs to be a few multiples of the block token lifetime,
+    // since several block tokens are valid at any given time (the current
+    // and the last two, by default.)
+    LOG.info("Sleeping so that encryption keys expire...");
+    Thread.sleep(15 * 1000);
+    LOG.info("Done sleeping.");
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+  }
+
+  @Test
+  public void testLongLivedClientPipelineRecovery()
+      throws IOException, InterruptedException, TimeoutException {
+    if (resolverClazz != null) {
+      // TestTrustedChannelResolver does not use encryption keys.
+      return;
+    }
+    // Use 4 datanodes so that after one datanode is stopped, the client
+    // retries establishing the pipeline only with the 4th node.
+    int numDataNodes = 4;
+    // do not consider load factor when selecting a data node
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes)
+        .build();
+
+    fs = getFileSystem(conf);
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+    writeTestDataToFile(fs);
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    // Reduce key update interval and token life for testing.
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    // Wait until the encryption key becomes invalid.
+    LOG.info("Wait until encryption keys become invalid...");
+
+    final DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    for (final DataNode dn: dataNodes) {
+      GenericTestUtils.waitFor(
+          new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+              return !dn.getBlockPoolTokenSecretManager().
+                  get(encryptionKey.blockPoolId)
+                  .hasKey(encryptionKey.keyId);
+            }
+          }, 100, 30*1000
+      );
     }
+    LOG.info("The encryption key is invalid on all nodes now.");
+    try(FSDataOutputStream out = fs.append(TEST_PATH)) {
+      DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream();
+      // shut down the first datanode in the pipeline.
+      DatanodeInfo[] targets = dfstream.getPipeline();
+      cluster.stopDataNode(targets[0].getXferAddr());
+      // write data to induce pipeline recovery
+      out.write(PLAIN_TEXT.getBytes());
+      out.hflush();
+      assertFalse("The first datanode in the pipeline was not replaced.",
+          Arrays.asList(dfstream.getPipeline()).contains(targets[0]));
+    }
+    // verify that InvalidEncryptionKeyException is handled properly
+    Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
   }
-  
+
   @Test
   public void testEncryptedWriteWithOneDn() throws IOException {
     testEncryptedWrite(1);
   }
-  
+
   @Test
   public void testEncryptedWriteWithTwoDns() throws IOException {
     testEncryptedWrite(2);
   }
-  
+
   @Test
   public void testEncryptedWriteWithMultipleDns() throws IOException {
     testEncryptedWrite(10);
   }
-  
+
   private void testEncryptedWrite(int numDns) throws IOException {
-    MiniDFSCluster cluster = null;
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
+
+    fs = getFileSystem(conf);
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(SaslDataTransferServer.class));
+    LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataTransferSaslUtil.class));
     try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        writeTestDataToFile(fs);
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      fs.close();
-      
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
+      writeTestDataToFile(fs);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      logs.stopCapturing();
+      logs1.stopCapturing();
+    }
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    if (resolverClazz == null) {
+      // Test client and server negotiate cipher option
+      GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
+          "Server using cipher suite");
+      // Check the IOStreamPair
+      GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+          "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
     }
   }
-  
+
   @Test
   public void testEncryptedAppend() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+
+    fs = getFileSystem(conf);
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
-  
+
   @Test
   public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      
-      // start up 4 DNs
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      // Create a file with replication 3, so its block is on 3 / 4 DNs.
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      // Shut down one of the DNs holding a block replica.
-      FSDataInputStream in = fs.open(TEST_PATH);
-      List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
-      in.close();
-      assertEquals(1, locatedBlocks.size());
-      assertEquals(3, locatedBlocks.get(0).getLocations().length);
-      DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
-      dn.shutdown();
-      
-      // Reopen the file for append, which will need to add another DN to the
-      // pipeline and in doing so trigger a block transfer.
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+
+    // start up 4 DNs
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+
+    fs = getFileSystem(conf);
+
+    // Create a file with replication 3, so its block is on 3 / 4 DNs.
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    // Shut down one of the DNs holding a block replica.
+    FSDataInputStream in = fs.open(TEST_PATH);
+    List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
+    in.close();
+    assertEquals(1, locatedBlocks.size());
+    assertEquals(3, locatedBlocks.get(0).getLocations().length);
+    DataNode dn = cluster.getDataNode(
+        locatedBlocks.get(0).getLocations()[0].getIpcPort());
+    dn.shutdown();
+
+    // Reopen the file for append, which will need to add another DN to the
+    // pipeline and in doing so trigger a block transfer.
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
   
   private static void writeTestDataToFile(FileSystem fs) throws IOException {