
HDFS-195. Handle expired tokens when write pipeline is re-established.
(Kan Zhang via rangadi)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@787764 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 16 years ago
parent
commit
4b9e2f0836
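
When a write pipeline is rebuilt after a datanode failure, the block access token the client obtained at file-create time may already have expired. This commit makes block recovery always hand back a freshly generated WRITE token and extends the pipeline connect ack so the client can tell an access-token error apart from an ordinary failure, and retry instead of giving up. The sketch below is a minimal, self-contained model of that retry pattern; TokenRefreshSketch, PipelineSetup, BlockRecovery and AccessTokenError are hypothetical stand-ins for DFSClient.createBlockOutputStream(), ClientDatanodeProtocol.recoverBlock() and InvalidAccessTokenException, not the real classes.

    import java.io.IOException;

    class TokenRefreshSketch {

      /** Models the access-token error the client now sees during pipeline setup. */
      static class AccessTokenError extends IOException {
        AccessTokenError(String msg) { super(msg); }
      }

      /** Stand-in for DFSClient's pipeline setup (createBlockOutputStream). */
      interface PipelineSetup {
        void connect(String accessToken) throws IOException;
      }

      /** Stand-in for ClientDatanodeProtocol.recoverBlock, which now always
       *  returns a freshly generated access token. */
      interface BlockRecovery {
        String recoverAndGetNewToken() throws IOException;
      }

      static void setupWithTokenRetry(PipelineSetup pipeline, BlockRecovery recovery,
                                      String token, int maxRetries) throws IOException {
        for (int attempt = 0; ; attempt++) {
          try {
            pipeline.connect(token);
            return;                                   // pipeline established
          } catch (AccessTokenError e) {
            if (attempt >= maxRetries) {
              throw e;                                // give up after repeated token errors
            }
            token = recovery.recoverAndGetNewToken(); // pick up a fresh token and retry
          }
        }
      }
    }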

+ 3 - 0
CHANGES.txt

@@ -25,3 +25,6 @@ Trunk (unreleased changes)
 
     HADOOP-6096. Fix Eclipse project and classpath files following project
     split. (tomwhite)
+
+    HDFS-195. Handle expired tokens when write pipeline is re-established.
+    (Kan Zhang via rangadi)

+ 17 - 9
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -2610,12 +2610,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
           // If the block recovery generated a new generation stamp, use that
           // from now on.  Also, setup new pipeline
-          //
-          if (newBlock != null) {
-            block = newBlock.getBlock();
-            accessToken = newBlock.getAccessToken();
-            nodes = newBlock.getLocations();
-          }
+          // newBlock should never be null and it should contain a newly 
+          // generated access token.
+          block = newBlock.getBlock();
+          accessToken = newBlock.getAccessToken();
+          nodes = newBlock.getLocations();
 
           this.hasError = false;
           lastException = null;
@@ -2687,6 +2686,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       //
       private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
           boolean recoveryFlag) {
+        short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
         String firstBadLink = "";
         if (LOG.isDebugEnabled()) {
           for (int i = 0; i < nodes.length; i++) {
@@ -2725,10 +2725,17 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           out.flush();
 
           // receive ack for connect
+          pipelineStatus = blockReplyStream.readShort();
           firstBadLink = Text.readString(blockReplyStream);
-          if (firstBadLink.length() != 0) {
-            throw new IOException("Bad connect ack with firstBadLink "
-                + firstBadLink);
+          if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+            if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+              throw new InvalidAccessTokenException(
+                  "Got access token error for connect ack with firstBadLink as "
+                      + firstBadLink);
+            } else {
+              throw new IOException("Bad connect ack with firstBadLink as "
+                  + firstBadLink);
+            }
           }
 
           blockStream = out;
@@ -2799,6 +2806,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       void initAppend(LocatedBlock lastBlock, FileStatus stat,
           int bytesPerChecksum) throws IOException {
         block = lastBlock.getBlock();
+        accessToken = lastBlock.getAccessToken();
         long usedInLastBlock = stat.getLen() % blockSize;
         int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 

+ 5 - 5
src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -29,17 +29,17 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 3: add keepLength parameter.
+   * 4: never return null and always return a newly generated access token
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
    * @param keepLength keep the block length
    * @param targets the list of possible locations of specified block
-   * @return the new blockid if recovery successful and the generation stamp
-   * got updated as part of the recovery, else returns null if the block id
-   * not have any data and the block was deleted.
+   * @return either a new generation stamp, or the original generation stamp. 
+   * Regardless of whether a new generation stamp is returned, a newly 
+   * generated access token is returned as part of the return value.
    * @throws IOException
    */
   LocatedBlock recoverBlock(Block block, boolean keepLength,
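
Under the version-4 contract, recoverBlock() never returns null: the caller always gets a LocatedBlock whose generation stamp may or may not have changed, but whose access token is always freshly generated. A minimal sketch of how a caller might consume that result, using hypothetical stand-in types rather than the real LocatedBlock/Block classes:

    import java.io.IOException;

    class RecoverBlockCallerSketch {

      /** Models the relevant fields of LocatedBlock. */
      static class Located {
        long generationStamp;
        String accessToken;     // always freshly generated under protocol version 4
        String[] locations;
      }

      /** Stand-in for ClientDatanodeProtocol.recoverBlock. */
      interface Recovery {
        Located recoverBlock(long blockId, boolean keepLength) throws IOException;
      }

      static void resumeWrite(Recovery datanode, long blockId) throws IOException {
        Located b = datanode.recoverBlock(blockId, false);
        // b is never null: the generation stamp may or may not have changed,
        // but the access token is always new, so always switch to it.
        long stamp = b.generationStamp;
        String token = b.accessToken;
        String[] pipeline = b.locations;
        // ... rebuild the write pipeline using stamp, token and pipeline ...
      }
    }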

+ 5 - 4
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -35,11 +35,12 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 15:
-   *    Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
-   *    Access token is now required on all DN operations
+   * Version 16:
+   *    Datanode now needs to send back a status code together 
+   *    with firstBadLink during pipeline setup for dfs write
+   *    (only for DFSClients, not for other datanodes).
    */
-  public static final int DATA_TRANSFER_VERSION = 15;
+  public static final int DATA_TRANSFER_VERSION = 16;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
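
The version-16 connect ack therefore consists of a status short followed by the name of the first bad datanode (an empty string when the whole pipeline came up). The round trip below sketches that framing with plain java.io streams; the status values are placeholders and writeUTF/readUTF stands in for Hadoop's Text serialization.

    import java.io.*;

    public class ConnectAckRoundTrip {
      // Placeholder values; the real constants are defined in DataTransferProtocol.
      static final short OP_STATUS_SUCCESS = 0;
      static final short OP_STATUS_ERROR = 1;

      public static void main(String[] args) throws IOException {
        // Datanode side: status first (new in version 16), then firstBadLink.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream replyOut = new DataOutputStream(buf);
        replyOut.writeShort(OP_STATUS_SUCCESS);
        replyOut.writeUTF("");                       // empty firstBadLink: pipeline is healthy
        replyOut.flush();

        // Client side: read the status before the firstBadLink string.
        DataInputStream replyIn =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        short pipelineStatus = replyIn.readShort();
        String firstBadLink = replyIn.readUTF();
        if (pipelineStatus != OP_STATUS_SUCCESS) {
          throw new IOException("Bad connect ack with firstBadLink as " + firstBadLink);
        }
        System.out.println("pipeline established, status=" + pipelineStatus);
      }
    }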

+ 19 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1545,8 +1545,9 @@ public class DataNode extends Configured
 
   /** Recover a block */
   private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeID[] datanodeids, boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
 
+    DatanodeID[] datanodeids = (DatanodeID[])targets;
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
     // file at the same time.
@@ -1600,7 +1601,7 @@ public class DataNode extends Configured
       if (!keepLength) {
         block.setNumBytes(minlength);
       }
-      return syncBlock(block, syncList, closeFile);
+      return syncBlock(block, syncList, targets, closeFile);
     } finally {
       synchronized (ongoingRecovery) {
         ongoingRecovery.remove(block);
@@ -1610,7 +1611,7 @@ public class DataNode extends Configured
 
   /** Block synchronization */
   private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
-      boolean closeFile) throws IOException {
+      DatanodeInfo[] targets, boolean closeFile) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
           + "), syncList=" + syncList + ", closeFile=" + closeFile);
@@ -1621,7 +1622,13 @@ public class DataNode extends Configured
     if (syncList.isEmpty()) {
       namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
           DatanodeID.EMPTY_ARRAY);
-      return null;
+      //always return a new access token even if everything else stays the same
+      LocatedBlock b = new LocatedBlock(block, targets);
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
@@ -1649,7 +1656,14 @@ public class DataNode extends Configured
       for (int i = 0; i < nlist.length; i++) {
         info[i] = new DatanodeInfo(nlist[i]);
       }
-      return new LocatedBlock(newblock, info); // success
+      LocatedBlock b = new LocatedBlock(newblock, info); // success
+      // should have used client ID to generate access token, but since 
+      // owner ID is not checked, we simply pass null for now.
+      if (isAccessTokenEnabled) {
+        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+      }
+      return b;
     }
 
     //failed
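
Both return paths in syncBlock() above now attach a freshly generated WRITE token before handing the LocatedBlock back, whether or not the generation stamp changed. The repeated step is captured below as a small hypothetical helper, written against stand-in types rather than the real AccessTokenHandler/LocatedBlock API:

    import java.util.EnumSet;

    class SyncBlockTokenSketch {
      enum AccessMode { READ, WRITE }      // models AccessTokenHandler.AccessMode

      /** Models the fields of LocatedBlock touched here. */
      static class Located {
        final long blockId;
        String accessToken;
        Located(long blockId) { this.blockId = blockId; }
      }

      /** Stand-in for AccessTokenHandler. */
      interface TokenHandler {
        String generateToken(String owner, long blockId, EnumSet<AccessMode> modes);
      }

      /** Attach a fresh WRITE token whenever access tokens are enabled. */
      static Located withFreshWriteToken(Located b, boolean isAccessTokenEnabled,
                                         TokenHandler handler) {
        if (isAccessTokenEnabled) {
          // owner ID is not checked by the datanode, so null is passed for now
          b.accessToken = handler.generateToken(null, b.blockId,
              EnumSet.of(AccessMode.WRITE));
        }
        return b;
      }
    }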

+ 7 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -238,6 +238,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
             .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
       try {
         if (client.length() != 0) {
+          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
@@ -254,6 +255,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
+    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
@@ -294,8 +296,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
 
           // read connect ack (only for clients, not for replication req)
           if (client.length() != 0) {
+            mirrorInStatus = mirrorIn.readShort();
             firstBadLink = Text.readString(mirrorIn);
-            if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
                        " from downstream datanode with firstbadlink as " +
@@ -305,6 +308,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
 
         } catch (IOException e) {
           if (client.length() != 0) {
+            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
             Text.writeString(replyOut, mirrorNode);
             replyOut.flush();
           }
@@ -327,11 +331,12 @@ class DataXceiver extends DataTransferProtocol.Receiver
 
       // send connect ack back to source (only for clients)
       if (client.length() != 0) {
-        if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
+        replyOut.writeShort(mirrorInStatus);
         Text.writeString(replyOut, firstBadLink);
         replyOut.flush();
       }
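
An intermediate datanode in the pipeline now relays whatever status it read from its downstream mirror, so the client at the head of the pipeline sees the most specific error (including OP_STATUS_ERROR_ACCESS_TOKEN) together with firstBadLink. A compact sketch of that relay step, again with a placeholder status constant and writeUTF/readUTF in place of Hadoop's Text codec:

    import java.io.*;

    class MirrorAckRelaySketch {
      static final short OP_STATUS_SUCCESS = 0;        // placeholder value

      /** Read the downstream connect ack and forward it, unchanged, upstream. */
      static void relayConnectAck(DataInputStream mirrorIn, DataOutputStream replyOut)
          throws IOException {
        short mirrorInStatus = mirrorIn.readShort();   // new in data transfer version 16
        String firstBadLink = mirrorIn.readUTF();      // Text.readString in the real code
        if (mirrorInStatus != OP_STATUS_SUCCESS) {
          System.err.println("downstream connect ack reports firstBadLink " + firstBadLink);
        }
        replyOut.writeShort(mirrorInStatus);           // forward the downstream status ...
        replyOut.writeUTF(firstBadLink);               // ... together with the failing node
        replyOut.flush();
      }
    }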

+ 2 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -228,6 +228,7 @@ public class TestDataTransferProtocol extends TestCase {
     
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
@@ -257,6 +258,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);