Pārlūkot izejas kodu

HDDS-959. KeyOutputStream should handle retry failures. Contributed by Lokesh Jain.

Mukul Kumar Singh 6 gadi atpakaļ
vecāks
revīzija
4ac0404fe0

+ 10 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java

@@ -129,11 +129,20 @@ public class XceiverClientManager implements Closeable {
    * Releases a XceiverClientSpi after use.
    *
    * @param client client to release
+   * @param invalidateClient if true, invalidates the client in cache
    */
-  public void releaseClient(XceiverClientSpi client) {
+  public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
     Preconditions.checkNotNull(client);
     synchronized (clientCache) {
       client.decrementReference();
+      if (invalidateClient) {
+        Pipeline pipeline = client.getPipeline();
+        String key = pipeline.getId().getId().toString() + pipeline.getType();
+        XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
+        if (cachedClient == client) {
+          clientCache.invalidate(key);
+        }
+      }
     }
   }
 

+ 5 - 5
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java

@@ -100,7 +100,7 @@ public class ContainerOperationClient implements ScmClient {
       return containerWithPipeline;
     } finally {
       if (client != null) {
-        xceiverClientManager.releaseClient(client);
+        xceiverClientManager.releaseClient(client, false);
       }
     }
   }
@@ -191,7 +191,7 @@ public class ContainerOperationClient implements ScmClient {
       return containerWithPipeline;
     } finally {
       if (client != null) {
-        xceiverClientManager.releaseClient(client);
+        xceiverClientManager.releaseClient(client, false);
       }
     }
   }
@@ -269,7 +269,7 @@ public class ContainerOperationClient implements ScmClient {
       }
     } finally {
       if (client != null) {
-        xceiverClientManager.releaseClient(client);
+        xceiverClientManager.releaseClient(client, false);
       }
     }
   }
@@ -318,7 +318,7 @@ public class ContainerOperationClient implements ScmClient {
       return response.getContainerData();
     } finally {
       if (client != null) {
-        xceiverClientManager.releaseClient(client);
+        xceiverClientManager.releaseClient(client, false);
       }
     }
   }
@@ -410,7 +410,7 @@ public class ContainerOperationClient implements ScmClient {
           ObjectStageChangeRequestProto.Stage.complete);
     } finally {
       if (client != null) {
-        xceiverClientManager.releaseClient(client);
+        xceiverClientManager.releaseClient(client, false);
       }
     }
   }

+ 1 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

