
HDFS-992. Re-factor block access token implementation to conform to the generic Token interface in Common (Kan Zhang and Jitendra Pandey via jghoman)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@948634 13f79535-47bb-0310-9956-ffa450edef68
Jakob Homan
commit 893e833a67
39 changed files with 1241 additions and 999 deletions
  1. +5 -0     CHANGES.txt
  2. +1 -1     ivy/libraries.properties
  3. +12 -10   src/java/org/apache/hadoop/hdfs/BlockReader.java
  4. +10 -4    src/java/org/apache/hadoop/hdfs/DFSClient.java
  5. +9 -8     src/java/org/apache/hadoop/hdfs/DFSInputStream.java
  6. +11 -10   src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  7. +3 -0     src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
  8. +48 -44   src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
  9. +9 -8     src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
 10. +0 -306   src/java/org/apache/hadoop/hdfs/security/AccessTokenHandler.java
 11. +0 -110   src/java/org/apache/hadoop/hdfs/security/BlockAccessKey.java
 12. +0 -89    src/java/org/apache/hadoop/hdfs/security/BlockAccessToken.java
 13. +37 -0    src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
 14. +145 -0   src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
 15. +318 -0   src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
 16. +45 -0    src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
 17. +25 -51   src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
 18. +4 -4     src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
 19. +29 -28   src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
 20. +6 -9     src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
 21. +74 -35   src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 22. +85 -58   src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
 23. +7 -5     src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
 24. +41 -41   src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 25. +3 -3     src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 26. +1 -1     src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
 27. +3 -3     src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
 28. +5 -5     src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
 29. +4 -4     src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
 30. +5 -3     src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
 31. +1 -1     src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
 32. +11 -11   src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
 33. +0 -89    src/test/hdfs/org/apache/hadoop/hdfs/security/TestAccessToken.java
 34. +8 -4     src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java
 35. +224 -0   src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
 36. +2 -2     src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
 37. +1 -1     src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
 38. +2 -2     src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
 39. +47 -49   src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

+ 5 - 0
CHANGES.txt

@@ -2,6 +2,11 @@ Hadoop HDFS Change Log
 
 Trunk (unreleased changes)
 
+  NEW FEATURES
+
+    HDFS-992. Re-factor block access token implementation to conform to the 
+    generic Token interface in Common (Kan Zhang and Jitendra Pandey via jghoman)
+
   IMPROVEMENTS
 
     HDFS-1132. Refactor TestFileStatus (Eli Collins via cos)
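
For orientation before the per-file hunks: the change retires the HDFS-specific BlockAccessToken / BlockAccessKey / AccessTokenHandler classes in favor of Common's generic token machinery, so a block token is now a Token<BlockTokenIdentifier> minted and verified by a BlockTokenSecretManager. A minimal sketch of the new shape, using only classes added in this commit (user and block id values are illustrative):

    import java.util.EnumSet;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;

    public class TokenShapeSketch {
      public static void main(String[] args) {
        // The identifier now carries expiry, keyId, user, block id and access
        // modes as one Writable (see BlockTokenIdentifier.java below), rather
        // than the opaque tokenID Text of the removed BlockAccessToken.
        BlockTokenIdentifier id =
            new BlockTokenIdentifier("alice", 42L, EnumSet.of(AccessMode.READ));
        System.out.println(id);
      }
    }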

+ 1 - 1
ivy/libraries.properties

@@ -16,7 +16,7 @@
 #These are the versions of our dependencies (in alphabetical order)
 apacheant.version=1.7.1
 ant-task.version=2.0.10
-avro.version=1.3.1
+avro.version=1.3.2
 
 checkstyle.version=4.2
 

+ 12 - 10
src/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -33,11 +33,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 /** This is a wrapper around connection to datanode
@@ -353,26 +354,27 @@ public class BlockReader extends FSInputChecker {
     checksumSize = this.checksum.getChecksumSize();
   }
 
-  public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, 
-      long genStamp, long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
+  public static BlockReader newBlockReader(Socket sock, String file,
+      long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
+      long startOffset, long len, int bufferSize) throws IOException {
+    return newBlockReader(sock, file, blockId, blockToken, genStamp, startOffset, len, bufferSize,
         true);
   }
 
   /** Java Doc required */
   public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
-                                     BlockAccessToken accessToken,
+                                     Token<BlockTokenIdentifier> blockToken,
                                      long genStamp,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum)
                                      throws IOException {
-    return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
+    return newBlockReader(sock, file, blockId, blockToken, genStamp, startOffset,
                           len, bufferSize, verifyChecksum, "");
   }
 
   public static BlockReader newBlockReader( Socket sock, String file,
                                      long blockId, 
-                                     BlockAccessToken accessToken,
+                                     Token<BlockTokenIdentifier> blockToken,
                                      long genStamp,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
@@ -382,7 +384,7 @@ public class BlockReader extends FSInputChecker {
     DataTransferProtocol.Sender.opReadBlock(
         new DataOutputStream(new BufferedOutputStream(
             NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
-        blockId, genStamp, startOffset, len, clientName, accessToken);
+        blockId, genStamp, startOffset, len, clientName, blockToken);
     
     //
     // Get bytes in block, set streams
@@ -395,7 +397,7 @@ public class BlockReader extends FSInputChecker {
     DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
     if (status != SUCCESS) {
       if (status == ERROR_ACCESS_TOKEN) {
-        throw new InvalidAccessTokenException(
+        throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
                 + sock.getLocalSocketAddress() + ", remote="
                 + sock.getRemoteSocketAddress() + ", for file " + file
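
A caller-side sketch of the refactored entry point: the token comes straight off a LocatedBlock (see LocatedBlock.java below), and only the token's type changed relative to the old BlockAccessToken signature. The socket, path, offset and buffer size are assumptions for illustration:

    import java.io.IOException;
    import java.net.Socket;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    // Assumes an already-connected socket to a datanode holding the block.
    static BlockReader openReader(Socket sock, String path, LocatedBlock lb)
        throws IOException {
      Block blk = lb.getBlock();
      Token<BlockTokenIdentifier> token = lb.getBlockToken();
      return BlockReader.newBlockReader(sock, path, blk.getBlockId(), token,
          blk.getGenerationStamp(), 0L /* startOffset */,
          blk.getNumBytes() /* len */, 4096 /* bufferSize */);
    }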

+ 10 - 4
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -183,15 +184,20 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         rpcNamenode, methodNameToPolicyMap);
   }
 
-  static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
-      DatanodeID datanodeid, Configuration conf) throws IOException {
+  static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, LocatedBlock locatedBlock)
+      throws IOException {
     InetSocketAddress addr = NetUtils.createSocketAddr(
       datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
       ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
     }
+    UserGroupInformation ticket = UserGroupInformation
+        .createRemoteUser(locatedBlock.getBlock().toString());
+    ticket.addToken(locatedBlock.getBlockToken());
     return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
-        ClientDatanodeProtocol.versionID, addr, conf);
+        ClientDatanodeProtocol.versionID, addr, ticket, conf, NetUtils
+        .getDefaultSocketFactory(conf));
   }
         
   /**
@@ -946,7 +952,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           }
           // get block MD5
           DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
-              block.getGenerationStamp(), lb.getAccessToken());
+              block.getGenerationStamp(), lb.getBlockToken());
 
           final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
           if (reply != SUCCESS) {
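
The notable part of the proxy hunk is that the client now authenticates to the datanode's IPC server with the block token itself: the token is attached to a remote-user UGI whose principal is the block's own name. A caller-side sketch, mirroring the real call site in DFSInputStream.java below (the factory method is package-private, so this assumes code living in org.apache.hadoop.hdfs):

    // Build the token-carrying proxy, then ask the DN for the replica's
    // visible length over IPC; the token rides on the UGI created above.
    static long visibleLength(DatanodeInfo datanode, Configuration conf,
        LocatedBlock locatedBlock) throws IOException {
      ClientDatanodeProtocol cdp = DFSClient
          .createClientDatanodeProtocolProxy(datanode, conf, locatedBlock);
      return cdp.getReplicaVisibleLength(locatedBlock.getBlock());
    }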

+ 9 - 8
src/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -34,10 +34,11 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 
 /****************************************************************
@@ -141,7 +142,7 @@ class DFSInputStream extends FSInputStream {
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
       try {
         final ClientDatanodeProtocol cdp = DFSClient.createClientDatanodeProtocolProxy(
-            datanode, dfsClient.conf);
+            datanode, dfsClient.conf, locatedblock);
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
         if (n >= 0) {
           return n;
@@ -353,7 +354,7 @@ class DFSInputStream extends FSInputStream {
         NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
         s.setSoTimeout(dfsClient.socketTimeout);
         Block blk = targetBlock.getBlock();
-        BlockAccessToken accessToken = targetBlock.getAccessToken();
+        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         
         blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
             accessToken, 
@@ -362,7 +363,7 @@ class DFSInputStream extends FSInputStream {
             buffersize, verifyChecksum, dfsClient.clientName);
         return chosenNode;
       } catch (IOException ex) {
-        if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
+        if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will fetch a new access token and retry, " 
               + "access token was invalid when connecting to " + targetAddr
               + " : " + ex);
@@ -593,13 +594,13 @@ class DFSInputStream extends FSInputStream {
         dn = dfsClient.socketFactory.createSocket();
         NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
         dn.setSoTimeout(dfsClient.socketTimeout);
-        BlockAccessToken accessToken = block.getAccessToken();
+        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
             
         int len = (int) (end - start + 1);
             
         reader = BlockReader.newBlockReader(dn, src, 
                                             block.getBlock().getBlockId(),
-                                            accessToken,
+                                            blockToken,
                                             block.getBlock().getGenerationStamp(),
                                             start, len, buffersize, 
                                             verifyChecksum, dfsClient.clientName);
@@ -615,7 +616,7 @@ class DFSInputStream extends FSInputStream {
                  e.getPos() + " from " + chosenNode.getName());
         dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
       } catch (IOException e) {
-        if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
+        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
               + "access token was invalid when connecting to " + targetAddr
               + " : " + e);

+ 11 - 10
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -54,8 +54,6 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -65,11 +63,14 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -272,7 +273,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   class DataStreamer extends Daemon {
     private volatile boolean streamerClosed = false;
     private Block block; // its length is number of bytes acked
-    private BlockAccessToken accessToken;
+    private Token<BlockTokenIdentifier> accessToken;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
@@ -302,7 +303,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
-      accessToken = lastBlock.getAccessToken();
+      accessToken = lastBlock.getBlockToken();
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -773,7 +774,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         // get a new generation stamp and an access token
         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
-        accessToken = lb.getAccessToken();
+        accessToken = lb.getBlockToken();
         
         // set up the pipeline again with the remaining nodes
         success = createBlockOutputStream(nodes, newGS, isRecovery);
@@ -813,7 +814,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
         block = lb.getBlock();
         block.setNumBytes(0);
-        accessToken = lb.getAccessToken();
+        accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
 
         //
@@ -884,7 +885,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         firstBadLink = Text.readString(blockReplyStream);
         if (pipelineStatus != SUCCESS) {
           if (pipelineStatus == ERROR_ACCESS_TOKEN) {
-            throw new InvalidAccessTokenException(
+            throw new InvalidBlockTokenException(
                 "Got access token error for connect ack with firstBadLink as "
                     + firstBadLink);
           } else {
@@ -977,7 +978,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       return nodes;
     }
 
-    BlockAccessToken getAccessToken() {
+    Token<BlockTokenIdentifier> getBlockToken() {
       return accessToken;
     }
 
@@ -1455,8 +1456,8 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   /**
    * Returns the access token currently used by streamer, for testing only
    */
-  BlockAccessToken getAccessToken() {
-    return streamer.getAccessToken();
+  Token<BlockTokenIdentifier> getBlockToken() {
+    return streamer.getBlockToken();
   }
 
 }
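
One detail worth noting from the DataStreamer hunks: during pipeline recovery the fresh generation stamp and the fresh block token arrive together in a single updateBlockForPipeline response, so the streamer never pairs a new genstamp with a stale token. A minimal sketch of that step, with namenode and clientName as in the hunk:

    LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
    long newGS = lb.getBlock().getGenerationStamp();
    Token<BlockTokenIdentifier> accessToken = lb.getBlockToken();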

+ 3 - 0
src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -21,10 +21,13 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
 
 /** A client-datanode protocol for block recovery
  */
+@TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
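
The @TokenInfo annotation is what ties the RPC layer to the token: when a client UGI carries several tokens, the named selector picks out the block token for this protocol. The new BlockTokenSelector itself (+45 lines) is not shown in this view; the following is the generic shape of a TokenSelector, assumed to approximate it, keyed on the HDFS_BLOCK_TOKEN kind defined in BlockTokenIdentifier.java below:

    import java.util.Collection;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;
    import org.apache.hadoop.security.token.TokenSelector;

    public class BlockTokenSelectorSketch
        implements TokenSelector<BlockTokenIdentifier> {
      @SuppressWarnings("unchecked")
      public Token<BlockTokenIdentifier> selectToken(Text service,
          Collection<Token<? extends TokenIdentifier>> tokens) {
        for (Token<? extends TokenIdentifier> token : tokens) {
          if (new Text("HDFS_BLOCK_TOKEN").equals(token.getKind())) {
            return (Token<BlockTokenIdentifier>) token;
          }
        }
        return null;
      }
    }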
 

+ 48 - 44
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -24,10 +24,11 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -221,9 +222,9 @@ public interface DataTransferProtocol {
     }
 
     /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out,
-        long blockId, long blockGs, long blockOffset, long blockLen,
-        String clientName, BlockAccessToken accessToken) throws IOException {
+    public static void opReadBlock(DataOutputStream out, long blockId,
+        long blockGs, long blockOffset, long blockLen, String clientName,
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.READ_BLOCK);
 
       out.writeLong(blockId);
@@ -231,16 +232,16 @@ public interface DataTransferProtocol {
       out.writeLong(blockOffset);
       out.writeLong(blockLen);
       Text.writeString(out, clientName);
-      accessToken.write(out);
+      blockToken.write(out);
       out.flush();
     }
     
     /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out,
-        long blockId, long blockGs, int pipelineSize, 
-        BlockConstructionStage stage, long newGs, long minBytesRcvd,
-        long maxBytesRcvd, String client, DatanodeInfo src, 
-        DatanodeInfo[] targets, BlockAccessToken accesstoken) throws IOException {
+    public static void opWriteBlock(DataOutputStream out, long blockId,
+        long blockGs, int pipelineSize, BlockConstructionStage stage,
+        long newGs, long minBytesRcvd, long maxBytesRcvd, String client,
+        DatanodeInfo src, DatanodeInfo[] targets,
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
@@ -261,42 +262,44 @@ public interface DataTransferProtocol {
         targets[i].write(out);
       }
 
-      accesstoken.write(out);
+      blockToken.write(out);
     }
     
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
         long blockId, long blockGs, String storageId, DatanodeInfo src,
-        BlockAccessToken accesstoken) throws IOException {
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
       op(out, Op.REPLACE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
       Text.writeString(out, storageId);
       src.write(out);
-      accesstoken.write(out);
+      blockToken.write(out);
       out.flush();
     }
 
     /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
+    public static void opCopyBlock(DataOutputStream out, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException {
       op(out, Op.COPY_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
-      accesstoken.write(out);
+      blockToken.write(out);
       out.flush();
     }
 
     /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
+    public static void opBlockChecksum(DataOutputStream out, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException {
       op(out, Op.BLOCK_CHECKSUM);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
-      accesstoken.write(out);
+      blockToken.write(out);
       out.flush();
     }
   }
@@ -345,18 +348,17 @@ public interface DataTransferProtocol {
       final long offset = in.readLong();
       final long length = in.readLong();
       final String client = Text.readString(in);
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
+      opReadBlock(in, blockId, blockGs, offset, length, client, blockToken);
     }
 
     /**
-     * Abstract OP_READ_BLOCK method.
-     * Read a block.
+     * Abstract OP_READ_BLOCK method. Read a block.
      */
-    protected abstract void opReadBlock(DataInputStream in,
-        long blockId, long blockGs, long offset, long length,
-        String client, BlockAccessToken accesstoken) throws IOException;
+    protected abstract void opReadBlock(DataInputStream in, long blockId,
+        long blockGs, long offset, long length, String client,
+        Token<BlockTokenIdentifier> blockToken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
@@ -379,10 +381,10 @@ public interface DataTransferProtocol {
       for (int i = 0; i < targets.length; i++) {
         targets[i] = DatanodeInfo.read(in);
       }
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
       opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
-          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
+          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, blockToken);
     }
 
     /**
@@ -394,7 +396,7 @@ public interface DataTransferProtocol {
         int pipelineSize, BlockConstructionStage stage,
         long newGs, long minBytesRcvd, long maxBytesRcvd,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
-        BlockAccessToken accesstoken) throws IOException;
+        Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
@@ -402,9 +404,9 @@ public interface DataTransferProtocol {
       final long blockGs = in.readLong();
       final String sourceId = Text.readString(in); // read del hint
       final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
+      opReplaceBlock(in, blockId, blockGs, sourceId, src, blockToken);
     }
 
     /**
@@ -413,44 +415,46 @@ public interface DataTransferProtocol {
      */
     protected abstract void opReplaceBlock(DataInputStream in,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
-        BlockAccessToken accesstoken) throws IOException;
+        Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opCopyBlock(in, blockId, blockGs, accesstoken);
+      opCopyBlock(in, blockId, blockGs, blockToken);
     }
 
     /**
-     * Abstract OP_COPY_BLOCK method.
-     * It is used for balancing purpose; send to a proxy source.
+     * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
+     * a proxy source.
      */
-    protected abstract void opCopyBlock(DataInputStream in,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
+    protected abstract void opCopyBlock(DataInputStream in, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
-      final BlockAccessToken accesstoken = readAccessToken(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
-      opBlockChecksum(in, blockId, blockGs, accesstoken);
+      opBlockChecksum(in, blockId, blockGs, blockToken);
     }
 
     /**
      * Abstract OP_BLOCK_CHECKSUM method.
      * Get the checksum of a block 
      */
-    protected abstract void opBlockChecksum(DataInputStream in,
-        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
+    protected abstract void opBlockChecksum(DataInputStream in, long blockId,
+        long blockGs, Token<BlockTokenIdentifier> blockToken)
+        throws IOException;
 
     /** Read an AccessToken */
-    static private BlockAccessToken readAccessToken(DataInputStream in
+    static private Token<BlockTokenIdentifier> readBlockToken(DataInputStream in
         ) throws IOException {
-      final BlockAccessToken t = new BlockAccessToken();
+      final Token<BlockTokenIdentifier> t = new Token<BlockTokenIdentifier>();
       t.readFields(in);
       return t; 
     }
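
The Sender/Receiver changes are mechanical, but the wire contract they preserve is worth stating: the token is the last field of every op, written with Token.write(out) on the sending side and reconstructed via readFields(in) in readBlockToken above, so the fixed-layout BlockAccessToken bytes are simply replaced by the generic token encoding. A sender-side sketch with illustrative stream and values:

    // The block token is the final field of OP_READ_BLOCK, as above.
    DataTransferProtocol.Sender.opReadBlock(out, blockId, genStamp,
        0L /* offset */, length, clientName, blockToken);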

+ 9 - 8
src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java

@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.token.Token;
 
 import java.io.*;
 
@@ -44,7 +45,7 @@ public class LocatedBlock implements Writable {
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
-  private BlockAccessToken accessToken = new BlockAccessToken();
+  private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
 
   /**
    */
@@ -78,12 +79,12 @@ public class LocatedBlock implements Writable {
     }
   }
 
-  public BlockAccessToken getAccessToken() {
-    return accessToken;
+  public Token<BlockTokenIdentifier> getBlockToken() {
+    return blockToken;
   }
 
-  public void setAccessToken(BlockAccessToken token) {
-    this.accessToken = token;
+  public void setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.blockToken = token;
   }
 
   /**
@@ -122,7 +123,7 @@ public class LocatedBlock implements Writable {
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    accessToken.write(out);
+    blockToken.write(out);
     out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
@@ -133,7 +134,7 @@ public class LocatedBlock implements Writable {
   }
 
   public void readFields(DataInput in) throws IOException {
-    accessToken.readFields(in);
+    blockToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();
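
Since LocatedBlock stays a Writable and the token is serialized first, a quick round trip makes the format concrete. A test-style sketch (not from this diff) using Hadoop's in-memory buffers:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    // The first bytes written and read are the generic block token,
    // per write()/readFields() above.
    DataOutputBuffer out = new DataOutputBuffer();
    locatedBlock.write(out);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    LocatedBlock copy = new LocatedBlock();
    copy.readFields(in);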

+ 0 - 306
src/java/org/apache/hadoop/hdfs/security/AccessTokenHandler.java

@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.security;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.security.GeneralSecurityException;
-import java.security.SecureRandom;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * AccessTokenHandler can be instantiated in 2 modes, master mode and slave
- * mode. Master can generate new access keys and export access keys to slaves,
- * while slaves can only import and use access keys received from master. Both
- * master and slave can generate and verify access tokens. Typically, master
- * mode is used by NN and slave mode is used by DN.
- */
-public class AccessTokenHandler {
-  private static final Log LOG = LogFactory.getLog(AccessTokenHandler.class);
-
-  private final boolean isMaster;
-  /*
-   * keyUpdateInterval is the interval that NN updates its access keys. It
-   * should be set long enough so that all live DN's and Balancer should have
-   * sync'ed their access keys with NN at least once during each interval.
-   */
-  private final long keyUpdateInterval;
-  private long tokenLifetime;
-  private long serialNo = new SecureRandom().nextLong();
-  private KeyGenerator keyGen;
-  private BlockAccessKey currentKey;
-  private BlockAccessKey nextKey;
-  private Map<Long, BlockAccessKey> allKeys;
-
-  public static enum AccessMode {
-    READ, WRITE, COPY, REPLACE
-  };
-
-  /**
-   * Constructor
-   * 
-   * @param isMaster
-   * @param keyUpdateInterval
-   * @param tokenLifetime
-   * @throws IOException
-   */
-  public AccessTokenHandler(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime) throws IOException {
-    this.isMaster = isMaster;
-    this.keyUpdateInterval = keyUpdateInterval;
-    this.tokenLifetime = tokenLifetime;
-    this.allKeys = new HashMap<Long, BlockAccessKey>();
-    if (isMaster) {
-      try {
-        generateKeys();
-        initMac(currentKey);
-      } catch (GeneralSecurityException e) {
-        throw (IOException) new IOException(
-            "Failed to create AccessTokenHandler").initCause(e);
-      }
-    }
-  }
-
-  /** Initialize access keys */
-  private synchronized void generateKeys() throws NoSuchAlgorithmException {
-    keyGen = KeyGenerator.getInstance("HmacSHA1");
-    /*
-     * Need to set estimated expiry dates for currentKey and nextKey so that if
-     * NN crashes, DN can still expire those keys. NN will stop using the newly
-     * generated currentKey after the first keyUpdateInterval, however it may
-     * still be used by DN and Balancer to generate new tokens before they get a
-     * chance to sync their keys with NN. Since we require keyUpdInterval to be
-     * long enough so that all live DN's and Balancer will sync their keys with
-     * NN at least once during the period, the estimated expiry date for
-     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
-     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
-     * more.
-     */
-    serialNo++;
-    currentKey = new BlockAccessKey(serialNo, new Text(keyGen.generateKey()
-        .getEncoded()), System.currentTimeMillis() + 2 * keyUpdateInterval
-        + tokenLifetime);
-    serialNo++;
-    nextKey = new BlockAccessKey(serialNo, new Text(keyGen.generateKey()
-        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
-        + tokenLifetime);
-    allKeys.put(currentKey.getKeyID(), currentKey);
-    allKeys.put(nextKey.getKeyID(), nextKey);
-  }
-
-  /** Initialize Mac function */
-  private synchronized void initMac(BlockAccessKey key) throws IOException {
-    try {
-      Mac mac = Mac.getInstance("HmacSHA1");
-      mac.init(new SecretKeySpec(key.getKey().getBytes(), "HmacSHA1"));
-      key.setMac(mac);
-    } catch (GeneralSecurityException e) {
-      throw (IOException) new IOException(
-          "Failed to initialize Mac for access key, keyID=" + key.getKeyID())
-          .initCause(e);
-    }
-  }
-
-  /** Export access keys, only to be used in master mode */
-  public synchronized ExportedAccessKeys exportKeys() {
-    if (!isMaster)
-      return null;
-    if (LOG.isDebugEnabled())
-      LOG.debug("Exporting access keys");
-    return new ExportedAccessKeys(true, keyUpdateInterval, tokenLifetime,
-        currentKey, allKeys.values().toArray(new BlockAccessKey[0]));
-  }
-
-  private synchronized void removeExpiredKeys() {
-    long now = System.currentTimeMillis();
-    for (Iterator<Map.Entry<Long, BlockAccessKey>> it = allKeys.entrySet()
-        .iterator(); it.hasNext();) {
-      Map.Entry<Long, BlockAccessKey> e = it.next();
-      if (e.getValue().getExpiryDate() < now) {
-        it.remove();
-      }
-    }
-  }
-
-  /**
-   * Set access keys, only to be used in slave mode
-   */
-  public synchronized void setKeys(ExportedAccessKeys exportedKeys)
-      throws IOException {
-    if (isMaster || exportedKeys == null)
-      return;
-    LOG.info("Setting access keys");
-    removeExpiredKeys();
-    this.currentKey = exportedKeys.getCurrentKey();
-    initMac(currentKey);
-    BlockAccessKey[] receivedKeys = exportedKeys.getAllKeys();
-    for (int i = 0; i < receivedKeys.length; i++) {
-      if (receivedKeys[i] == null)
-        continue;
-      this.allKeys.put(receivedKeys[i].getKeyID(), receivedKeys[i]);
-    }
-  }
-
-  /**
-   * Update access keys, only to be used in master mode
-   */
-  public synchronized void updateKeys() throws IOException {
-    if (!isMaster)
-      return;
-    LOG.info("Updating access keys");
-    removeExpiredKeys();
-    // set final expiry date of retiring currentKey
-    allKeys.put(currentKey.getKeyID(), new BlockAccessKey(currentKey.getKeyID(),
-        currentKey.getKey(), System.currentTimeMillis() + keyUpdateInterval
-            + tokenLifetime));
-    // update the estimated expiry date of new currentKey
-    currentKey = new BlockAccessKey(nextKey.getKeyID(), nextKey.getKey(), System
-        .currentTimeMillis()
-        + 2 * keyUpdateInterval + tokenLifetime);
-    initMac(currentKey);
-    allKeys.put(currentKey.getKeyID(), currentKey);
-    // generate a new nextKey
-    serialNo++;
-    nextKey = new BlockAccessKey(serialNo, new Text(keyGen.generateKey()
-        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
-        + tokenLifetime);
-    allKeys.put(nextKey.getKeyID(), nextKey);
-  }
-
-  /** Check if token is well formed */
-  private synchronized boolean verifyToken(long keyID, BlockAccessToken token)
-      throws IOException {
-    BlockAccessKey key = allKeys.get(keyID);
-    if (key == null) {
-      LOG.warn("Access key for keyID=" + keyID + " doesn't exist.");
-      return false;
-    }
-    if (key.getMac() == null) {
-      initMac(key);
-    }
-    Text tokenID = token.getTokenID();
-    Text authenticator = new Text(key.getMac().doFinal(tokenID.getBytes()));
-    return authenticator.equals(token.getTokenAuthenticator());
-  }
-
-  /** Generate an access token for current user */
-  public BlockAccessToken generateToken(long blockID, EnumSet<AccessMode> modes)
-      throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    String userID = (ugi == null ? null : ugi.getShortUserName());
-    return generateToken(userID, blockID, modes);
-  }
-
-  /** Generate an access token for a specified user */
-  public synchronized BlockAccessToken generateToken(String userID, long blockID,
-      EnumSet<AccessMode> modes) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generating access token for user=" + userID + ", blockID="
-          + blockID + ", access modes=" + modes + ", keyID="
-          + currentKey.getKeyID());
-    }
-    if (modes == null || modes.isEmpty())
-      throw new IOException("access modes can't be null or empty");
-    ByteArrayOutputStream buf = new ByteArrayOutputStream(4096);
-    DataOutputStream out = new DataOutputStream(buf);
-    WritableUtils.writeVLong(out, System.currentTimeMillis() + tokenLifetime);
-    WritableUtils.writeVLong(out, currentKey.getKeyID());
-    WritableUtils.writeString(out, userID);
-    WritableUtils.writeVLong(out, blockID);
-    WritableUtils.writeVInt(out, modes.size());
-    for (AccessMode aMode : modes) {
-      WritableUtils.writeEnum(out, aMode);
-    }
-    Text tokenID = new Text(buf.toByteArray());
-    return new BlockAccessToken(tokenID, new Text(currentKey.getMac().doFinal(
-        tokenID.getBytes())));
-  }
-
-  /** Check if access should be allowed. userID is not checked if null */
-  public boolean checkAccess(BlockAccessToken token, String userID, long blockID,
-      AccessMode mode) throws IOException {
-    long oExpiry = 0;
-    long oKeyID = 0;
-    String oUserID = null;
-    long oBlockID = 0;
-    EnumSet<AccessMode> oModes = EnumSet.noneOf(AccessMode.class);
-
-    try {
-      ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
-          .getBytes());
-      DataInputStream in = new DataInputStream(buf);
-      oExpiry = WritableUtils.readVLong(in);
-      oKeyID = WritableUtils.readVLong(in);
-      oUserID = WritableUtils.readString(in);
-      oBlockID = WritableUtils.readVLong(in);
-      int length = WritableUtils.readVInt(in);
-      for (int i = 0; i < length; ++i) {
-        oModes.add(WritableUtils.readEnum(in, AccessMode.class));
-      }
-    } catch (IOException e) {
-      throw (IOException) new IOException(
-          "Unable to parse access token for user=" + userID + ", blockID="
-              + blockID + ", access mode=" + mode).initCause(e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Verifying access token for user=" + userID + ", blockID="
-          + blockID + ", access mode=" + mode + ", keyID=" + oKeyID);
-    }
-    return (userID == null || userID.equals(oUserID)) && oBlockID == blockID
-        && !isExpired(oExpiry) && oModes.contains(mode)
-        && verifyToken(oKeyID, token);
-  }
-
-  private static boolean isExpired(long expiryDate) {
-    return System.currentTimeMillis() > expiryDate;
-  }
-
-  /** check if a token is expired. for unit test only.
-   *  return true when token is expired, false otherwise */
-  static boolean isTokenExpired(BlockAccessToken token) throws IOException {
-    ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
-        .getBytes());
-    DataInputStream in = new DataInputStream(buf);
-    long expiryDate = WritableUtils.readVLong(in);
-    return isExpired(expiryDate);
-  }
-
-  /** set token lifetime. for unit test only */
-  synchronized void setTokenLifetime(long tokenLifetime) {
-    this.tokenLifetime = tokenLifetime;
-  }
-}

+ 0 - 110
src/java/org/apache/hadoop/hdfs/security/BlockAccessKey.java

@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import javax.crypto.Mac;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * Key used for generating and verifying access tokens
- */
-public class BlockAccessKey implements Writable {
-  private long keyID;
-  private Text key;
-  private long expiryDate;
-  private transient Mac mac;
-
-  public BlockAccessKey() {
-    this(0L, new Text(), 0L);
-  }
-
-  public BlockAccessKey(long keyID, Text key, long expiryDate) {
-    this.keyID = keyID;
-    this.key = key;
-    this.expiryDate = expiryDate;
-  }
-
-  public long getKeyID() {
-    return keyID;
-  }
-
-  public Text getKey() {
-    return key;
-  }
-
-  public long getExpiryDate() {
-    return expiryDate;
-  }
-
-  public Mac getMac() {
-    return mac;
-  }
-
-  public void setMac(Mac mac) {
-    this.mac = mac;
-  }
-
-  static boolean isEqual(Object a, Object b) {
-    return a == null ? b == null : a.equals(b);
-  }
-
-  /** {@inheritDoc} */
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-    if (obj instanceof BlockAccessKey) {
-      BlockAccessKey that = (BlockAccessKey) obj;
-      return this.keyID == that.keyID && isEqual(this.key, that.key)
-          && this.expiryDate == that.expiryDate;
-    }
-    return false;
-  }
-
-  /** {@inheritDoc} */
-  public int hashCode() {
-    return key == null ? 0 : key.hashCode();
-  }
-
-  // ///////////////////////////////////////////////
-  // Writable
-  // ///////////////////////////////////////////////
-  /**
-   */
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVLong(out, keyID);
-    key.write(out);
-    WritableUtils.writeVLong(out, expiryDate);
-  }
-
-  /**
-   */
-  public void readFields(DataInput in) throws IOException {
-    keyID = WritableUtils.readVLong(in);
-    key.readFields(in);
-    expiryDate = WritableUtils.readVLong(in);
-  }
-}

+ 0 - 89
src/java/org/apache/hadoop/hdfs/security/BlockAccessToken.java

@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class BlockAccessToken implements Writable {
-  public static final BlockAccessToken DUMMY_TOKEN = new BlockAccessToken();
-  private Text tokenID;
-  private Text tokenAuthenticator;
-
-  public BlockAccessToken() {
-    this(new Text(), new Text());
-  }
-
-  public BlockAccessToken(Text tokenID, Text tokenAuthenticator) {
-    this.tokenID = tokenID;
-    this.tokenAuthenticator = tokenAuthenticator;
-  }
-
-  public Text getTokenID() {
-    return tokenID;
-  }
-
-  public Text getTokenAuthenticator() {
-    return tokenAuthenticator;
-  }
-
-  static boolean isEqual(Object a, Object b) {
-    return a == null ? b == null : a.equals(b);
-  }
-
-  /** {@inheritDoc} */
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-    if (obj instanceof BlockAccessToken) {
-      BlockAccessToken that = (BlockAccessToken) obj;
-      return isEqual(this.tokenID, that.tokenID)
-          && isEqual(this.tokenAuthenticator, that.tokenAuthenticator);
-    }
-    return false;
-  }
-
-  /** {@inheritDoc} */
-  public int hashCode() {
-    return tokenAuthenticator == null ? 0 : tokenAuthenticator.hashCode();
-  }
-
-  // ///////////////////////////////////////////////
-  // Writable
-  // ///////////////////////////////////////////////
-  /**
-   */
-  public void write(DataOutput out) throws IOException {
-    tokenID.write(out);
-    tokenAuthenticator.write(out);
-  }
-
-  /**
-   */
-  public void readFields(DataInput in) throws IOException {
-    tokenID.readFields(in);
-    tokenAuthenticator.readFields(in);
-  }
-
-}

+ 37 - 0
src/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * Key used for generating and verifying block tokens
+ */
+public class BlockKey extends DelegationKey {
+
+  public BlockKey() {
+    super();
+  }
+
+  public BlockKey(int keyId, long expiryDate, SecretKey key) {
+    super(keyId, expiryDate, key);
+  }
+}
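
BlockKey adds nothing beyond DelegationKey; the point of the subclass is to reuse Common's key type (key id, expiry date, SecretKey) for block keys. A construction sketch; the secret is assumed to come from the secret manager's generateSecret(), and the id/expiry values are illustrative:

    // `secret` is an assumed javax.crypto.SecretKey from generateSecret().
    BlockKey key = new BlockKey(7, System.currentTimeMillis() + 600000L, secret);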

+ 145 - 0
src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java

@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+public class BlockTokenIdentifier extends TokenIdentifier {
+  static final Text KIND_NAME = new Text("HDFS_BLOCK_TOKEN");
+
+  private long expiryDate;
+  private int keyId;
+  private String userId;
+  private long blockId;
+  private EnumSet<AccessMode> modes;
+
+  public BlockTokenIdentifier() {
+    this(null, 0, EnumSet.noneOf(AccessMode.class));
+  }
+
+  public BlockTokenIdentifier(String userId, long blockId,
+      EnumSet<AccessMode> modes) {
+    this.userId = userId;
+    this.blockId = blockId;
+    this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+  }
+
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    if (userId == null || "".equals(userId)) {
+      return UserGroupInformation.createRemoteUser(Long.toString(blockId));
+    }
+    return UserGroupInformation.createRemoteUser(userId);
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public void setExpiryDate(long expiryDate) {
+    this.expiryDate = expiryDate;
+  }
+
+  public int getKeyId() {
+    return this.keyId;
+  }
+
+  public void setKeyId(int keyId) {
+    this.keyId = keyId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public long getBlockId() {
+    return blockId;
+  }
+
+  public EnumSet<AccessMode> getAccessModes() {
+    return modes;
+  }
+
+  public String toString() {
+    return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+        + ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+        + ", blockId=" + this.getBlockId() + ", access modes="
+        + this.getAccessModes() + ")";
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof BlockTokenIdentifier) {
+      BlockTokenIdentifier that = (BlockTokenIdentifier) obj;
+      return this.expiryDate == that.expiryDate && this.keyId == that.keyId
+          && isEqual(this.userId, that.userId) && this.blockId == that.blockId
+          && isEqual(this.modes, that.modes);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
+        ^ (userId == null ? 0 : userId.hashCode());
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    expiryDate = WritableUtils.readVLong(in);
+    keyId = WritableUtils.readVInt(in);
+    userId = WritableUtils.readString(in);
+    blockId = WritableUtils.readVLong(in);
+    int length = WritableUtils.readVInt(in);
+    for (int i = 0; i < length; i++) {
+      modes.add(WritableUtils.readEnum(in, AccessMode.class));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, expiryDate);
+    WritableUtils.writeVInt(out, keyId);
+    WritableUtils.writeString(out, userId);
+    WritableUtils.writeVLong(out, blockId);
+    WritableUtils.writeVInt(out, modes.size());
+    for (AccessMode aMode : modes) {
+      WritableUtils.writeEnum(out, aMode);
+    }
+  }
+}
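
One behavioral detail in getUser(): a token minted without a user id (the datanode/Balancer case) falls back to the block id as the principal name, in the same spirit as the block-named UGI built in DFSClient.createClientDatanodeProtocolProxy above. Sketch:

    BlockTokenIdentifier anonymous = new BlockTokenIdentifier(null, 42L,
        EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
    // With no userId, the principal is the block id itself:
    System.out.println(anonymous.getUser().getUserName());   // prints "42"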

+ 318 - 0
src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java

@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave
+ * mode. Master can generate new block keys and export block keys to slaves,
+ * while slaves can only import and use block keys received from master. Both
+ * master and slave can generate and verify block tokens. Typically, master mode
+ * is used by NN and slave mode is used by DN.
+ */
+public class BlockTokenSecretManager extends
+    SecretManager<BlockTokenIdentifier> {
+  public static final Log LOG = LogFactory
+      .getLog(BlockTokenSecretManager.class);
+  public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
+
+  private final boolean isMaster;
+  /*
+   * keyUpdateInterval is the interval that NN updates its block keys. It should
+   * be set long enough so that all live DN's and Balancer should have sync'ed
+   * their block keys with NN at least once during each interval.
+   */
+  private final long keyUpdateInterval;
+  private volatile long tokenLifetime;
+  private int serialNo = new SecureRandom().nextInt();
+  private BlockKey currentKey;
+  private BlockKey nextKey;
+  private Map<Integer, BlockKey> allKeys;
+
+  public static enum AccessMode {
+    READ, WRITE, COPY, REPLACE
+  };
+
+  /**
+   * Constructor
+   * 
+   * @param isMaster
+   * @param keyUpdateInterval
+   * @param tokenLifetime
+   * @throws IOException
+   */
+  public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) throws IOException {
+    this.isMaster = isMaster;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.allKeys = new HashMap<Integer, BlockKey>();
+    generateKeys();
+  }
+
+  /** Initialize block keys */
+  private synchronized void generateKeys() {
+    if (!isMaster)
+      return;
+    /*
+     * Need to set estimated expiry dates for currentKey and nextKey so that if
+     * NN crashes, DN can still expire those keys. NN will stop using the newly
+     * generated currentKey after the first keyUpdateInterval, however it may
+     * still be used by DN and Balancer to generate new tokens before they get a
+     * chance to sync their keys with NN. Since we require keyUpdateInterval to
+     * be long enough that all live DNs and the Balancer will sync their keys
+     * with NN at least once during the period, the estimated expiry date for
+     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+     * later.
+     */
+    serialNo++;
+    currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    serialNo++;
+    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    allKeys.put(currentKey.getKeyId(), currentKey);
+    allKeys.put(nextKey.getKeyId(), nextKey);
+  }
+
+  /** Export block keys, only to be used in master mode */
+  public synchronized ExportedBlockKeys exportKeys() {
+    if (!isMaster)
+      return null;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Exporting access keys");
+    return new ExportedBlockKeys(true, keyUpdateInterval, tokenLifetime,
+        currentKey, allKeys.values().toArray(new BlockKey[0]));
+  }
+
+  private synchronized void removeExpiredKeys() {
+    long now = System.currentTimeMillis();
+    for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Integer, BlockKey> e = it.next();
+      if (e.getValue().getExpiryDate() < now) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Set block keys, only to be used in slave mode
+   */
+  public synchronized void setKeys(ExportedBlockKeys exportedKeys)
+      throws IOException {
+    if (isMaster || exportedKeys == null)
+      return;
+    LOG.info("Setting block keys");
+    removeExpiredKeys();
+    this.currentKey = exportedKeys.getCurrentKey();
+    BlockKey[] receivedKeys = exportedKeys.getAllKeys();
+    for (int i = 0; i < receivedKeys.length; i++) {
+      if (receivedKeys[i] == null)
+        continue;
+      this.allKeys.put(receivedKeys[i].getKeyId(), receivedKeys[i]);
+    }
+  }
+
+  /**
+   * Update block keys, only to be used in master mode
+   */
+  public synchronized void updateKeys() throws IOException {
+    if (!isMaster)
+      return;
+    LOG.info("Updating block keys");
+    removeExpiredKeys();
+    // set final expiry date of retiring currentKey
+    allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
+        System.currentTimeMillis() + keyUpdateInterval + tokenLifetime,
+        currentKey.getKey()));
+    // update the estimated expiry date of new currentKey
+    currentKey = new BlockKey(nextKey.getKeyId(), System.currentTimeMillis()
+        + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
+    allKeys.put(currentKey.getKeyId(), currentKey);
+    // generate a new nextKey
+    serialNo++;
+    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
+        * keyUpdateInterval + tokenLifetime, generateSecret());
+    allKeys.put(nextKey.getKeyId(), nextKey);
+  }
+
+  /** Generate a block token for the current user */
+  public Token<BlockTokenIdentifier> generateToken(Block block,
+      EnumSet<AccessMode> modes) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String userId = (ugi == null ? null : ugi.getShortUserName());
+    return generateToken(userId, block, modes);
+  }
+
+  /** Generate a block token for a specified user */
+  public Token<BlockTokenIdentifier> generateToken(String userId, Block block,
+      EnumSet<AccessMode> modes) throws IOException {
+    BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
+        .getBlockId(), modes);
+    return new Token<BlockTokenIdentifier>(id, this);
+  }
+
+  /**
+   * Check if access should be allowed. userId is not checked if null. This
+   * method doesn't check whether the token password is correct. It should be
+   * used only when the password has already been verified (e.g., in the RPC layer).
+   */
+  public void checkAccess(BlockTokenIdentifier id, String userId, Block block,
+      AccessMode mode) throws InvalidToken {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking access for user=" + userId + ", block=" + block
+          + ", access mode=" + mode + " using " + id.toString());
+    }
+    if (userId != null && !userId.equals(id.getUserId())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't belong to user " + userId);
+    }
+    if (id.getBlockId() != block.getBlockId()) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't apply to block " + block);
+    }
+    if (isExpired(id.getExpiryDate())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " is expired.");
+    }
+    if (!id.getAccessModes().contains(mode)) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't have " + mode + " permission");
+    }
+  }
+
+  /** Check if access should be allowed. userId is not checked if null */
+  public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
+      Block block, AccessMode mode) throws InvalidToken {
+    BlockTokenIdentifier id = new BlockTokenIdentifier();
+    try {
+      id.readFields(new DataInputStream(new ByteArrayInputStream(token
+          .getIdentifier())));
+    } catch (IOException e) {
+      throw new InvalidToken(
+          "Unable to de-serialize block token identifier for user=" + userId
+              + ", block=" + block + ", access mode=" + mode);
+    }
+    checkAccess(id, userId, block, mode);
+    if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
+      throw new InvalidToken("Block token with " + id.toString()
+          + " doesn't have the correct token password");
+    }
+  }
+
+  private static boolean isExpired(long expiryDate) {
+    return System.currentTimeMillis() > expiryDate;
+  }
+
+  /**
+   * Check if a token is expired. For unit tests only. Returns true when the
+   * token is expired, false otherwise.
+   */
+  static boolean isTokenExpired(Token<BlockTokenIdentifier> token)
+      throws IOException {
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    long expiryDate = WritableUtils.readVLong(in);
+    return isExpired(expiryDate);
+  }
+
+  /** Set token lifetime. */
+  public void setTokenLifetime(long tokenLifetime) {
+    this.tokenLifetime = tokenLifetime;
+  }
+
+  /**
+   * Create an empty block token identifier
+   * 
+   * @return a newly created empty block token identifier
+   */
+  @Override
+  public BlockTokenIdentifier createIdentifier() {
+    return new BlockTokenIdentifier();
+  }
+
+  /**
+   * Create a new password/secret for the given block token identifier.
+   * 
+   * @param identifier
+   *          the block token identifier
+   * @return token password/secret
+   */
+  @Override
+  protected byte[] createPassword(BlockTokenIdentifier identifier) {
+    BlockKey key = null;
+    synchronized (this) {
+      key = currentKey;
+    }
+    if (key == null)
+      throw new IllegalStateException("currentKey hasn't been initialized.");
+    identifier.setExpiryDate(System.currentTimeMillis() + tokenLifetime);
+    identifier.setKeyId(key.getKeyId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generating block token for " + identifier.toString());
+    }
+    return createPassword(identifier.getBytes(), key.getKey());
+  }
+
+  /**
+   * Look up the token password/secret for the given block token identifier.
+   * 
+   * @param identifier
+   *          the block token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(BlockTokenIdentifier identifier)
+      throws InvalidToken {
+    if (isExpired(identifier.getExpiryDate())) {
+      throw new InvalidToken("Block token with " + identifier.toString()
+          + " is expired.");
+    }
+    BlockKey key = null;
+    synchronized (this) {
+      key = allKeys.get(identifier.getKeyId());
+    }
+    if (key == null) {
+      throw new InvalidToken("Can't re-compute password for "
+          + identifier.toString() + ", since the required block key (keyID="
+          + identifier.getKeyId() + ") doesn't exist.");
+    }
+    return createPassword(identifier.getBytes(), key.getKey());
+  }
+}
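
A minimal lifecycle sketch of the new manager, using only the methods introduced above (the interval and lifetime values are illustrative, not defaults, and the Block is a throwaway):

    import java.util.EnumSet;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
    import org.apache.hadoop.security.token.Token;

    public class BlockTokenRoundTrip {
      public static void main(String[] args) throws Exception {
        long keyUpdateInterval = 10 * 60 * 1000L; // 10 min, illustrative only
        long tokenLifetime = 10 * 60 * 1000L;     // 10 min, illustrative only

        // NN side: master mode generates block keys and exports them.
        BlockTokenSecretManager master = new BlockTokenSecretManager(true,
            keyUpdateInterval, tokenLifetime);
        ExportedBlockKeys keys = master.exportKeys();

        // DN side: slave mode imports the keys received from the NN.
        BlockTokenSecretManager slave = new BlockTokenSecretManager(false,
            keyUpdateInterval, tokenLifetime);
        slave.setKeys(keys);

        // Once keys are shared, both sides can generate and verify tokens.
        Block block = new Block(123L, 0L, 1L);
        Token<BlockTokenIdentifier> token = master.generateToken(block,
            EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
        slave.checkAccess(token, null, block,
            BlockTokenSecretManager.AccessMode.READ); // throws InvalidToken on failure
      }
    }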

+ 45 - 0
src/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * A block token selector for HDFS
+ */
+public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
+
+  @SuppressWarnings("unchecked")
+  public Token<BlockTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (BlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+        return (Token<BlockTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}
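
For context, the selector is the hook the RPC client layer uses to pick a block token out of the caller's credentials; a hedged sketch (the service address is a placeholder, and pickBlockToken is a made-up name):

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    static Token<BlockTokenIdentifier> pickBlockToken() throws IOException {
      Collection<Token<? extends TokenIdentifier>> tokens =
          UserGroupInformation.getCurrentUser().getTokens();
      // Any non-null service matches: selectToken() filters only on token kind.
      return new BlockTokenSelector().selectToken(
          new Text("127.0.0.1:50020"), tokens); // placeholder service address
    }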

+ 25 - 51
src/java/org/apache/hadoop/hdfs/security/ExportedAccessKeys.java → src/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java

@@ -16,43 +16,42 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.security;
+package org.apache.hadoop.hdfs.security.token.block;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
 /**
- * Object for passing access keys
+ * Object for passing block keys
  */
-public class ExportedAccessKeys implements Writable {
-  public static final ExportedAccessKeys DUMMY_KEYS = new ExportedAccessKeys();
-  private boolean isAccessTokenEnabled;
+public class ExportedBlockKeys implements Writable {
+  public static final ExportedBlockKeys DUMMY_KEYS = new ExportedBlockKeys();
+  private boolean isBlockTokenEnabled;
   private long keyUpdateInterval;
   private long tokenLifetime;
-  private BlockAccessKey currentKey;
-  private BlockAccessKey[] allKeys;
+  private BlockKey currentKey;
+  private BlockKey[] allKeys;
 
-  public ExportedAccessKeys() {
-    this(false, 0, 0, new BlockAccessKey(), new BlockAccessKey[0]);
+  public ExportedBlockKeys() {
+    this(false, 0, 0, new BlockKey(), new BlockKey[0]);
   }
 
-  ExportedAccessKeys(boolean isAccessTokenEnabled, long keyUpdateInterval,
-      long tokenLifetime, BlockAccessKey currentKey, BlockAccessKey[] allKeys) {
-    this.isAccessTokenEnabled = isAccessTokenEnabled;
+  ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
+    this.isBlockTokenEnabled = isBlockTokenEnabled;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
-    this.currentKey = currentKey;
-    this.allKeys = allKeys;
+    this.currentKey = currentKey == null ? new BlockKey() : currentKey;
+    this.allKeys = allKeys == null ? new BlockKey[0] : allKeys;
   }
 
-  public boolean isAccessTokenEnabled() {
-    return isAccessTokenEnabled;
+  public boolean isBlockTokenEnabled() {
+    return isBlockTokenEnabled;
   }
 
   public long getKeyUpdateInterval() {
@@ -63,47 +62,22 @@ public class ExportedAccessKeys implements Writable {
     return tokenLifetime;
   }
 
-  public BlockAccessKey getCurrentKey() {
+  public BlockKey getCurrentKey() {
     return currentKey;
   }
 
-  public BlockAccessKey[] getAllKeys() {
+  public BlockKey[] getAllKeys() {
     return allKeys;
   }
-
-  static boolean isEqual(Object a, Object b) {
-    return a == null ? b == null : a.equals(b);
-  }
-
-  /** {@inheritDoc} */
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-    if (obj instanceof ExportedAccessKeys) {
-      ExportedAccessKeys that = (ExportedAccessKeys) obj;
-      return this.isAccessTokenEnabled == that.isAccessTokenEnabled
-          && this.keyUpdateInterval == that.keyUpdateInterval
-          && this.tokenLifetime == that.tokenLifetime
-          && isEqual(this.currentKey, that.currentKey)
-          && Arrays.equals(this.allKeys, that.allKeys);
-    }
-    return false;
-  }
-
-  /** {@inheritDoc} */
-  public int hashCode() {
-    return currentKey == null ? 0 : currentKey.hashCode();
-  }
-
+
   // ///////////////////////////////////////////////
   // Writable
   // ///////////////////////////////////////////////
   static { // register a ctor
-    WritableFactories.setFactory(ExportedAccessKeys.class,
+    WritableFactories.setFactory(ExportedBlockKeys.class,
         new WritableFactory() {
           public Writable newInstance() {
-            return new ExportedAccessKeys();
+            return new ExportedBlockKeys();
           }
         });
   }
@@ -111,7 +85,7 @@ public class ExportedAccessKeys implements Writable {
   /**
    */
   public void write(DataOutput out) throws IOException {
-    out.writeBoolean(isAccessTokenEnabled);
+    out.writeBoolean(isBlockTokenEnabled);
     out.writeLong(keyUpdateInterval);
     out.writeLong(tokenLifetime);
     currentKey.write(out);
@@ -124,13 +98,13 @@ public class ExportedAccessKeys implements Writable {
   /**
    */
   public void readFields(DataInput in) throws IOException {
-    isAccessTokenEnabled = in.readBoolean();
+    isBlockTokenEnabled = in.readBoolean();
     keyUpdateInterval = in.readLong();
     tokenLifetime = in.readLong();
     currentKey.readFields(in);
-    this.allKeys = new BlockAccessKey[in.readInt()];
+    this.allKeys = new BlockKey[in.readInt()];
     for (int i = 0; i < allKeys.length; i++) {
-      allKeys[i] = new BlockAccessKey();
+      allKeys[i] = new BlockKey();
       allKeys[i].readFields(in);
     }
   }
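
Since ExportedBlockKeys stays a plain Writable, the rename does not change how it crosses the wire; a round-trip sketch using Hadoop's in-memory buffers:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    static ExportedBlockKeys roundTrip(ExportedBlockKeys sent) throws IOException {
      DataOutputBuffer out = new DataOutputBuffer();
      sent.write(out); // writes flag, intervals, currentKey, then allKeys

      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      ExportedBlockKeys received = new ExportedBlockKeys();
      received.readFields(in); // reads fields in the same order write() wrote them
      return received;
    }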

+ 4 - 4
src/java/org/apache/hadoop/hdfs/security/InvalidAccessTokenException.java → src/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java

@@ -16,21 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.security;
+package org.apache.hadoop.hdfs.security.token.block;
 
 import java.io.IOException;
 
 /**
- * Access token verification failed.
+ * Block token verification failed.
  */
-public class InvalidAccessTokenException extends IOException {
+public class InvalidBlockTokenException extends IOException {
   private static final long serialVersionUID = 168L;
 
-  public InvalidAccessTokenException() {
+  public InvalidBlockTokenException() {
     super();
   }
 
-  public InvalidAccessTokenException(String msg) {
+  public InvalidBlockTokenException(String msg) {
     super(msg);
   }
 }

+ 29 - 28
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -61,9 +61,9 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -81,6 +81,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -199,10 +200,10 @@ public class Balancer implements Tool {
   private NamenodeProtocol namenode;
   private ClientProtocol client;
   private FileSystem fs;
-  private boolean isAccessTokenEnabled;
+  private boolean isBlockTokenEnabled;
   private boolean shouldRun;
   private long keyUpdaterInterval;
-  private AccessTokenHandler accessTokenHandler;
+  private BlockTokenSecretManager blockTokenSecretManager;
-  private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
+  private Daemon keyupdaterthread = null; // BlockKeyUpdater thread
   private final static Random rnd = new Random();
   
@@ -367,11 +368,11 @@ public class Balancer implements Tool {
     
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
-      if (isAccessTokenEnabled) {
-        accessToken = accessTokenHandler.generateToken(null, block.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
-            AccessTokenHandler.AccessMode.COPY));
+      Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+      if (isBlockTokenEnabled) {
+        accessToken = blockTokenSecretManager.generateToken(null, block
+            .getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
+            BlockTokenSecretManager.AccessMode.COPY));
       }
       DataTransferProtocol.Sender.opReplaceBlock(out,
           block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(),
@@ -860,25 +861,25 @@ public class Balancer implements Tool {
     this.namenode = createNamenode(conf);
     this.client = DFSClient.createNamenode(conf);
     this.fs = FileSystem.get(conf);
-    ExportedAccessKeys keys = namenode.getAccessKeys();
-    this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
-    if (isAccessTokenEnabled) {
-      long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long accessTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Access token params received from NN: keyUpdateInterval="
-          + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + accessTokenLifetime / (60 * 1000) + " min(s)");
-      this.accessTokenHandler = new AccessTokenHandler(false,
-          accessKeyUpdateInterval, accessTokenLifetime);
-      this.accessTokenHandler.setKeys(keys);
+    ExportedBlockKeys keys = namenode.getBlockKeys();
+    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    if (isBlockTokenEnabled) {
+      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+      long blockTokenLifetime = keys.getTokenLifetime();
+      LOG.info("Block token params received from NN: keyUpdateInterval="
+          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+          + blockTokenLifetime / (60 * 1000) + " min(s)");
+      this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+          blockKeyUpdateInterval, blockTokenLifetime);
+      this.blockTokenSecretManager.setKeys(keys);
       /*
-       * Balancer should sync its access keys with NN more frequently than NN
-       * updates its access keys
+       * Balancer should sync its block keys with NN more frequently than NN
+       * updates its block keys
        */
-      this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its access keys every "
+      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
+      LOG.info("Balancer will update its block keys every "
           + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
       this.shouldRun = true;
       this.keyupdaterthread.start();
     }
@@ -887,12 +888,12 @@ public class Balancer implements Tool {
   /**
-   * Periodically updates access keys.
+   * Periodically updates block keys.
    */
-  class AccessKeyUpdater implements Runnable {
+  class BlockKeyUpdater implements Runnable {
 
     public void run() {
       while (shouldRun) {
         try {
-          accessTokenHandler.setKeys(namenode.getAccessKeys());
+          blockTokenSecretManager.setKeys(namenode.getBlockKeys());
         } catch (Exception e) {
           LOG.error(StringUtils.stringifyException(e));
         }

+ 6 - 9
src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
@@ -107,13 +107,10 @@ public class JspHelper {
     return chosenNode;
   }
 
-  public static void streamBlockInAscii(InetSocketAddress addr, long blockId, 
-                                 BlockAccessToken accessToken, long genStamp, 
-                                 long blockSize, 
-                                 long offsetIntoBlock, long chunkSizeToView, 
-                                 JspWriter out,
-                                 Configuration conf) 
-    throws IOException {
+  public static void streamBlockInAscii(InetSocketAddress addr, long blockId,
+      Token<BlockTokenIdentifier> blockToken, long genStamp, long blockSize,
+      long offsetIntoBlock, long chunkSizeToView, JspWriter out,
+      Configuration conf) throws IOException {
     if (chunkSizeToView == 0) return;
     Socket s = new Socket();
     s.connect(addr, HdfsConstants.READ_TIMEOUT);
@@ -124,7 +121,7 @@ public class JspHelper {
       // Use the block name for file name. 
       BlockReader blockReader = 
         BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
-                                             blockId, accessToken, genStamp ,offsetIntoBlock, 
+                                             blockId, blockToken, genStamp ,offsetIntoBlock, 
                                              amtToRead, 
                                              conf.getInt("io.file.buffer.size",
                                                          4096));

+ 74 - 35
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -31,6 +33,7 @@ import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
@@ -40,6 +43,7 @@ import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -62,9 +66,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -91,6 +95,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -203,9 +209,9 @@ public class DataNode extends Configured
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
-  boolean isAccessTokenEnabled;
-  AccessTokenHandler accessTokenHandler;
-  boolean isAccessTokenInitialized = false;
+  boolean isBlockTokenEnabled;
+  BlockTokenSecretManager blockTokenSecretManager;
+  boolean isBlockTokenInitialized = false;
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
@@ -416,13 +422,17 @@ public class DataNode extends Configured
       ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
        }
 
+    // BlockTokenSecretManager is created here, but it shouldn't be
+    // used until it is initialized in register().
+    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+    
     //init ipc server
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));
-    ipcServer = RPC.getServer(DataNode.class, this,
-        ipcAddr.getHostName(), ipcAddr.getPort(), 
-        conf.getInt("dfs.datanode.handler.count", 3), false, conf);
-    ipcServer.start();
+    ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
+        ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false,
+        conf, blockTokenSecretManager);
+    
     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
 
     LOG.info("dnRegistration = " + dnRegistration);
@@ -492,14 +502,25 @@ public class DataNode extends Configured
   } 
 
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
-      DatanodeID datanodeid, Configuration conf) throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(
+      DatanodeID datanodeid, final Configuration conf) throws IOException {
+    final InetSocketAddress addr = NetUtils.createSocketAddr(
         datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
     }
-    return (InterDatanodeProtocol)RPC.getProxy(InterDatanodeProtocol.class,
-        InterDatanodeProtocol.versionID, addr, conf);
+    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    try {
+      return loginUgi
+          .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
+            public InterDatanodeProtocol run() throws IOException {
+              return (InterDatanodeProtocol) RPC.getProxy(
+                  InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
+                  addr, conf);
+            }
+          });
+    } catch (InterruptedException ie) {
+      throw new IOException(ie.getMessage());
+    }
   }
 
   public InetSocketAddress getNameNodeAddr() {
@@ -601,25 +622,24 @@ public class DataNode extends Configured
           + ". Expecting " + storage.getStorageID());
     }
     
-    if (!isAccessTokenInitialized) {
+    if (!isBlockTokenInitialized) {
       /* first time registering with NN */
-      ExportedAccessKeys keys = dnRegistration.exportedKeys;
-      this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
-      if (isAccessTokenEnabled) {
-        long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
-        long accessTokenLifetime = keys.getTokenLifetime();
-        LOG.info("Access token params received from NN: keyUpdateInterval="
-            + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-            + accessTokenLifetime / (60 * 1000) + " min(s)");
-        this.accessTokenHandler = new AccessTokenHandler(false,
-            accessKeyUpdateInterval, accessTokenLifetime);
+      ExportedBlockKeys keys = dnRegistration.exportedKeys;
+      this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+      if (isBlockTokenEnabled) {
+        long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+        long blockTokenLifetime = keys.getTokenLifetime();
+        LOG.info("Block token params received from NN: keyUpdateInterval="
+            + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+            + blockTokenLifetime / (60 * 1000) + " min(s)");
+        blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
       }
-      isAccessTokenInitialized = true;
+      isBlockTokenInitialized = true;
     }
 
-    if (isAccessTokenEnabled) {
-      accessTokenHandler.setKeys(dnRegistration.exportedKeys);
-      dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+    if (isBlockTokenEnabled) {
+      blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
+      dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
     }
 
     // random short delay - helps scatter the BR from all DNs
@@ -941,8 +961,8 @@ public class DataNode extends Configured
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-      if (isAccessTokenEnabled) {
-        accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+      if (isBlockTokenEnabled) {
+        blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
       }
       break;
     default:
@@ -1268,10 +1288,10 @@ public class DataNode extends Configured
         //
         // Header info
         //
-        BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
-        if (isAccessTokenEnabled) {
-          accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
-              EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+          accessToken = blockTokenSecretManager.generateToken(null, b,
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
             b.getBlockId(), b.getGenerationStamp(), 0, 
@@ -1325,6 +1345,7 @@ public class DataNode extends Configured
 
     // start dataXceiveServer
     dataXceiverServer.start();
+    ipcServer.start();
         
     while (shouldRun) {
       try {
@@ -1798,6 +1819,24 @@ public class DataNode extends Configured
   /** {@inheritDoc} */
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final Block block) throws IOException {
+    if (isBlockTokenEnabled) {
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      if (tokenIds.size() != 1) {
+        throw new IOException("Can't continue with getReplicaVisibleLength() "
+            + "authorization since none or more than one BlockTokenIdentifier "
+            + "is found.");
+      }
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got: " + id.toString());
+        }
+        blockTokenSecretManager.checkAccess(id, null, block,
+            BlockTokenSecretManager.AccessMode.WRITE);
+      }
+    }
+
     return data.getReplicaVisibleLength(block);
   }
 }
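
The new check requires the caller to present exactly one BlockTokenIdentifier over RPC. A hedged sketch of the client side (the proxy wiring and principal name are assumptions, not part of this patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    static long visibleLength(final ClientDatanodeProtocol datanode,
        final Block block, Token<BlockTokenIdentifier> token)
        throws IOException, InterruptedException {
      // Hypothetical principal; only the attached token matters to the DN check.
      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("client");
      ugi.addToken(token); // exactly one BlockTokenIdentifier, as the DN requires
      return ugi.doAs(new PrivilegedExceptionAction<Long>() {
        public Long run() throws IOException {
          return datanode.getReplicaVisibleLength(block);
        }
      });
    }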

+ 85 - 58
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -39,8 +39,8 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -49,6 +49,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -126,27 +128,33 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * Read a block from the disk.
    */
   @Override
-  protected void opReadBlock(DataInputStream in,
-      long blockId, long blockGs, long startOffset, long length,
-      String clientName, BlockAccessToken accessToken) throws IOException {
+  protected void opReadBlock(DataInputStream in, long blockId, long blockGs,
+      long startOffset, long length, String clientName,
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-    
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.READ)) {
+
+    if (datanode.isBlockTokenEnabled) {
       try {
-        ERROR_ACCESS_TOKEN.write(out);
-        out.flush();
-        throw new IOException("Access token verification failed, for client "
-            + remoteAddress + " for OP_READ_BLOCK for block " + block);
-      } finally {
-        IOUtils.closeStream(out);
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          ERROR_ACCESS_TOKEN.write(out);
+          out.flush();
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
+              + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
       }
     }
+
     // send the block
     BlockSender blockSender = null;
     final String clientTraceFmt =
@@ -212,7 +220,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
       int pipelineSize, BlockConstructionStage stage,
       long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
-      BlockAccessToken accessToken) throws IOException {
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
@@ -228,19 +236,24 @@ class DataXceiver extends DataTransferProtocol.Receiver
     DataOutputStream replyOut = null;   // stream to prev target
     replyOut = new DataOutputStream(
                    NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
-            .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        if (client.length() != 0) {
-          ERROR_ACCESS_TOKEN.write(replyOut);
-          Text.writeString(replyOut, datanode.dnRegistration.getName());
-          replyOut.flush();
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.WRITE);
+      } catch (InvalidToken e) {
+        try {
+          if (client.length() != 0) {
+            ERROR_ACCESS_TOKEN.write(replyOut);
+            Text.writeString(replyOut, datanode.dnRegistration.getName());
+            replyOut.flush();
+          }
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_WRITE_BLOCK for block " + block
+              + " : " + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(replyOut);
         }
-        throw new IOException("Access token verification failed, for client "
-            + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
-      } finally {
-        IOUtils.closeStream(replyOut);
       }
     }
 
@@ -292,7 +305,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
               blockId, blockGs, 
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
-              srcDataNode, targets, accessToken);
+              srcDataNode, targets, blockToken);
 
           if (blockReceiver != null) { // send checksum header
             blockReceiver.writeChecksumHeader(mirrorOut);
@@ -395,22 +408,27 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * Get block checksum (MD5 of CRC32).
    */
   @Override
-  protected void opBlockChecksum(DataInputStream in,
-      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
+  protected void opBlockChecksum(DataInputStream in, long blockId,
+      long blockGs, Token<BlockTokenIdentifier> blockToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
-            .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+    if (datanode.isBlockTokenEnabled) {
       try {
-        ERROR_ACCESS_TOKEN.write(out);
-        out.flush();
-        throw new IOException(
-            "Access token verification failed, for client " + remoteAddress
-                + " for OP_BLOCK_CHECKSUM for block " + block);
-      } finally {
-        IOUtils.closeStream(out);
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          ERROR_ACCESS_TOKEN.write(out);
+          out.flush();
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
+              + " : " + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
       }
     }
 
@@ -454,17 +472,22 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * Read a block from the disk and then sends it to a destination.
    */
   @Override
-  protected void opCopyBlock(DataInputStream in,
-      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
+  protected void opCopyBlock(DataInputStream in, long blockId, long blockGs,
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.COPY)) {
-      LOG.warn("Invalid access token in request from "
-          + remoteAddress + " for OP_COPY_BLOCK for block " + block);
-      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
-      return;
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.COPY);
+      } catch (InvalidToken e) {
+        LOG.warn("Invalid access token in request from " + remoteAddress
+            + " for OP_COPY_BLOCK for block " + block + " : "
+            + e.getLocalizedMessage());
+        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        return;
+      }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -526,17 +549,21 @@ class DataXceiver extends DataTransferProtocol.Receiver
   @Override
   protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
-      BlockAccessToken accessToken) throws IOException {
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
     /* read header */
     final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
         blockGs);
-    if (datanode.isAccessTokenEnabled
-        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
-            AccessTokenHandler.AccessMode.REPLACE)) {
-      LOG.warn("Invalid access token in request from "
-          + remoteAddress + " for OP_REPLACE_BLOCK for block " + block);
-      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
-      return;
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.REPLACE);
+      } catch (InvalidToken e) {
+        LOG.warn("Invalid access token in request from " + remoteAddress
+            + " for OP_REPLACE_BLOCK for block " + block + " : "
+            + e.getLocalizedMessage());
+        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        return;
+      }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
@@ -567,7 +594,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
 
       /* send request to the proxy */
       DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(),
-          block.getGenerationStamp(), accessToken);
+          block.getGenerationStamp(), blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
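
Each op above follows the same verify-then-proceed shape. Distilled into one hedged helper for readability (checkBlockToken is a made-up name relying on DataXceiver's fields; the patch deliberately inlines this logic per op, and the copy/replace ops reply via sendResponse() instead):

    // Hypothetical helper; not part of the patch.
    private void checkBlockToken(Token<BlockTokenIdentifier> blockToken,
        Block block, BlockTokenSecretManager.AccessMode mode,
        DataOutputStream reply, String op) throws IOException {
      if (!datanode.isBlockTokenEnabled) {
        return;
      }
      try {
        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block, mode);
      } catch (InvalidToken e) {
        try {
          ERROR_ACCESS_TOKEN.write(reply); // tell the client why it was refused
          reply.flush();
          LOG.warn("Block token verification failed, for client " + remoteAddress
              + " for " + op + " for block " + block + " : "
              + e.getLocalizedMessage());
          throw e;
        } finally {
          IOUtils.closeStream(reply);
        }
      }
    }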

+ 7 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java

@@ -39,10 +39,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 
 @InterfaceAudience.Private
@@ -379,7 +381,7 @@ public class DatanodeJspHelper {
 
     final DFSClient dfs = getDFSClient(ugi, datanode.getNameNodeAddr(), conf);
 
-    BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
+    Token<BlockTokenIdentifier> blockToken = BlockTokenSecretManager.DUMMY_TOKEN;
     if (conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT)) {
@@ -392,7 +394,7 @@ public class DatanodeJspHelper {
       }
       for (int i = 0; i < blks.size(); i++) {
         if (blks.get(i).getBlock().getBlockId() == blockId) {
-          accessToken = blks.get(i).getAccessToken();
+          blockToken = blks.get(i).getBlockToken();
           break;
         }
       }
@@ -558,7 +560,7 @@ public class DatanodeJspHelper {
     out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
     try {
       JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
-          datanodePort), blockId, accessToken, genStamp, blockSize,
+          datanodePort), blockId, blockToken, genStamp, blockSize,
           startOffset, chunkSizeToView, out, conf);
     } catch (Exception e) {
       out.print(e);
@@ -627,7 +629,7 @@ public class DatanodeJspHelper {
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
-    BlockAccessToken accessToken = lastBlk.getAccessToken();
+    Token<BlockTokenIdentifier> accessToken = lastBlk.getBlockToken();
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {

+ 41 - 41
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -23,8 +23,8 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -164,10 +164,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
-  boolean isAccessTokenEnabled;
-  AccessTokenHandler accessTokenHandler;
-  private long accessKeyUpdateInterval;
-  private long accessTokenLifetime;
+  boolean isBlockTokenEnabled;
+  BlockTokenSecretManager blockTokenSecretManager;
+  private long blockKeyUpdateInterval;
+  private long blockTokenLifetime;
   
   // Scan interval is not configurable.
   private final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 3600000; // 1 hour
@@ -301,9 +301,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     this.safeMode = new SafeModeInfo(conf);
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                         conf.get("dfs.hosts.exclude",""));
-    if (isAccessTokenEnabled) {
-      accessTokenHandler = new AccessTokenHandler(true,
-          accessKeyUpdateInterval, accessTokenLifetime);
+    if (isBlockTokenEnabled) {
+      blockTokenSecretManager = new BlockTokenSecretManager(true,
+          blockKeyUpdateInterval, blockTokenLifetime);
     }
   }
 
@@ -455,20 +455,20 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
     this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
                                       DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
-    this.isAccessTokenEnabled = conf.getBoolean(
+    this.isBlockTokenEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
-    if (isAccessTokenEnabled) {
-      this.accessKeyUpdateInterval = conf.getLong(
+    if (isBlockTokenEnabled) {
+      this.blockKeyUpdateInterval = conf.getLong(
           DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 
           DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
-      this.accessTokenLifetime = conf.getLong(
+      this.blockTokenLifetime = conf.getLong(
           DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
           DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
     }
-    LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
-        + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
-        + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+    LOG.info("isBlockTokenEnabled=" + isBlockTokenEnabled
+        + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
+        + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
   }
 
@@ -632,9 +632,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * 
-   * @return current access keys
+   * @return current block keys
    */
-  ExportedAccessKeys getAccessKeys() {
-    return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
-        : ExportedAccessKeys.DUMMY_KEYS;
+  ExportedBlockKeys getBlockKeys() {
+    return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
+        : ExportedBlockKeys.DUMMY_KEYS;
   }
 
   /**
@@ -802,9 +802,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
       final long offset, final boolean corrupt) throws IOException {
     final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
-    if (isAccessTokenEnabled) {
-      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
-          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    if (isBlockTokenEnabled) {
+      lb.setBlockToken(blockTokenSecretManager.generateToken(b,
+          EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
     }
     return lb;
   }
@@ -1364,9 +1364,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
 
           lb = new LocatedBlock(lastBlock, targets, 
                                 fileLength-lastBlock.getNumBytes());
-          if (isAccessTokenEnabled) {
-            lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
-                .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+          if (isBlockTokenEnabled) {
+            lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+                EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
           }
 
           // Remove block from replication queue.
@@ -1482,9 +1482,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         
     // Create next block
     LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
-    if (isAccessTokenEnabled) {
-      b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
-          .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    if (isBlockTokenEnabled) {
+      b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return b;
   }
@@ -2310,7 +2310,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = getAccessKeys();
+    nodeReg.exportedKeys = getBlockKeys();
       
     NameNode.stateChangeLog.info(
                                  "BLOCK* NameSystem.registerDatanode: "
@@ -2523,8 +2523,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
           cmds.add(cmd);
         }
-        // check access key update
+        // check block key update
-        if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
-          cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
+        if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
+          cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
           nodeinfo.needKeyUpdate = false;
         }
         if (!cmds.isEmpty()) {
@@ -2562,8 +2562,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   /**
-   * Update access keys.
+   * Update block keys.
    */
-  void updateAccessKey() throws IOException {
-    this.accessTokenHandler.updateKeys();
+  void updateBlockKey() throws IOException {
+    this.blockTokenSecretManager.updateKeys();
     synchronized (heartbeats) {
       for (DatanodeDescriptor nodeInfo : heartbeats) {
         nodeInfo.needKeyUpdate = true;
@@ -2572,11 +2572,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   }
 
   /**
-   * Periodically calls heartbeatCheck() and updateAccessKey()
+   * Periodically calls heartbeatCheck() and updateBlockKey()
    */
   class HeartbeatMonitor implements Runnable {
     private long lastHeartbeatCheck;
-    private long lastAccessKeyUpdate;
+    private long lastBlockKeyUpdate;
     /**
      */
     public void run() {
@@ -2587,9 +2587,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
             heartbeatCheck();
             lastHeartbeatCheck = now;
           }
-          if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
-            updateAccessKey();
-            lastAccessKeyUpdate = now;
+          if (isBlockTokenEnabled && (lastBlockKeyUpdate + blockKeyUpdateInterval < now)) {
+            updateBlockKey();
+            lastBlockKeyUpdate = now;
           }
         } catch (Exception e) {
           FSNamesystem.LOG.error(StringUtils.stringifyException(e));
@@ -4229,9 +4229,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
-    // get a new generation stamp and an access token
+    // get a new generation stamp and a block token
     block.setGenerationStamp(nextGenerationStamp());
     LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-    if (isAccessTokenEnabled) {
-      locatedBlock.setAccessToken(accessTokenHandler.generateToken(
-          block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    if (isBlockTokenEnabled) {
+      locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
+          block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
     }
     return locatedBlock;
   }

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -54,7 +54,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -498,8 +498,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
   }
 
   /** {@inheritDoc} */
-  public ExportedAccessKeys getAccessKeys() throws IOException {
-    return namesystem.getAccessKeys();
+  public ExportedBlockKeys getBlockKeys() throws IOException {
+    return namesystem.getBlockKeys();
   }
 
   @Override // NamenodeProtocol

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -501,7 +501,7 @@ public class NamenodeFsck {
           BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
                                                block.getBlockId(), 
                                                block.getBlockId(), 
-                                               lblock.getAccessToken(),
+                                               lblock.getBlockToken(),
                                                block.getGenerationStamp(), 
                                                0, -1,
                                                conf.getInt("io.file.buffer.size", 4096));
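
Every direct block read follows the same shape after this change: the token travels inside the LocatedBlock and is threaded through BlockReader.newBlockReader to the datanode. A sketch of that call, with the argument order taken from the fsck call site above (offset 0 and length -1 read the whole block); the wrapper class is illustrative:

    import java.io.IOException;
    import java.net.Socket;

    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class TokenizedReadSketch {
      // The token is no longer a separate BlockAccessToken; it rides along
      // as the generic Token carried by the LocatedBlock.
      static BlockReader open(Socket s, String targetAddr, LocatedBlock lblock,
          int bufferSize) throws IOException {
        Block block = lblock.getBlock();
        return BlockReader.newBlockReader(s,
            targetAddr + ":" + block.getBlockId(), block.getBlockId(),
            lblock.getBlockToken(), block.getGenerationStamp(),
            0, -1, bufferSize);
      }
    }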

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -23,7 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -47,7 +47,7 @@ implements Writable, NodeRegistration {
   }
 
   public StorageInfo storageInfo;
-  public ExportedAccessKeys exportedKeys;
+  public ExportedBlockKeys exportedKeys;
 
   /**
    * Default constructor.
@@ -62,7 +62,7 @@ implements Writable, NodeRegistration {
   public DatanodeRegistration(String nodeName) {
     super(nodeName);
     this.storageInfo = new StorageInfo();
-    this.exportedKeys = new ExportedAccessKeys();
+    this.exportedKeys = new ExportedBlockKeys();
   }
   
   public void setInfoPort(int infoPort) {
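
Registration is how a freshly started datanode bootstraps its key set: the namenode fills exportedKeys before the registration reply is returned (see the registerDatanode hunk at the top of this diff). How the receiving side plausibly installs them, sketched under the assumption that the datanode holds a non-master BlockTokenSecretManager; the helper is illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

    class RegistrationKeySketch {
      // After register() returns, adopt the keys the namenode attached.
      static void installKeys(DatanodeRegistration reg,
          BlockTokenSecretManager secretManager) throws IOException {
        secretManager.setKeys(reg.exportedKeys);
      }
    }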

+ 5 - 5
src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java

@@ -21,24 +21,24 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
 public class KeyUpdateCommand extends DatanodeCommand {
-  private ExportedAccessKeys keys;
+  private ExportedBlockKeys keys;
 
   KeyUpdateCommand() {
-    this(new ExportedAccessKeys());
+    this(new ExportedBlockKeys());
   }
 
-  public KeyUpdateCommand(ExportedAccessKeys keys) {
+  public KeyUpdateCommand(ExportedBlockKeys keys) {
     super(DatanodeProtocol.DNA_ACCESSKEYUPDATE);
     this.keys = keys;
   }
 
-  public ExportedAccessKeys getExportedKeys() {
+  public ExportedBlockKeys getExportedKeys() {
     return this.keys;
   }
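
KeyUpdateCommand is the steady-state counterpart to registration: when the HeartbeatMonitor rolls keys, each datanode receives this command (DNA_ACCESSKEYUPDATE) in its next heartbeat reply. The datanode-side handling amounts to a single call; a sketch, with the dispatch method name being illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;

    class KeyUpdateSketch {
      // Replace the local key set with the one the namenode just exported.
      static void handle(KeyUpdateCommand cmd,
          BlockTokenSecretManager secretManager) throws IOException {
        secretManager.setKeys(cmd.getExportedKeys());
      }
    }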
 

+ 4 - 4
src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java

@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
@@ -74,12 +74,12 @@ public interface NamenodeProtocol extends VersionedProtocol {
   throws IOException;
 
   /**
-   * Get the current access keys
+   * Get the current block keys
    * 
-   * @return ExportedAccessKeys containing current access keys
+   * @return ExportedBlockKeys containing current block keys
    * @throws IOException 
    */
-  public ExportedAccessKeys getAccessKeys() throws IOException;
+  public ExportedBlockKeys getBlockKeys() throws IOException;
 
   /**
    * Get the size of the current edit log (in bytes).
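
getBlockKeys() is the pull half of key distribution: a party that must verify tokens without receiving heartbeats (the balancer, for instance) fetches the current key set over NamenodeProtocol and feeds it to a local secret manager. A sketch of that consumer, using only the two methods visible in this patch:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    class KeyFetchSketch {
      // Refresh a non-master secret manager from the namenode; only the
      // master (namenode) side ever mints new keys.
      static void refresh(NamenodeProtocol namenode,
          BlockTokenSecretManager local) throws IOException {
        ExportedBlockKeys keys = namenode.getBlockKeys();
        local.setKeys(keys);
      }
    }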

+ 5 - 3
src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -42,9 +42,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /** Utilities for HDFS tests */
@@ -262,8 +263,9 @@ public class DFSTestUtil {
     return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
   }
 
-  public static BlockAccessToken getAccessToken(FSDataOutputStream out) {
-    return ((DFSOutputStream) out.getWrappedStream()).getAccessToken();
+  public static Token<BlockTokenIdentifier> getBlockToken(
+      FSDataOutputStream out) {
+    return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
   }
 
   static void setLogLevel2All(org.apache.commons.logging.Log log) {

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -92,7 +92,7 @@ public class TestClientBlockVerification {
 
     return BlockReader.newBlockReader(
       s, targetAddr.toString()+ ":" + block.getBlockId(), block.getBlockId(),
-      testBlock.getAccessToken(), block.getGenerationStamp(),
+      testBlock.getBlockToken(), block.getGenerationStamp(),
       offset, lenToRead,
       conf.getInt("io.file.buffer.size", 4096));
   }

+ 11 - 11
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -49,7 +49,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
@@ -170,7 +170,7 @@ public class TestDataTransferProtocol extends TestCase {
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         block.getBlockId(), block.getGenerationStamp(), 0,
         stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     if (eofExcepted) {
       ERROR.write(recvOut);
       sendRecvData(description, true);
@@ -356,7 +356,7 @@ public class TestDataTransferProtocol extends TestCase {
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -370,7 +370,7 @@ public class TestDataTransferProtocol extends TestCase {
     DataTransferProtocol.Sender.opWriteBlock(sendOut,
         ++newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
     sendOut.writeInt(4);           // size of packet
@@ -392,7 +392,7 @@ public class TestDataTransferProtocol extends TestCase {
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         ++newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
     sendOut.writeInt(8);           // size of packet
@@ -423,7 +423,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeLong(fileLen);
     ERROR.write(recvOut);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -435,7 +435,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -448,7 +448,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -463,7 +463,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -478,7 +478,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -491,7 +491,7 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);
     readFile(fileSys, file, fileLen);
     } finally {
       cluster.shutdown();
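
These hunks double as wire-format documentation: in every read-op request the token is the final field, written immediately after the client name, and DUMMY_TOKEN fills that slot when no real token is in play. The tail of such a request as a sketch; the op-code and block-id preamble is elided, and the framing order is inferred from the test above:

    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.io.Text;

    class ReadOpTailSketch {
      static void writeTail(DataOutputStream sendOut, long offset, long len)
          throws IOException {
        sendOut.writeLong(offset);
        sendOut.writeLong(len);
        Text.writeString(sendOut, "cl");                     // client name
        BlockTokenSecretManager.DUMMY_TOKEN.write(sendOut);  // token, always last
      }
    }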

+ 0 - 89
src/test/hdfs/org/apache/hadoop/hdfs/security/TestAccessToken.java

@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.security;
-
-import java.util.EnumSet;
-
-import org.apache.hadoop.io.TestWritable;
-
-import junit.framework.TestCase;
-
-/** Unit tests for access tokens */
-public class TestAccessToken extends TestCase {
-  long accessKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
-  long accessTokenLifetime = 2 * 60 * 1000; // 2 mins
-  long blockID1 = 0L;
-  long blockID2 = 10L;
-  long blockID3 = -108L;
-
-  /** test Writable */
-  public void testWritable() throws Exception {
-    TestWritable.testWritable(ExportedAccessKeys.DUMMY_KEYS);
-    AccessTokenHandler handler = new AccessTokenHandler(true,
-        accessKeyUpdateInterval, accessTokenLifetime);
-    ExportedAccessKeys keys = handler.exportKeys();
-    TestWritable.testWritable(keys);
-    TestWritable.testWritable(BlockAccessToken.DUMMY_TOKEN);
-    BlockAccessToken token = handler.generateToken(blockID3, EnumSet
-        .allOf(AccessTokenHandler.AccessMode.class));
-    TestWritable.testWritable(token);
-  }
-
-  private void tokenGenerationAndVerification(AccessTokenHandler master,
-      AccessTokenHandler slave) throws Exception {
-    // single-mode tokens
-    for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
-        .values()) {
-      // generated by master
-      BlockAccessToken token1 = master.generateToken(blockID1, EnumSet.of(mode));
-      assertTrue(master.checkAccess(token1, null, blockID1, mode));
-      assertTrue(slave.checkAccess(token1, null, blockID1, mode));
-      // generated by slave
-      BlockAccessToken token2 = slave.generateToken(blockID2, EnumSet.of(mode));
-      assertTrue(master.checkAccess(token2, null, blockID2, mode));
-      assertTrue(slave.checkAccess(token2, null, blockID2, mode));
-    }
-    // multi-mode tokens
-    BlockAccessToken mtoken = master.generateToken(blockID3, EnumSet
-        .allOf(AccessTokenHandler.AccessMode.class));
-    for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
-        .values()) {
-      assertTrue(master.checkAccess(mtoken, null, blockID3, mode));
-      assertTrue(slave.checkAccess(mtoken, null, blockID3, mode));
-    }
-  }
-
-  /** test access key and token handling */
-  public void testAccessTokenHandler() throws Exception {
-    AccessTokenHandler masterHandler = new AccessTokenHandler(true,
-        accessKeyUpdateInterval, accessTokenLifetime);
-    AccessTokenHandler slaveHandler = new AccessTokenHandler(false,
-        accessKeyUpdateInterval, accessTokenLifetime);
-    ExportedAccessKeys keys = masterHandler.exportKeys();
-    slaveHandler.setKeys(keys);
-    tokenGenerationAndVerification(masterHandler, slaveHandler);
-    // key updating
-    masterHandler.updateKeys();
-    tokenGenerationAndVerification(masterHandler, slaveHandler);
-    keys = masterHandler.exportKeys();
-    slaveHandler.setKeys(keys);
-    tokenGenerationAndVerification(masterHandler, slaveHandler);
-  }
-
-}

+ 8 - 4
src/test/hdfs/org/apache/hadoop/hdfs/security/SecurityTestUtil.java → src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/SecurityTestUtil.java

@@ -16,10 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdfs.security;
+package org.apache.hadoop.hdfs.security.token.block;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
+
 /** Utilities for security tests */
 public class SecurityTestUtil {
 
@@ -27,15 +31,15 @@ public class SecurityTestUtil {
    * check if an access token is expired. return true when token is expired,
    * false otherwise
    */
-  public static boolean isAccessTokenExpired(BlockAccessToken token)
+  public static boolean isBlockTokenExpired(Token<BlockTokenIdentifier> token)
       throws IOException {
-    return AccessTokenHandler.isTokenExpired(token);
+    return BlockTokenSecretManager.isTokenExpired(token);
   }
 
   /**
    * set access token lifetime.
    */
-  public static void setAccessTokenLifetime(AccessTokenHandler handler,
+  public static void setBlockTokenLifetime(BlockTokenSecretManager handler,
       long tokenLifetime) {
     handler.setTokenLifetime(tokenLifetime);
   }
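
Both helpers exist to drive expiry tests: shrink the lifetime with setBlockTokenLifetime, then poll isBlockTokenExpired until the clock runs out. The busy-wait idiom the DFS-level tests below rely on, as a standalone sketch:

    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
    import org.apache.hadoop.security.token.Token;

    class ExpiryWaitSketch {
      // Spin until the given block token passes its expiry time.
      static void waitForExpiry(Token<BlockTokenIdentifier> token)
          throws Exception {
        while (!SecurityTestUtil.isBlockTokenExpired(token)) {
          Thread.sleep(10);
        }
      }
    }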

+ 224 - 0
src/test/hdfs/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java

@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.TestWritable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.log4j.Level;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Unit tests for block tokens */
+public class TestBlockToken {
+  public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
+  private static final String ADDRESS = "0.0.0.0";
+
+  static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal";
+  private static Configuration conf;
+  static {
+    conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+  }
+
+  static {
+    ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslRpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
+  long blockTokenLifetime = 2 * 60 * 1000; // 2 mins
+  Block block1 = new Block(0L);
+  Block block2 = new Block(10L);
+  Block block3 = new Block(-108L);
+
+  private static class getLengthAnswer implements Answer<Long> {
+    BlockTokenSecretManager sm;
+    BlockTokenIdentifier ident;
+
+    public getLengthAnswer(BlockTokenSecretManager sm,
+        BlockTokenIdentifier ident) {
+      this.sm = sm;
+      this.ident = ident;
+    }
+
+    @Override
+    public Long answer(InvocationOnMock invocation) throws IOException {
+      Object args[] = invocation.getArguments();
+      assertEquals(1, args.length);
+      Block block = (Block) args[0];
+      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
+          .getTokenIdentifiers();
+      assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size());
+      long result = 0;
+      for (TokenIdentifier tokenId : tokenIds) {
+        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
+        LOG.info("Got: " + id.toString());
+        assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
+        sm.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.WRITE);
+        result = id.getBlockId();
+      }
+      return result;
+    }
+  }
+
+  private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
+      Block block, EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
+      throws IOException {
+    Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
+    BlockTokenIdentifier id = sm.createIdentifier();
+    id.readFields(new DataInputStream(new ByteArrayInputStream(token
+        .getIdentifier())));
+    return id;
+  }
+
+  @Test
+  public void testWritable() throws Exception {
+    TestWritable.testWritable(new BlockTokenIdentifier());
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    TestWritable.testWritable(generateTokenId(sm, block1, EnumSet
+        .allOf(BlockTokenSecretManager.AccessMode.class)));
+    TestWritable.testWritable(generateTokenId(sm, block2, EnumSet
+        .of(BlockTokenSecretManager.AccessMode.WRITE)));
+    TestWritable.testWritable(generateTokenId(sm, block3, EnumSet
+        .noneOf(BlockTokenSecretManager.AccessMode.class)));
+  }
+
+  private void tokenGenerationAndVerification(BlockTokenSecretManager master,
+      BlockTokenSecretManager slave) throws Exception {
+    // single-mode tokens
+    for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
+        .values()) {
+      // generated by master
+      Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
+          EnumSet.of(mode));
+      master.checkAccess(token1, null, block1, mode);
+      slave.checkAccess(token1, null, block1, mode);
+      // generated by slave
+      Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
+          EnumSet.of(mode));
+      master.checkAccess(token2, null, block2, mode);
+      slave.checkAccess(token2, null, block2, mode);
+    }
+    // multi-mode tokens
+    Token<BlockTokenIdentifier> mtoken = master.generateToken(block3, EnumSet
+        .allOf(BlockTokenSecretManager.AccessMode.class));
+    for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
+        .values()) {
+      master.checkAccess(mtoken, null, block3, mode);
+      slave.checkAccess(mtoken, null, block3, mode);
+    }
+  }
+
+  /** test block key and token handling */
+  @Test
+  public void testBlockTokenSecretManager() throws Exception {
+    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    ExportedBlockKeys keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    // key updating
+    masterHandler.updateKeys();
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+  }
+
+  @Test
+  public void testBlockTokenRpc() throws Exception {
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
+        blockKeyUpdateInterval, blockTokenLifetime);
+    Token<BlockTokenIdentifier> token = sm.generateToken(block3,
+        EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
+
+    ClientDatanodeProtocol mockDN = mock(ClientDatanodeProtocol.class);
+    when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn(
+        ClientDatanodeProtocol.versionID);
+    BlockTokenIdentifier id = sm.createIdentifier();
+    id.readFields(new DataInputStream(new ByteArrayInputStream(token
+        .getIdentifier())));
+    doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength(
+        any(Block.class));
+
+    final Server server = RPC.getServer(ClientDatanodeProtocol.class, mockDN,
+        ADDRESS, 0, 5, true, conf, sm);
+
+    server.start();
+
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    final UserGroupInformation ticket = UserGroupInformation
+        .createRemoteUser(block3.toString());
+    ticket.addToken(token);
+
+    ClientDatanodeProtocol proxy = null;
+    try {
+      proxy = (ClientDatanodeProtocol) RPC.getProxy(
+          ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, addr,
+          ticket, conf, NetUtils.getDefaultSocketFactory(conf));
+      assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+
+}
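
Condensed, the lifecycle this test walks through is: a master secret manager (namenode side) exports keys, a slave (datanode side) imports them, the master mints a Token<BlockTokenIdentifier>, and the slave verifies it with checkAccess, which throws on an invalid token. A sketch of that round trip using only the calls exercised above; the block id and intervals are illustrative:

    import java.util.EnumSet;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
    import org.apache.hadoop.security.token.Token;

    class LifecycleSketch {
      static void roundTrip() throws Exception {
        BlockTokenSecretManager master =
            new BlockTokenSecretManager(true, 10 * 60 * 1000L, 2 * 60 * 1000L);
        BlockTokenSecretManager slave =
            new BlockTokenSecretManager(false, 10 * 60 * 1000L, 2 * 60 * 1000L);
        slave.setKeys(master.exportKeys());  // registration / KeyUpdateCommand path
        Block block = new Block(42L);
        Token<BlockTokenIdentifier> token = master.generateToken(
            block, EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
        slave.checkAccess(token, null, block,
            BlockTokenSecretManager.AccessMode.READ);  // throws if invalid
      }
    }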

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -236,7 +236,7 @@ public class TestBlockReplacement extends TestCase {
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
     sourceProxy.write(out);
-    BlockAccessToken.DUMMY_TOKEN.write(out);
+    BlockTokenSecretManager.DUMMY_TOKEN.write(out);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -249,7 +249,7 @@ public class TestDataNodeVolumeFailure extends TestCase{
       BlockReader.newBlockReader(s, targetAddr.toString() + ":" + 
           block.getBlockId(), 
           block.getBlockId(), 
-          lblock.getAccessToken(),
+          lblock.getBlockToken(),
           block.getGenerationStamp(), 
           0, -1, 4096);
 

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,7 +121,7 @@ public class TestDiskError extends TestCase {
           block.getBlock().getGenerationStamp(), 1, 
           BlockConstructionStage.PIPELINE_SETUP_CREATE, 
           0L, 0L, 0L, "", null, new DatanodeInfo[0], 
-          BlockAccessToken.DUMMY_TOKEN);
+          BlockTokenSecretManager.DUMMY_TOKEN);
 
       // write check header
       out.writeByte( 1 );

+ 47 - 49
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java → src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

@@ -28,16 +28,13 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
-import org.apache.hadoop.hdfs.security.SecurityTestUtil;
+import org.apache.hadoop.hdfs.security.token.block.*;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.net.NetUtils;
@@ -45,12 +42,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.token.*;
 import org.apache.log4j.Level;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 import junit.framework.TestCase;
 
-public class TestAccessTokenWithDFS extends TestCase {
+public class TestBlockTokenWithDFS extends TestCase {
 
   private static final int BLOCK_SIZE = 1024;
   private static final int FILE_SIZE = 2 * BLOCK_SIZE;
@@ -136,11 +133,11 @@ public class TestAccessTokenWithDFS extends TestCase {
       blockReader = BlockReader.newBlockReader(s, targetAddr
           .toString()
           + ":" + block.getBlockId(), block.getBlockId(), lblock
-          .getAccessToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
+          .getBlockToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
           "io.file.buffer.size", 4096));
 
     } catch (IOException ex) {
-      if (ex instanceof InvalidAccessTokenException) {
+      if (ex instanceof InvalidBlockTokenException) {
         assertFalse("OP_READ_BLOCK: access token is invalid, "
             + "when it is expected to be valid", shouldSucceed);
         return;
@@ -165,10 +162,10 @@ public class TestAccessTokenWithDFS extends TestCase {
 
   // get a conf for testing
   private static Configuration getConf(int numDataNodes) throws IOException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
+    conf.setLong("dfs.block.size", BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
     conf.setInt("dfs.heartbeat.interval", 1);
     conf.setInt("dfs.replication", numDataNodes);
     conf.setInt("ipc.client.connect.max.retries", 0);
@@ -190,8 +187,8 @@ public class TestAccessTokenWithDFS extends TestCase {
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
       // set a short token lifetime (1 second)
-      SecurityTestUtil.setAccessTokenLifetime(
-          cluster.getNamesystem().accessTokenHandler, 1000L);
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
       Path fileToAppend = new Path(FILE_TO_APPEND);
       FileSystem fs = cluster.getFileSystem();
 
@@ -204,13 +201,13 @@ public class TestAccessTokenWithDFS extends TestCase {
       stm = fs.append(fileToAppend);
       int mid = rawData.length - 1;
       stm.write(rawData, 1, mid - 1);
-      stm.hflush();
+      stm.sync();
 
       /*
        * wait till token used in stm expires
        */
-      BlockAccessToken token = DFSTestUtil.getAccessToken(stm);
-      while (!SecurityTestUtil.isAccessTokenExpired(token)) {
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
+      while (!SecurityTestUtil.isBlockTokenExpired(token)) {
         try {
           Thread.sleep(10);
         } catch (InterruptedException ignored) {
@@ -246,8 +243,8 @@ public class TestAccessTokenWithDFS extends TestCase {
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
       // set a short token lifetime (1 second)
-      SecurityTestUtil.setAccessTokenLifetime(
-          cluster.getNamesystem().accessTokenHandler, 1000L);
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
       Path fileToWrite = new Path(FILE_TO_WRITE);
       FileSystem fs = cluster.getFileSystem();
 
@@ -256,13 +253,13 @@ public class TestAccessTokenWithDFS extends TestCase {
       // write a partial block
       int mid = rawData.length - 1;
       stm.write(rawData, 0, mid);
-      stm.hflush();
+      stm.sync();
 
       /*
        * wait till token used in stm expires
        */
-      BlockAccessToken token = DFSTestUtil.getAccessToken(stm);
-      while (!SecurityTestUtil.isAccessTokenExpired(token)) {
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(stm);
+      while (!SecurityTestUtil.isBlockTokenExpired(token)) {
         try {
           Thread.sleep(10);
         } catch (InterruptedException ignored) {
@@ -294,8 +291,8 @@ public class TestAccessTokenWithDFS extends TestCase {
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
       // set a short token lifetime (1 second) initially
-      SecurityTestUtil.setAccessTokenLifetime(
-          cluster.getNamesystem().accessTokenHandler, 1000L);
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
       Path fileToRead = new Path(FILE_TO_READ);
       FileSystem fs = cluster.getFileSystem();
       createFile(fs, fileToRead);
@@ -320,12 +317,12 @@ public class TestAccessTokenWithDFS extends TestCase {
 
       DFSClient dfsclient = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
-      List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
+      List<LocatedBlock> locatedBlocks = cluster.getNameNode().getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block
-      BlockAccessToken myToken = lblock.getAccessToken();
+      Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
       // verify token is not expired
-      assertFalse(SecurityTestUtil.isAccessTokenExpired(myToken));
+      assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
       // read with valid token, should succeed
       tryRead(conf, lblock, true);
 
@@ -333,7 +330,7 @@ public class TestAccessTokenWithDFS extends TestCase {
        * wait till myToken and all cached tokens in in1, in2 and in3 expire
        */
 
-      while (!SecurityTestUtil.isAccessTokenExpired(myToken)) {
+      while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
         try {
           Thread.sleep(10);
         } catch (InterruptedException ignored) {
@@ -345,33 +342,34 @@ public class TestAccessTokenWithDFS extends TestCase {
        */
 
       // verify token is expired
-      assertTrue(SecurityTestUtil.isAccessTokenExpired(myToken));
+      assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
       // read should fail
       tryRead(conf, lblock, false);
       // use a valid new token
-      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
-          .generateToken(lblock.getBlock().getBlockId(), EnumSet
-              .of(AccessTokenHandler.AccessMode.READ)));
+      lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+          .blockTokenSecretManager.generateToken(lblock.getBlock(),
+              EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
       // read should succeed
       tryRead(conf, lblock, true);
       // use a token with wrong blockID
-      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
-          .generateToken(lblock.getBlock().getBlockId() + 1, EnumSet
-              .of(AccessTokenHandler.AccessMode.READ)));
+      Block wrongBlock = new Block(lblock.getBlock().getBlockId() + 1);
+      lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+          .blockTokenSecretManager.generateToken(wrongBlock,
+              EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
       // read should fail
       tryRead(conf, lblock, false);
       // use a token with wrong access modes
-      lblock.setAccessToken(cluster.getNamesystem().accessTokenHandler
-          .generateToken(lblock.getBlock().getBlockId(), EnumSet.of(
-              AccessTokenHandler.AccessMode.WRITE,
-              AccessTokenHandler.AccessMode.COPY,
-              AccessTokenHandler.AccessMode.REPLACE)));
+      lblock.setBlockToken(cluster.getNameNode().getNamesystem()
+          .blockTokenSecretManager.generateToken(lblock.getBlock(), EnumSet.of(
+              BlockTokenSecretManager.AccessMode.WRITE,
+              BlockTokenSecretManager.AccessMode.COPY,
+              BlockTokenSecretManager.AccessMode.REPLACE)));
       // read should fail
       tryRead(conf, lblock, false);
 
       // set a long token lifetime for future tokens
-      SecurityTestUtil.setAccessTokenLifetime(
-          cluster.getNamesystem().accessTokenHandler, 600 * 1000L);
+      SecurityTestUtil.setBlockTokenLifetime(
+          cluster.getNameNode().getNamesystem().blockTokenSecretManager, 600 * 1000L);
 
       /*
        * testing that when cached tokens are expired, DFSClient will re-fetch
@@ -381,7 +379,7 @@ public class TestAccessTokenWithDFS extends TestCase {
       // confirm all tokens cached in in1 are expired by now
       List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
       for (LocatedBlock blk : lblocks) {
-        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
       }
       // verify blockSeekTo() is able to re-fetch token transparently
       in1.seek(0);
@@ -390,7 +388,7 @@ public class TestAccessTokenWithDFS extends TestCase {
       // confirm all tokens cached in in2 are expired by now
       List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
       for (LocatedBlock blk : lblocks2) {
-        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
       }
       // verify blockSeekTo() is able to re-fetch token transparently (testing
       // via another interface method)
@@ -400,7 +398,7 @@ public class TestAccessTokenWithDFS extends TestCase {
       // confirm all tokens cached in in3 are expired by now
       List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
       for (LocatedBlock blk : lblocks3) {
-        assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
       }
       // verify fetchBlockByteRange() is able to re-fetch token transparently
       assertTrue(checkFile2(in3));
@@ -421,7 +419,7 @@ public class TestAccessTokenWithDFS extends TestCase {
       // confirm tokens cached in in1 are still valid
       lblocks = DFSTestUtil.getAllBlocks(in1);
       for (LocatedBlock blk : lblocks) {
-        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
       }
       // verify blockSeekTo() still works (forced to use cached tokens)
       in1.seek(0);
@@ -430,7 +428,7 @@ public class TestAccessTokenWithDFS extends TestCase {
       // confirm tokens cached in in2 are still valid
       lblocks2 = DFSTestUtil.getAllBlocks(in2);
       for (LocatedBlock blk : lblocks2) {
-        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
       }
       // verify blockSeekTo() still works (forced to use cached tokens)
       in2.seekToNewSource(0);
@@ -439,7 +437,7 @@ public class TestAccessTokenWithDFS extends TestCase {
       // confirm tokens cached in in3 are still valid
       lblocks3 = DFSTestUtil.getAllBlocks(in3);
       for (LocatedBlock blk : lblocks3) {
-        assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
+        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
       }
       // verify fetchBlockByteRange() still works (forced to use cached tokens)
       assertTrue(checkFile2(in3));
@@ -527,7 +525,7 @@ public class TestAccessTokenWithDFS extends TestCase {
    * Integration testing of access token, involving NN, DN, and Balancer
    */
   public void testEnd2End() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     new TestBalancer().integrationTest(conf);
   }
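
The read-path assertions in this test pin down the client contract: expired cached tokens never surface as user-visible failures, because blockSeekTo() and fetchBlockByteRange() re-fetch block locations, and with them fresh tokens, from the namenode. A small sketch of the expiry check those loops repeat, using only accessors that appear in this diff:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;

    class CachedTokenAuditSketch {
      // Count how many cached blocks in a stream carry expired tokens.
      static int countExpired(List<LocatedBlock> cached) throws IOException {
        int expired = 0;
        for (LocatedBlock blk : cached) {
          if (SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())) {
            expired++;
          }
        }
        return expired;
      }
    }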