@@ -141,7 +141,7 @@ public class BlockInputStream extends InputStream implements Seekable {
   @Override
   public synchronized void close() {
     if (xceiverClientManager != null && xceiverClient != null) {
-      xceiverClientManager.releaseClient(xceiverClient);
+      xceiverClientManager.releaseClient(xceiverClient, false);
       xceiverClientManager = null;
       xceiverClient = null;
     }

+ 9 - 8
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -113,7 +114,7 @@ public class BlockOutputStream extends OutputStream {
    * @param blockID              block ID
    * @param key                  chunk key
    * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient        client to perform container calls
+   * @param pipeline             pipeline where block will be written
    * @param traceID              container protocol call args
    * @param chunkSize            chunk size
    * @param bufferList           list of byte buffers
@@ -124,10 +125,10 @@ public class BlockOutputStream extends OutputStream {
    */
   @SuppressWarnings("parameternumber")
   public BlockOutputStream(BlockID blockID, String key,
-      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+      XceiverClientManager xceiverClientManager, Pipeline pipeline,
       String traceID, int chunkSize, long streamBufferFlushSize,
-      long streamBufferMaxSize, long watchTimeout,
-      List<ByteBuffer> bufferList, Checksum checksum) {
+      long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
+      Checksum checksum) throws IOException {
     this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
@@ -138,7 +139,7 @@ public class BlockOutputStream extends OutputStream {
         BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
             .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
+    this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
     this.streamId = UUID.randomUUID().toString();
     this.chunkIndex = 0;
     this.streamBufferFlushSize = streamBufferFlushSize;
@@ -500,7 +501,7 @@ public class BlockOutputStream extends OutputStream {
           throw new IOException(
               "Unexpected Storage Container Exception: " + e.toString(), e);
         } finally {
-          cleanup();
+          cleanup(false);
         }
       }
       // clear the currentBuffer
@@ -541,9 +542,9 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
-  public void cleanup() {
+  public void cleanup(boolean invalidateClient) {
     if (xceiverClientManager != null) {
-      xceiverClientManager.releaseClient(xceiverClient);
+      xceiverClientManager.releaseClient(xceiverClient, invalidateClient);
     }
     xceiverClientManager = null;
     xceiverClient = null;

+ 1 - 1
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java

@@ -311,7 +311,7 @@ public class KeyInputStream extends InputStream implements Seekable {
             omKeyLocationInfo.getLength());
       } finally {
         if (!success) {
-          xceiverClientManager.releaseClient(xceiverClient);
+          xceiverClientManager.releaseClient(xceiverClient, false);
         }
       }
     }

+ 47 - 62
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.*;
@@ -31,11 +32,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,19 +108,6 @@ public class KeyOutputStream extends OutputStream {
     this.checksum = new Checksum();
   }
 
-  /**
-   * For testing purpose only. Not building output stream from blocks, but
-   * taking from externally.
-   *
-   * @param outputStream
-   * @param length
-   */
-  @VisibleForTesting
-  public void addStream(OutputStream outputStream, long length) {
-    streamEntries.add(
-        new BlockOutputStreamEntry(outputStream, length, checksum));
-  }
-
   @VisibleForTesting
   public List<BlockOutputStreamEntry> getStreamEntries() {
     return streamEntries;
@@ -213,12 +201,11 @@ public class KeyOutputStream extends OutputStream {
       throws IOException {
     ContainerWithPipeline containerWithPipeline = scmClient
         .getContainerWithPipeline(subKeyInfo.getContainerID());
-    XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
     streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(),
-        keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
-        streamBufferMaxSize, watchTimeout, bufferList, checksum));
+        keyArgs.getKeyName(), xceiverClientManager,
+        containerWithPipeline.getPipeline(), requestID, chunkSize,
+        subKeyInfo.getLength(), streamBufferFlushSize, streamBufferMaxSize,
+        watchTimeout, bufferList, checksum));
   }
 
 
@@ -297,12 +284,14 @@ public class KeyOutputStream extends OutputStream {
           current.write(b, off, writeLen);
         }
       } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
+        boolean retryFailure = checkForRetryFailure(ioe);
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
+            || retryFailure) {
           // for the current iteration, totalDataWritten - currentPos gives the
           // amount of data already written to the buffer
           writeLen = (int) (current.getWrittenDataLength() - currentPos);
           LOG.debug("writeLen {}, total len {}", writeLen, len);
-          handleException(current, currentStreamIndex);
+          handleException(current, currentStreamIndex, retryFailure);
         } else {
           throw ioe;
         }
@@ -362,17 +351,19 @@ public class KeyOutputStream extends OutputStream {
    *
    * @param streamEntry StreamEntry
    * @param streamIndex Index of the entry
+   * @param retryFailure if true the xceiverClient needs to be invalidated in
+   *                     the client cache.
    * @throws IOException Throws IOException if Write fails
    */
   private void handleException(BlockOutputStreamEntry streamEntry,
-      int streamIndex) throws IOException {
+      int streamIndex, boolean retryFailure) throws IOException {
     long totalSuccessfulFlushedData =
         streamEntry.getTotalSuccessfulFlushedData();
     //set the correct length for the current stream
     streamEntry.currentPosition = totalSuccessfulFlushedData;
     long bufferedDataLen = computeBufferData();
     // just clean up the current stream.
-    streamEntry.cleanup();
+    streamEntry.cleanup(retryFailure);
     if (bufferedDataLen > 0) {
       // If the data is still cached in the underlying stream, we need to
       // allocate new block and write this data in the datanode.
@@ -390,7 +381,7 @@ public class KeyOutputStream extends OutputStream {
 
   private boolean checkIfContainerIsClosed(IOException ioe) {
     if (ioe.getCause() != null) {
-      return checkIfContainerNotOpenOrRaftRetryFailureException(ioe) || Optional
+      return checkForException(ioe, ContainerNotOpenException.class) || Optional
           .of(ioe.getCause())
           .filter(e -> e instanceof StorageContainerException)
           .map(e -> (StorageContainerException) e)
@@ -400,13 +391,23 @@ public class KeyOutputStream extends OutputStream {
     return false;
   }
 
-  private boolean checkIfContainerNotOpenOrRaftRetryFailureException(
-      IOException ioe) {
+  /**
+   * Checks if the provided exception signifies retry failure in ratis client.
+   * In case of retry failure, ratis client throws RaftRetryFailureException
+   * and all succeeding operations are failed with AlreadyClosedException.
+   */
+  private boolean checkForRetryFailure(IOException ioe) {
+    return checkForException(ioe, RaftRetryFailureException.class,
+        AlreadyClosedException.class);
+  }
+
+  private boolean checkForException(IOException ioe, Class... classes) {
     Throwable t = ioe.getCause();
     while (t != null) {
-      if (t instanceof ContainerNotOpenException
-          || t instanceof RaftRetryFailureException) {
-        return true;
+      for (Class cls : classes) {
+        if (cls.isInstance(t)) {
+          return true;
+        }
       }
       t = t.getCause();
     }
@@ -469,11 +470,13 @@ public class KeyOutputStream extends OutputStream {
           entry.flush();
         }
       } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
+        boolean retryFailure = checkForRetryFailure(ioe);
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
+            || retryFailure) {
           // This call will allocate a new streamEntry and write the Data.
           // Close needs to be retried on the newly allocated streamEntry as
           // as well.
-          handleException(entry, streamIndex);
+          handleException(entry, streamIndex, retryFailure);
           handleFlushOrClose(close);
         } else {
           throw ioe;
@@ -643,7 +646,7 @@ public class KeyOutputStream extends OutputStream {
     private BlockID blockID;
     private final String key;
     private final XceiverClientManager xceiverClientManager;
-    private final XceiverClientSpi xceiverClient;
+    private final Pipeline pipeline;
     private final Checksum checksum;
     private final String requestId;
     private final int chunkSize;
@@ -660,14 +663,14 @@ public class KeyOutputStream extends OutputStream {
     @SuppressWarnings("parameternumber")
     BlockOutputStreamEntry(BlockID blockID, String key,
         XceiverClientManager xceiverClientManager,
-        XceiverClientSpi xceiverClient, String requestId, int chunkSize,
+        Pipeline pipeline, String requestId, int chunkSize,
         long length, long streamBufferFlushSize, long streamBufferMaxSize,
         long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum) {
       this.outputStream = null;
       this.blockID = blockID;
       this.key = key;
       this.xceiverClientManager = xceiverClientManager;
-      this.xceiverClient = xceiverClient;
+      this.pipeline = pipeline;
       this.requestId = requestId;
       this.chunkSize = chunkSize;
 
@@ -680,30 +683,6 @@ public class KeyOutputStream extends OutputStream {
       this.bufferList = bufferList;
     }
 
-    /**
-     * For testing purpose, taking a some random created stream instance.
-     * @param  outputStream a existing writable output stream
-     * @param  length the length of data to write to the stream
-     */
-    BlockOutputStreamEntry(OutputStream outputStream, long length,
-        Checksum checksum) {
-      this.outputStream = outputStream;
-      this.blockID = null;
-      this.key = null;
-      this.xceiverClientManager = null;
-      this.xceiverClient = null;
-      this.requestId = null;
-      this.chunkSize = -1;
-
-      this.length = length;
-      this.currentPosition = 0;
-      streamBufferFlushSize = 0;
-      streamBufferMaxSize = 0;
-      bufferList = null;
-      watchTimeout = 0;
-      this.checksum = checksum;
-    }
-
     long getLength() {
       return length;
     }
@@ -712,11 +691,17 @@ public class KeyOutputStream extends OutputStream {
       return length - currentPosition;
     }
 
-    private void checkStream() {
+    /**
+     * BlockOutputStream is initialized in this function. This makes sure that
+     * xceiverClient initialization is not done during preallocation and only
+     * done when data is written.
+     * @throws IOException if xceiverClient initialization fails
+     */
+    private void checkStream() throws IOException {
       if (this.outputStream == null) {
         this.outputStream =
             new BlockOutputStream(blockID, key, xceiverClientManager,
-                xceiverClient, requestId, chunkSize, streamBufferFlushSize,
+                pipeline, requestId, chunkSize, streamBufferFlushSize,
                 streamBufferMaxSize, watchTimeout, bufferList, checksum);
       }
     }
@@ -781,11 +766,11 @@ public class KeyOutputStream extends OutputStream {
       throw new IOException("Invalid Output Stream for Key: " + key);
     }
 
-    void cleanup() {
+    void cleanup(boolean invalidateClient) throws IOException {
       checkStream();
       if (this.outputStream instanceof BlockOutputStream) {
         BlockOutputStream out = (BlockOutputStream) this.outputStream;
-        out.cleanup();
+        out.cleanup(invalidateClient);
       }
     }
 

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java

@@ -116,6 +116,6 @@ public class TestContainerStateMachineIdempotency {
     } catch (IOException ioe) {
       Assert.fail("Container operation failed" + ioe);
     }
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 }

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -698,7 +698,7 @@ public abstract class TestOzoneRpcClientAbstract {
       Assert.assertTrue(
           e.getMessage().contains("on the pipeline " + pipeline.getId()));
     }
-    manager.releaseClient(clientSpi);
+    manager.releaseClient(clientSpi, false);
   }
 
   private void readKey(OzoneBucket bucket, String keyName, String data)

+ 4 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java

@@ -98,7 +98,7 @@ public class TestContainerSmallFile {
         ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
     String readData = response.getData().getData().toStringUtf8();
     Assert.assertEquals("data123", readData);
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 
   @Test
@@ -121,7 +121,7 @@ public class TestContainerSmallFile {
     // Try to read a Key Container Name
     ContainerProtos.GetSmallFileResponseProto response =
         ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 
   @Test
@@ -149,7 +149,7 @@ public class TestContainerSmallFile {
         ContainerProtocolCalls.readSmallFile(client,
             ContainerTestHelper.getTestBlockID(
                 nonExistContainerID), traceID);
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 
   @Test
@@ -202,7 +202,7 @@ public class TestContainerSmallFile {
         ContainerProtocolCalls.readSmallFile(client, blockID1, traceID);
     String readData = response.getData().getData().toStringUtf8();
     Assert.assertEquals("data123", readData);
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 }
 

+ 3 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java

@@ -114,7 +114,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
     Assert.assertTrue(
         BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
     Assert.assertTrue(response.getBlockLength() == data.length);
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 
   @Test
@@ -139,7 +139,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
     } catch (StorageContainerException sce) {
       Assert.assertTrue(sce.getMessage().contains("Unable to find the block"));
     }
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 
   @Test
@@ -180,6 +180,6 @@ public class TestGetCommittedBlockLengthAndPutKey {
     // This will also ensure that closing the container committed the block
     // on the Datanodes.
     Assert.assertEquals(responseBlockID, blockID);
-    xceiverClientManager.releaseClient(client);
+    xceiverClientManager.releaseClient(client, false);
   }
 }

+ 45 - 7
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java

@@ -96,9 +96,9 @@ public class TestXceiverClientManager {
     Assert.assertEquals(2, client3.getRefcount());
     Assert.assertEquals(2, client1.getRefcount());
     Assert.assertEquals(client1, client3);
-    clientManager.releaseClient(client1);
-    clientManager.releaseClient(client2);
-    clientManager.releaseClient(client3);
+    clientManager.releaseClient(client1, false);
+    clientManager.releaseClient(client2, false);
+    clientManager.releaseClient(client3, false);
   }
 
   @Test
@@ -140,7 +140,7 @@ public class TestXceiverClientManager {
 
     // After releasing the client, this connection should be closed
     // and any container operations should fail
-    clientManager.releaseClient(client1);
+    clientManager.releaseClient(client1, false);
 
     String expectedMessage = "This channel is not connected.";
     try {
@@ -152,7 +152,7 @@ public class TestXceiverClientManager {
       Assert.assertEquals(e.getClass(), IOException.class);
       Assert.assertTrue(e.getMessage().contains(expectedMessage));
     }
-    clientManager.releaseClient(client2);
+    clientManager.releaseClient(client2, false);
   }
 
   @Test
@@ -171,7 +171,7 @@ public class TestXceiverClientManager {
         .acquireClient(container1.getPipeline());
     Assert.assertEquals(1, client1.getRefcount());
 
-    clientManager.releaseClient(client1);
+    clientManager.releaseClient(client1, false);
     Assert.assertEquals(0, client1.getRefcount());
 
     ContainerWithPipeline container2 = storageContainerLocationClient
@@ -200,6 +200,44 @@ public class TestXceiverClientManager {
       Assert.assertEquals(e.getClass(), IOException.class);
       Assert.assertTrue(e.getMessage().contains(expectedMessage));
     }
-    clientManager.releaseClient(client2);
+    clientManager.releaseClient(client2, false);
+  }
+
+  @Test
+  public void testFreeByRetryFailure() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    Cache<String, XceiverClientSpi> cache =
+        clientManager.getClientCache();
+
+    // client is added in cache
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(clientManager.getType(), clientManager.getFactor(),
+            containerOwner);
+    XceiverClientSpi client1 =
+        clientManager.acquireClient(container1.getPipeline());
+    clientManager.acquireClient(container1.getPipeline());
+    Assert.assertEquals(2, client1.getRefcount());
+
+    // client should be invalidated in the cache
+    clientManager.releaseClient(client1, true);
+    Assert.assertEquals(1, client1.getRefcount());
+    Assert.assertNull(cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString()
+            + container1.getContainerInfo().getReplicationType()));
+
+    // new client should be added in cache
+    XceiverClientSpi client2 =
+        clientManager.acquireClient(container1.getPipeline());
+    Assert.assertNotEquals(client1, client2);
+    Assert.assertEquals(1, client2.getRefcount());
+
+    // on releasing the old client the entry in cache should not be invalidated
+    clientManager.releaseClient(client1, true);
+    Assert.assertEquals(0, client1.getRefcount());
+    Assert.assertNotNull(cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString()
+            + container1.getContainerInfo().getReplicationType()));
   }
 }