Browse source code

HDFS-764. Places the Block Access token implementation in hdfs project. Contributed by Kan Zhang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@881531 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 15 years ago
parent
revision
edc1512477
28 changed files with 936 additions and 95 deletions
  1. CHANGES.txt (+3 -0)
  2. src/java/hdfs-default.xml (+23 -0)
  3. src/java/org/apache/hadoop/hdfs/DFSClient.java (+10 -10)
  4. src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+6 -6)
  5. src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (+18 -18)
  6. src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (+4 -4)
  7. src/java/org/apache/hadoop/hdfs/security/AccessTokenHandler.java (+306 -0)
  8. src/java/org/apache/hadoop/hdfs/security/BlockAccessKey.java (+110 -0)
  9. src/java/org/apache/hadoop/hdfs/security/BlockAccessToken.java (+89 -0)
  10. src/java/org/apache/hadoop/hdfs/security/ExportedAccessKeys.java (+138 -0)
  11. src/java/org/apache/hadoop/hdfs/security/InvalidAccessTokenException.java (+36 -0)
  12. src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (+4 -4)
  13. src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (+2 -2)
  14. src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+4 -4)
  15. src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (+7 -7)
  16. src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (+6 -5)
  17. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+8 -5)
  18. src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+1 -1)
  19. src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (+1 -1)
  20. src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java (+1 -1)
  21. src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java (+1 -1)
  22. src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (+2 -2)
  23. src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (+11 -11)
  24. src/test/hdfs/org/apache/hadoop/hdfs/security/SecurityTestUtil.java (+43 -0)
  25. src/test/hdfs/org/apache/hadoop/hdfs/security/TestAccessToken.java (+89 -0)
  26. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (+2 -2)
  27. src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (+2 -2)
  28. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java (+9 -9)

+ 3 - 0
CHANGES.txt

@@ -364,6 +364,9 @@ Release 0.21.0 - Unreleased
 
 
    HDFS-521. Create new tests for pipeline (cos)

+    HDFS-764. Places the Block Access token implementation in hdfs project.
+    (Kan Zhang via ddas)
+
  BUG FIXES

    HDFS-76. Better error message to users when commands fail because of 

+ 23 - 0
src/java/hdfs-default.xml

@@ -211,6 +211,29 @@ creations/deletions), or "all".</description>
  <description>The name of the group of super-users.</description>
</property>

+<property>
+  <name>dfs.block.access.token.enable</name>
+  <value>false</value>
+  <description>
+    If "true", access tokens are used as capabilities for accessing datanodes.
+    If "false", no access tokens are checked on accessing datanodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.block.access.key.update.interval</name>
+  <value>600</value>
+  <description>
+    Interval in minutes at which namenode updates its access keys.
+  </description>
+</property>
+
+<property>
+  <name>dfs.block.access.token.lifetime</name>
+  <value>600</value>
+  <description>The lifetime of access tokens in minutes.</description>
+</property>
+
<property>
  <name>dfs.datanode.data.dir</name>
  <value>${hadoop.tmp.dir}/dfs/data</value>

+ 10 - 10
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -87,6 +87,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -106,8 +108,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.InvalidAccessTokenException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -1547,7 +1547,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       checksumSize = this.checksum.getChecksumSize();
     }
 
-    public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken, 
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, 
        long genStamp, long startOffset, long len, int bufferSize) throws IOException {
      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
          true);
@@ -1555,7 +1555,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
 
    /** Java Doc required */
    public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
-                                       AccessToken accessToken,
+                                       BlockAccessToken accessToken,
                                       long genStamp,
                                       long startOffset, long len,
                                       int bufferSize, boolean verifyChecksum)
@@ -1566,7 +1566,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
 
    public static BlockReader newBlockReader( Socket sock, String file,
                                       long blockId, 
-                                       AccessToken accessToken,
+                                       BlockAccessToken accessToken,
                                       long genStamp,
                                       long startOffset, long len,
                                       int bufferSize, boolean verifyChecksum,
@@ -1946,7 +1946,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
          NetUtils.connect(s, targetAddr, socketTimeout);
          s.setSoTimeout(socketTimeout);
          Block blk = targetBlock.getBlock();
-          AccessToken accessToken = targetBlock.getAccessToken();
+          BlockAccessToken accessToken = targetBlock.getAccessToken();
          
          blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
              accessToken, 
@@ -2174,7 +2174,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
          dn = socketFactory.createSocket();
          NetUtils.connect(dn, targetAddr, socketTimeout);
          dn.setSoTimeout(socketTimeout);
-          AccessToken accessToken = block.getAccessToken();
+          BlockAccessToken accessToken = block.getAccessToken();
              
          int len = (int) (end - start + 1);
              
@@ -2580,7 +2580,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    class DataStreamer extends Daemon {
      private volatile boolean streamerClosed = false;
      private Block block; // its length is number of bytes acked
-      private AccessToken accessToken;
+      private BlockAccessToken accessToken;
      private DataOutputStream blockStream;
      private DataInputStream blockReplyStream;
      private ResponseProcessor response = null;
@@ -3284,7 +3284,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
        return nodes;
      }
 
-      AccessToken getAccessToken() {
+      BlockAccessToken getAccessToken() {
        return accessToken;
      }
 
@@ -3734,7 +3734,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    /**
     * Returns the access token currently used by streamer, for testing only
     */
-    AccessToken getAccessToken() {
+    BlockAccessToken getAccessToken() {
      return streamer.getAccessToken();
    }
 

+ 6 - 6
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -158,12 +158,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
   public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:50020";
 
-  public static final String  DFS_ACCESS_TOKEN_ENABLE_KEY = "dfs.access.token.enable";
-  public static final boolean DFS_ACCESS_TOKEN_ENABLE_DEFAULT = false;
-  public static final String  DFS_ACCESS_KEY_UPDATE_INTERVAL_KEY = "dfs.access.key.update.interval";
-  public static final int     DFS_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT = 600;
-  public static final String  DFS_ACCESS_TOKEN_LIFETIME_KEY = "dfs.access.token.lifetime";
-  public static final int     DFS_ACCESS_TOKEN_LIFETIME_DEFAULT = 600;
+  public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
+  public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
+  public static final String  DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY = "dfs.block.access.key.update.interval";
+  public static final long    DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT = 600L;
+  public static final String  DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
+  public static final long    DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
 
   public static final String  DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
   public static final int     DFS_REPLICATION_MAX_DEFAULT = 512;
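
For orientation, here is a minimal sketch of reading the three new settings through these constants. The reader class is hypothetical (not part of this commit), and the minutes-to-milliseconds conversion is an assumption based on the hdfs-default.xml descriptions above (minutes) and the System.currentTimeMillis() arithmetic in AccessTokenHandler below.

// Hypothetical reader for the new settings; not part of this commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class BlockTokenConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean enabled = conf.getBoolean(
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
    // The xml documents both intervals in minutes; AccessTokenHandler works
    // in milliseconds, so a conversion like this is assumed somewhere.
    long keyUpdateIntervalMs = 60 * 1000L * conf.getLong(
        DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT);
    long tokenLifetimeMs = 60 * 1000L * conf.getLong(
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
    System.out.println(enabled + " " + keyUpdateIntervalMs + " " + tokenLifetimeMs);
  }
}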

+ 18 - 18
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -24,9 +24,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.AccessToken;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -223,7 +223,7 @@ public interface DataTransferProtocol {
    /** Send OP_READ_BLOCK */
    public static void opReadBlock(DataOutputStream out,
        long blockId, long blockGs, long blockOffset, long blockLen,
-        String clientName, AccessToken accessToken) throws IOException {
+        String clientName, BlockAccessToken accessToken) throws IOException {
      op(out, Op.READ_BLOCK);
 
      out.writeLong(blockId);
@@ -240,7 +240,7 @@ public interface DataTransferProtocol {
        long blockId, long blockGs, int pipelineSize, 
        BlockConstructionStage stage, long newGs, long minBytesRcvd,
        long maxBytesRcvd, String client, DatanodeInfo src, 
-        DatanodeInfo[] targets, AccessToken accesstoken) throws IOException {
+        DatanodeInfo[] targets, BlockAccessToken accesstoken) throws IOException {
      op(out, Op.WRITE_BLOCK);
 
      out.writeLong(blockId);
@@ -267,7 +267,7 @@ public interface DataTransferProtocol {
    /** Send OP_REPLACE_BLOCK */
    public static void opReplaceBlock(DataOutputStream out,
        long blockId, long blockGs, String storageId, DatanodeInfo src,
-        AccessToken accesstoken) throws IOException {
+        BlockAccessToken accesstoken) throws IOException {
      op(out, Op.REPLACE_BLOCK);
 
      out.writeLong(blockId);
@@ -280,7 +280,7 @@ public interface DataTransferProtocol {
 
 
    /** Send OP_COPY_BLOCK */
    public static void opCopyBlock(DataOutputStream out,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
      op(out, Op.COPY_BLOCK);
 
      out.writeLong(blockId);
@@ -291,7 +291,7 @@ public interface DataTransferProtocol {
 
 
    /** Send OP_BLOCK_CHECKSUM */
    public static void opBlockChecksum(DataOutputStream out,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException {
      op(out, Op.BLOCK_CHECKSUM);
 
      out.writeLong(blockId);
@@ -343,7 +343,7 @@ public interface DataTransferProtocol {
      final long offset = in.readLong();
      final long length = in.readLong();
      final String client = Text.readString(in);
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
      opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
    }
@@ -354,7 +354,7 @@ public interface DataTransferProtocol {
     */
    protected abstract void opReadBlock(DataInputStream in,
        long blockId, long blockGs, long offset, long length,
-        String client, AccessToken accesstoken) throws IOException;
+        String client, BlockAccessToken accesstoken) throws IOException;
    
    /** Receive OP_WRITE_BLOCK */
    private void opWriteBlock(DataInputStream in) throws IOException {
@@ -377,7 +377,7 @@ public interface DataTransferProtocol {
      for (int i = 0; i < targets.length; i++) {
        targets[i] = DatanodeInfo.read(in);
      }
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
      opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
@@ -392,7 +392,7 @@ public interface DataTransferProtocol {
        int pipelineSize, BlockConstructionStage stage,
        long newGs, long minBytesRcvd, long maxBytesRcvd,
        String client, DatanodeInfo src, DatanodeInfo[] targets,
-        AccessToken accesstoken) throws IOException;
+        BlockAccessToken accesstoken) throws IOException;
 
    /** Receive OP_REPLACE_BLOCK */
    private void opReplaceBlock(DataInputStream in) throws IOException {
@@ -400,7 +400,7 @@ public interface DataTransferProtocol {
      final long blockGs = in.readLong();
      final String sourceId = Text.readString(in); // read del hint
      final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
      opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
    }
@@ -411,13 +411,13 @@ public interface DataTransferProtocol {
     */
    protected abstract void opReplaceBlock(DataInputStream in,
        long blockId, long blockGs, String sourceId, DatanodeInfo src,
-        AccessToken accesstoken) throws IOException;
+        BlockAccessToken accesstoken) throws IOException;
 
    /** Receive OP_COPY_BLOCK */
    private void opCopyBlock(DataInputStream in) throws IOException {
      final long blockId = in.readLong();          
      final long blockGs = in.readLong();
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
      opCopyBlock(in, blockId, blockGs, accesstoken);
    }
@@ -427,13 +427,13 @@ public interface DataTransferProtocol {
     * It is used for balancing purpose; send to a proxy source.
     */
    protected abstract void opCopyBlock(DataInputStream in,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
 
    /** Receive OP_BLOCK_CHECKSUM */
    private void opBlockChecksum(DataInputStream in) throws IOException {
      final long blockId = in.readLong();          
      final long blockGs = in.readLong();
-      final AccessToken accesstoken = readAccessToken(in);
+      final BlockAccessToken accesstoken = readAccessToken(in);
 
      opBlockChecksum(in, blockId, blockGs, accesstoken);
    }
@@ -443,12 +443,12 @@ public interface DataTransferProtocol {
     * Get the checksum of a block 
     */
    protected abstract void opBlockChecksum(DataInputStream in,
-        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+        long blockId, long blockGs, BlockAccessToken accesstoken) throws IOException;
 
    /** Read an AccessToken */
-    static private AccessToken readAccessToken(DataInputStream in
+    static private BlockAccessToken readAccessToken(DataInputStream in
        ) throws IOException {
-      final AccessToken t = new AccessToken();
+      final BlockAccessToken t = new BlockAccessToken();
      t.readFields(in);
      return t; 
    }
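
To make the wire contract concrete, here is a hedged client-side sketch of sending OP_READ_BLOCK with a token. It assumes the static send helpers above live on a DataTransferProtocol.Sender companion to the DataTransferProtocol.Receiver that DataXceiver extends; the helper class and parameter values are illustrative only.

// Hypothetical sender-side helper; assumes a DataTransferProtocol.Sender
// class hosting the static op* methods shown above.
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.net.NetUtils;

class ReadRequestSketch {
  static void sendReadRequest(Socket sock, long blockId, long genStamp,
      long offset, long length, String clientName,
      BlockAccessToken token) throws IOException {
    DataOutputStream out =
        new DataOutputStream(NetUtils.getOutputStream(sock));
    // The access token travels as the last header field of OP_READ_BLOCK,
    // matching the order readAccessToken() consumes on the datanode side.
    DataTransferProtocol.Sender.opReadBlock(
        out, blockId, genStamp, offset, length, clientName, token);
    out.flush();
  }
}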

+ 4 - 4
src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java

@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.security.AccessToken;
 
 import java.io.*;
 
@@ -44,7 +44,7 @@ public class LocatedBlock implements Writable {
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
-  private AccessToken accessToken = new AccessToken();
+  private BlockAccessToken accessToken = new BlockAccessToken();
 
   /**
    */
@@ -78,11 +78,11 @@ public class LocatedBlock implements Writable {
     }
   }
 
-  public AccessToken getAccessToken() {
+  public BlockAccessToken getAccessToken() {
     return accessToken;
   }
 
-  public void setAccessToken(AccessToken token) {
+  public void setAccessToken(BlockAccessToken token) {
     this.accessToken = token;
   }
 

+ 306 - 0
src/java/org/apache/hadoop/hdfs/security/AccessTokenHandler.java

@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * AccessTokenHandler can be instantiated in 2 modes, master mode and slave
+ * mode. Master can generate new access keys and export access keys to slaves,
+ * while slaves can only import and use access keys received from master. Both
+ * master and slave can generate and verify access tokens. Typically, master
+ * mode is used by NN and slave mode is used by DN.
+ */
+public class AccessTokenHandler {
+  private static final Log LOG = LogFactory.getLog(AccessTokenHandler.class);
+
+  private final boolean isMaster;
+  /*
+   * keyUpdateInterval is the interval that NN updates its access keys. It
+   * should be set long enough so that all live DN's and Balancer should have
+   * sync'ed their access keys with NN at least once during each interval.
+   */
+  private final long keyUpdateInterval;
+  private long tokenLifetime;
+  private long serialNo = new SecureRandom().nextLong();
+  private KeyGenerator keyGen;
+  private BlockAccessKey currentKey;
+  private BlockAccessKey nextKey;
+  private Map<Long, BlockAccessKey> allKeys;
+
+  public static enum AccessMode {
+    READ, WRITE, COPY, REPLACE
+  };
+
+  /**
+   * Constructor
+   * 
+   * @param isMaster
+   * @param keyUpdateInterval
+   * @param tokenLifetime
+   * @throws IOException
+   */
+  public AccessTokenHandler(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) throws IOException {
+    this.isMaster = isMaster;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.allKeys = new HashMap<Long, BlockAccessKey>();
+    if (isMaster) {
+      try {
+        generateKeys();
+        initMac(currentKey);
+      } catch (GeneralSecurityException e) {
+        throw (IOException) new IOException(
+            "Failed to create AccessTokenHandler").initCause(e);
+      }
+    }
+  }
+
+  /** Initialize access keys */
+  private synchronized void generateKeys() throws NoSuchAlgorithmException {
+    keyGen = KeyGenerator.getInstance("HmacSHA1");
+    /*
+     * Need to set estimated expiry dates for currentKey and nextKey so that if
+     * NN crashes, DN can still expire those keys. NN will stop using the newly
+     * generated currentKey after the first keyUpdateInterval, however it may
+     * still be used by DN and Balancer to generate new tokens before they get a
+     * chance to sync their keys with NN. Since we require keyUpdateInterval to be
+     * long enough so that all live DN's and Balancer will sync their keys with
+     * NN at least once during the period, the estimated expiry date for
+     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+     * more.
+     */
+    serialNo++;
+    currentKey = new BlockAccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 2 * keyUpdateInterval
+        + tokenLifetime);
+    serialNo++;
+    nextKey = new BlockAccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+        + tokenLifetime);
+    allKeys.put(currentKey.getKeyID(), currentKey);
+    allKeys.put(nextKey.getKeyID(), nextKey);
+  }
+
+  /** Initialize Mac function */
+  private synchronized void initMac(BlockAccessKey key) throws IOException {
+    try {
+      Mac mac = Mac.getInstance("HmacSHA1");
+      mac.init(new SecretKeySpec(key.getKey().getBytes(), "HmacSHA1"));
+      key.setMac(mac);
+    } catch (GeneralSecurityException e) {
+      throw (IOException) new IOException(
+          "Failed to initialize Mac for access key, keyID=" + key.getKeyID())
+          .initCause(e);
+    }
+  }
+
+  /** Export access keys, only to be used in master mode */
+  public synchronized ExportedAccessKeys exportKeys() {
+    if (!isMaster)
+      return null;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Exporting access keys");
+    return new ExportedAccessKeys(true, keyUpdateInterval, tokenLifetime,
+        currentKey, allKeys.values().toArray(new BlockAccessKey[0]));
+  }
+
+  private synchronized void removeExpiredKeys() {
+    long now = System.currentTimeMillis();
+    for (Iterator<Map.Entry<Long, BlockAccessKey>> it = allKeys.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Long, BlockAccessKey> e = it.next();
+      if (e.getValue().getExpiryDate() < now) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Set access keys, only to be used in slave mode
+   */
+  public synchronized void setKeys(ExportedAccessKeys exportedKeys)
+      throws IOException {
+    if (isMaster || exportedKeys == null)
+      return;
+    LOG.info("Setting access keys");
+    removeExpiredKeys();
+    this.currentKey = exportedKeys.getCurrentKey();
+    initMac(currentKey);
+    BlockAccessKey[] receivedKeys = exportedKeys.getAllKeys();
+    for (int i = 0; i < receivedKeys.length; i++) {
+      if (receivedKeys[i] == null)
+        continue;
+      this.allKeys.put(receivedKeys[i].getKeyID(), receivedKeys[i]);
+    }
+  }
+
+  /**
+   * Update access keys, only to be used in master mode
+   */
+  public synchronized void updateKeys() throws IOException {
+    if (!isMaster)
+      return;
+    LOG.info("Updating access keys");
+    removeExpiredKeys();
+    // set final expiry date of retiring currentKey
+    allKeys.put(currentKey.getKeyID(), new BlockAccessKey(currentKey.getKeyID(),
+        currentKey.getKey(), System.currentTimeMillis() + keyUpdateInterval
+            + tokenLifetime));
+    // update the estimated expiry date of new currentKey
+    currentKey = new BlockAccessKey(nextKey.getKeyID(), nextKey.getKey(), System
+        .currentTimeMillis()
+        + 2 * keyUpdateInterval + tokenLifetime);
+    initMac(currentKey);
+    allKeys.put(currentKey.getKeyID(), currentKey);
+    // generate a new nextKey
+    serialNo++;
+    nextKey = new BlockAccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+        + tokenLifetime);
+    allKeys.put(nextKey.getKeyID(), nextKey);
+  }
+
+  /** Check if token is well formed */
+  private synchronized boolean verifyToken(long keyID, BlockAccessToken token)
+      throws IOException {
+    BlockAccessKey key = allKeys.get(keyID);
+    if (key == null) {
+      LOG.warn("Access key for keyID=" + keyID + " doesn't exist.");
+      return false;
+    }
+    if (key.getMac() == null) {
+      initMac(key);
+    }
+    Text tokenID = token.getTokenID();
+    Text authenticator = new Text(key.getMac().doFinal(tokenID.getBytes()));
+    return authenticator.equals(token.getTokenAuthenticator());
+  }
+
+  /** Generate an access token for current user */
+  public BlockAccessToken generateToken(long blockID, EnumSet<AccessMode> modes)
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    String userID = (ugi == null ? null : ugi.getUserName());
+    return generateToken(userID, blockID, modes);
+  }
+
+  /** Generate an access token for a specified user */
+  public synchronized BlockAccessToken generateToken(String userID, long blockID,
+      EnumSet<AccessMode> modes) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generating access token for user=" + userID + ", blockID="
+          + blockID + ", access modes=" + modes + ", keyID="
+          + currentKey.getKeyID());
+    }
+    if (modes == null || modes.isEmpty())
+      throw new IOException("access modes can't be null or empty");
+    ByteArrayOutputStream buf = new ByteArrayOutputStream(4096);
+    DataOutputStream out = new DataOutputStream(buf);
+    WritableUtils.writeVLong(out, System.currentTimeMillis() + tokenLifetime);
+    WritableUtils.writeVLong(out, currentKey.getKeyID());
+    WritableUtils.writeString(out, userID);
+    WritableUtils.writeVLong(out, blockID);
+    WritableUtils.writeVInt(out, modes.size());
+    for (AccessMode aMode : modes) {
+      WritableUtils.writeEnum(out, aMode);
+    }
+    Text tokenID = new Text(buf.toByteArray());
+    return new BlockAccessToken(tokenID, new Text(currentKey.getMac().doFinal(
+        tokenID.getBytes())));
+  }
+
+  /** Check if access should be allowed. userID is not checked if null */
+  public boolean checkAccess(BlockAccessToken token, String userID, long blockID,
+      AccessMode mode) throws IOException {
+    long oExpiry = 0;
+    long oKeyID = 0;
+    String oUserID = null;
+    long oBlockID = 0;
+    EnumSet<AccessMode> oModes = EnumSet.noneOf(AccessMode.class);
+
+    try {
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
+          .getBytes());
+      DataInputStream in = new DataInputStream(buf);
+      oExpiry = WritableUtils.readVLong(in);
+      oKeyID = WritableUtils.readVLong(in);
+      oUserID = WritableUtils.readString(in);
+      oBlockID = WritableUtils.readVLong(in);
+      int length = WritableUtils.readVInt(in);
+      for (int i = 0; i < length; ++i) {
+        oModes.add(WritableUtils.readEnum(in, AccessMode.class));
+      }
+    } catch (IOException e) {
+      throw (IOException) new IOException(
+          "Unable to parse access token for user=" + userID + ", blockID="
+              + blockID + ", access mode=" + mode).initCause(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Verifying access token for user=" + userID + ", blockID="
+          + blockID + ", access mode=" + mode + ", keyID=" + oKeyID);
+    }
+    return (userID == null || userID.equals(oUserID)) && oBlockID == blockID
+        && !isExpired(oExpiry) && oModes.contains(mode)
+        && verifyToken(oKeyID, token);
+  }
+
+  private static boolean isExpired(long expiryDate) {
+    return System.currentTimeMillis() > expiryDate;
+  }
+
+  /** check if a token is expired. for unit test only.
+   *  return true when token is expired, false otherwise */
+  static boolean isTokenExpired(BlockAccessToken token) throws IOException {
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
+        .getBytes());
+    DataInputStream in = new DataInputStream(buf);
+    long expiryDate = WritableUtils.readVLong(in);
+    return isExpired(expiryDate);
+  }
+
+  /** set token lifetime. for unit test only */
+  synchronized void setTokenLifetime(long tokenLifetime) {
+    this.tokenLifetime = tokenLifetime;
+  }
+}
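
The class above is the heart of the change: the master HMACs the serialized token ID with the current key, and verification recomputes that HMAC and checks expiry, block ID, user, and access mode. A minimal round-trip sketch using only the public API added above; the user name, block ID, and intervals are illustrative.

// Sketch only: user name, block ID, and intervals are illustrative.
import java.util.EnumSet;

import org.apache.hadoop.hdfs.security.AccessTokenHandler;
import org.apache.hadoop.hdfs.security.BlockAccessToken;

class TokenRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Master-mode handler, as the NN would hold; both intervals are
    // given in milliseconds here (10 minutes each).
    AccessTokenHandler master =
        new AccessTokenHandler(true, 10 * 60 * 1000L, 10 * 60 * 1000L);

    // Issue a READ token for block 42 to user "alice".
    BlockAccessToken token = master.generateToken("alice", 42L,
        EnumSet.of(AccessTokenHandler.AccessMode.READ));

    // checkAccess verifies user, block ID, expiry, mode, and the HMAC.
    System.out.println(master.checkAccess(token, "alice", 42L,
        AccessTokenHandler.AccessMode.READ));  // true
    System.out.println(master.checkAccess(token, "alice", 42L,
        AccessTokenHandler.AccessMode.WRITE)); // false: mode not granted
  }
}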

+ 110 - 0
src/java/org/apache/hadoop/hdfs/security/BlockAccessKey.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.Mac;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Key used for generating and verifying access tokens
+ */
+public class BlockAccessKey implements Writable {
+  private long keyID;
+  private Text key;
+  private long expiryDate;
+  private Mac mac;
+
+  public BlockAccessKey() {
+    this(0L, new Text(), 0L);
+  }
+
+  public BlockAccessKey(long keyID, Text key, long expiryDate) {
+    this.keyID = keyID;
+    this.key = key;
+    this.expiryDate = expiryDate;
+  }
+
+  public long getKeyID() {
+    return keyID;
+  }
+
+  public Text getKey() {
+    return key;
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public Mac getMac() {
+    return mac;
+  }
+
+  public void setMac(Mac mac) {
+    this.mac = mac;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof BlockAccessKey) {
+      BlockAccessKey that = (BlockAccessKey) obj;
+      return this.keyID == that.keyID && isEqual(this.key, that.key)
+          && this.expiryDate == that.expiryDate;
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return key == null ? 0 : key.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, keyID);
+    key.write(out);
+    WritableUtils.writeVLong(out, expiryDate);
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    keyID = WritableUtils.readVLong(in);
+    key.readFields(in);
+    expiryDate = WritableUtils.readVLong(in);
+  }
+}
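
The Mac field above is populated lazily by AccessTokenHandler.initMac from the raw key bytes. The underlying computation is plain JCE HmacSHA1 over the serialized token ID; a stand-alone sketch follows, independent of the HDFS classes and with illustrative input bytes.

// Stand-alone JCE sketch of the scheme; input bytes are illustrative.
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class HmacSketch {
  public static void main(String[] args) throws Exception {
    // Generate key material the same way generateKeys() does.
    byte[] keyBytes =
        KeyGenerator.getInstance("HmacSHA1").generateKey().getEncoded();
    // Key the Mac as initMac() does, then sign a (fake) token ID.
    Mac mac = Mac.getInstance("HmacSHA1");
    mac.init(new SecretKeySpec(keyBytes, "HmacSHA1"));
    byte[] authenticator = mac.doFinal("token-id-bytes".getBytes("UTF-8"));
    System.out.println(authenticator.length); // 20 bytes for HmacSHA1
  }
}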

+ 89 - 0
src/java/org/apache/hadoop/hdfs/security/BlockAccessToken.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class BlockAccessToken implements Writable {
+  public static final BlockAccessToken DUMMY_TOKEN = new BlockAccessToken();
+  private Text tokenID;
+  private Text tokenAuthenticator;
+
+  public BlockAccessToken() {
+    this(new Text(), new Text());
+  }
+
+  public BlockAccessToken(Text tokenID, Text tokenAuthenticator) {
+    this.tokenID = tokenID;
+    this.tokenAuthenticator = tokenAuthenticator;
+  }
+
+  public Text getTokenID() {
+    return tokenID;
+  }
+
+  public Text getTokenAuthenticator() {
+    return tokenAuthenticator;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof BlockAccessToken) {
+      BlockAccessToken that = (BlockAccessToken) obj;
+      return isEqual(this.tokenID, that.tokenID)
+          && isEqual(this.tokenAuthenticator, that.tokenAuthenticator);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return tokenAuthenticator == null ? 0 : tokenAuthenticator.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    tokenID.write(out);
+    tokenAuthenticator.write(out);
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    tokenID.readFields(in);
+    tokenAuthenticator.readFields(in);
+  }
+
+}
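
Both fields are Text, so the wire format is simply tokenID followed by tokenAuthenticator, which is exactly what readAccessToken() on the protocol side expects. A serialization round-trip sketch, with illustrative payload strings:

// Round-trip sketch; the two payload strings are illustrative.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.io.Text;

class TokenWireFormatSketch {
  public static void main(String[] args) throws Exception {
    BlockAccessToken sent =
        new BlockAccessToken(new Text("token-id"), new Text("authenticator"));

    // Serialize: tokenID then tokenAuthenticator, per write() above.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(buf));

    // Deserialize into a fresh token, as readAccessToken() does.
    BlockAccessToken received = new BlockAccessToken();
    received.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));

    System.out.println(sent.equals(received)); // true
  }
}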

+ 138 - 0
src/java/org/apache/hadoop/hdfs/security/ExportedAccessKeys.java

@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing access keys
+ */
+public class ExportedAccessKeys implements Writable {
+  public static final ExportedAccessKeys DUMMY_KEYS = new ExportedAccessKeys();
+  private boolean isAccessTokenEnabled;
+  private long keyUpdateInterval;
+  private long tokenLifetime;
+  private BlockAccessKey currentKey;
+  private BlockAccessKey[] allKeys;
+
+  public ExportedAccessKeys() {
+    this(false, 0, 0, new BlockAccessKey(), new BlockAccessKey[0]);
+  }
+
+  ExportedAccessKeys(boolean isAccessTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, BlockAccessKey currentKey, BlockAccessKey[] allKeys) {
+    this.isAccessTokenEnabled = isAccessTokenEnabled;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.currentKey = currentKey;
+    this.allKeys = allKeys;
+  }
+
+  public boolean isAccessTokenEnabled() {
+    return isAccessTokenEnabled;
+  }
+
+  public long getKeyUpdateInterval() {
+    return keyUpdateInterval;
+  }
+
+  public long getTokenLifetime() {
+    return tokenLifetime;
+  }
+
+  public BlockAccessKey getCurrentKey() {
+    return currentKey;
+  }
+
+  public BlockAccessKey[] getAllKeys() {
+    return allKeys;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof ExportedAccessKeys) {
+      ExportedAccessKeys that = (ExportedAccessKeys) obj;
+      return this.isAccessTokenEnabled == that.isAccessTokenEnabled
+          && this.keyUpdateInterval == that.keyUpdateInterval
+          && this.tokenLifetime == that.tokenLifetime
+          && isEqual(this.currentKey, that.currentKey)
+          && Arrays.equals(this.allKeys, that.allKeys);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return currentKey == null ? 0 : currentKey.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ExportedAccessKeys.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ExportedAccessKeys();
+          }
+        });
+  }
+
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isAccessTokenEnabled);
+    out.writeLong(keyUpdateInterval);
+    out.writeLong(tokenLifetime);
+    currentKey.write(out);
+    out.writeInt(allKeys.length);
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i].write(out);
+    }
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    isAccessTokenEnabled = in.readBoolean();
+    keyUpdateInterval = in.readLong();
+    tokenLifetime = in.readLong();
+    currentKey.readFields(in);
+    this.allKeys = new BlockAccessKey[in.readInt()];
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i] = new BlockAccessKey();
+      allKeys[i].readFields(in);
+    }
+  }
+
+}
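
This is the container the master hands to slaves. A sketch of the exchange using the handler API above; in HDFS the object would presumably travel over RPC paths such as the DatanodeRegistration and KeyUpdateCommand changes in this commit, but here it is passed directly, with illustrative intervals.

// Key-sync sketch; intervals are illustrative, handed over in-process.
import org.apache.hadoop.hdfs.security.AccessTokenHandler;
import org.apache.hadoop.hdfs.security.ExportedAccessKeys;

class KeySyncSketch {
  public static void main(String[] args) throws Exception {
    long tenMinutesMs = 10 * 60 * 1000L;
    AccessTokenHandler master =        // NN side: generates keys
        new AccessTokenHandler(true, tenMinutesMs, tenMinutesMs);
    AccessTokenHandler slave =         // DN side: imports keys only
        new AccessTokenHandler(false, tenMinutesMs, tenMinutesMs);

    // exportKeys() returns null in slave mode and setKeys() is a no-op in
    // master mode, so the direction of the exchange is enforced.
    ExportedAccessKeys keys = master.exportKeys();
    slave.setKeys(keys);
  }
}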

+ 36 - 0
src/java/org/apache/hadoop/hdfs/security/InvalidAccessTokenException.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.io.IOException;
+
+/**
+ * Access token verification failed.
+ */
+public class InvalidAccessTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidAccessTokenException() {
+    super();
+  }
+
+  public InvalidAccessTokenException(String msg) {
+    super(msg);
+  }
+}
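
The exception gives clients a way to tell a rejected token apart from other I/O failures; DFSClient now imports it (see the import change above). A small hedged sketch of that pattern follows; the readBlock helper is hypothetical and not part of this change.

// Hedged sketch; readBlock() is hypothetical, not part of this change.
import java.io.IOException;

import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;

class TokenFailureSketch {
  static void readBlock(boolean tokenAccepted) throws IOException {
    if (!tokenAccepted) {
      throw new InvalidAccessTokenException("access token rejected");
    }
  }

  public static void main(String[] args) {
    try {
      readBlock(false);
    } catch (InvalidAccessTokenException e) {
      // Token-specific failure: a client could refetch block locations
      // to obtain a fresh token before retrying.
      System.out.println("token failure: " + e.getMessage());
    } catch (IOException e) {
      System.out.println("other I/O failure: " + e.getMessage());
    }
  }
}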

+ 4 - 4
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -62,6 +62,9 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -78,9 +81,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -369,7 +369,7 @@ public class Balancer implements Tool {
     
     
    /* Send a block replace request to the output stream*/
    private void sendRequest(DataOutputStream out) throws IOException {
-      AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+      BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
      if (isAccessTokenEnabled) {
        accessToken = accessTokenHandler.generateToken(null, block.getBlock()
            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -39,9 +39,9 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -106,7 +106,7 @@ public class JspHelper {
   }
 
   public static void streamBlockInAscii(InetSocketAddress addr, long blockId, 
-                                 AccessToken accessToken, long genStamp, long blockSize, 
+                                 BlockAccessToken accessToken, long genStamp, long blockSize, 
                                  long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
     throws IOException {
     if (chunkSizeToView == 0) return;

+ 4 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -57,6 +57,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -88,9 +91,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -1248,7 +1248,7 @@ public class DataNode extends Configured
        //
        // Header info
        //
-        AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+        BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
        if (isAccessTokenEnabled) {
          accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
              EnumSet.of(AccessTokenHandler.AccessMode.WRITE));

+ 7 - 7
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -47,8 +49,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -128,7 +128,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
   @Override
   @Override
   protected void opReadBlock(DataInputStream in,
   protected void opReadBlock(DataInputStream in,
       long blockId, long blockGs, long startOffset, long length,
       long blockId, long blockGs, long startOffset, long length,
-      String clientName, AccessToken accessToken) throws IOException {
+      String clientName, BlockAccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     final Block block = new Block(blockId, 0 , blockGs);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
         datanode.socketWriteTimeout);
@@ -212,7 +212,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
       int pipelineSize, BlockConstructionStage stage,
       int pipelineSize, BlockConstructionStage stage,
       long newGs, long minBytesRcvd, long maxBytesRcvd,
       long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
-      AccessToken accessToken) throws IOException {
+      BlockAccessToken accessToken) throws IOException {
 
 
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
@@ -396,7 +396,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    */
    */
   @Override
   @Override
   protected void opBlockChecksum(DataInputStream in,
   protected void opBlockChecksum(DataInputStream in,
-      long blockId, long blockGs, AccessToken accessToken) throws IOException {
+      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
         datanode.socketWriteTimeout));
@@ -455,7 +455,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    */
    */
   @Override
   @Override
   protected void opCopyBlock(DataInputStream in,
   protected void opCopyBlock(DataInputStream in,
-      long blockId, long blockGs, AccessToken accessToken) throws IOException {
+      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
     // Read in the header
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
     Block block = new Block(blockId, 0, blockGs);
     if (datanode.isAccessTokenEnabled
     if (datanode.isAccessTokenEnabled
@@ -526,7 +526,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
   @Override
   @Override
   protected void opReplaceBlock(DataInputStream in,
   protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
-      AccessToken accessToken) throws IOException {
+      BlockAccessToken accessToken) throws IOException {
     /* read header */
     /* read header */
     final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
     final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
         blockGs);
         blockGs);

+ 6 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java

@@ -32,12 +32,12 @@ import javax.servlet.jsp.JspWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.StringUtils;
 
 class DatanodeJspHelper {
@@ -325,9 +325,10 @@
     final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
         JspHelper.conf);
 
-    AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+    BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
     if (JspHelper.conf.getBoolean(
-        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT)) {
       List<LocatedBlock> blks = dfs.getNamenode().getBlockLocations(filename, 0,
           Long.MAX_VALUE).getLocatedBlocks();
       if (blks == null || blks.size() == 0) {
@@ -564,7 +565,7 @@
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
-    AccessToken accessToken = lastBlk.getAccessToken();
+    BlockAccessToken accessToken = lastBlk.getAccessToken();
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {

+ 8 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -22,6 +22,8 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -30,8 +32,6 @@ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -438,12 +438,15 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
     this.supportAppends = conf.getBoolean("dfs.support.append", false);
     this.isAccessTokenEnabled = conf.getBoolean(
-        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
     if (isAccessTokenEnabled) {
       this.accessKeyUpdateInterval = conf.getLong(
-          AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+          DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 
+          DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
       this.accessTokenLifetime = conf.getLong(
-          AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
     }
     LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
         + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -73,7 +74,6 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -23,13 +23,13 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.ExportedAccessKeys;
 
 /** 
  * DatanodeRegistration class contains all information the name-node needs

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java

@@ -21,10 +21,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.ExportedAccessKeys;
 
 public class KeyUpdateCommand extends DatanodeCommand {
   private ExportedAccessKeys keys;

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java

@@ -21,9 +21,9 @@ package org.apache.hadoop.hdfs.server.protocol;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.security.ExportedAccessKeys;
 
 /*****************************************************************************
  * Protocol that a secondary NameNode uses to communicate with the NameNode.

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -37,9 +37,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -258,7 +258,7 @@
     return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
   }
 
-  public static AccessToken getAccessToken(FSDataOutputStream out) {
+  public static BlockAccessToken getAccessToken(FSDataOutputStream out) {
     return ((DFSClient.DFSOutputStream) out.getWrappedStream()).getAccessToken();
   }
 

+ 11 - 11
src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -48,12 +48,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.util.DataChecksum;
 import org.junit.Test;
 
@@ -170,7 +170,7 @@ public class TestDataTransferProtocol extends TestCase {
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         block.getBlockId(), block.getGenerationStamp(), 0,
         stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
-        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     if (eofExcepted) {
       ERROR.write(recvOut);
       sendRecvData(description, true);
@@ -356,7 +356,7 @@
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -370,7 +370,7 @@
     DataTransferProtocol.Sender.opWriteBlock(sendOut,
         ++newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
     sendOut.writeInt(4);           // size of packet
@@ -393,7 +393,7 @@
     DataTransferProtocol.Sender.opWriteBlock(sendOut, 
         ++newBlockId, 0L, 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
     sendOut.writeInt(8);           // size of packet
@@ -425,7 +425,7 @@
     sendOut.writeLong(fileLen);
     ERROR.write(recvOut);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -437,7 +437,7 @@
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -450,7 +450,7 @@
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -465,7 +465,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -480,7 +480,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -493,7 +493,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     readFile(fileSys, file, fileLen);
     } finally {
       cluster.shutdown();

+ 43 - 0
src/test/hdfs/org/apache/hadoop/hdfs/security/SecurityTestUtil.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.io.IOException;
+
+/** Utilities for security tests */
+public class SecurityTestUtil {
+
+  /**
+   * check if an access token is expired. return true when token is expired,
+   * false otherwise
+   */
+  public static boolean isAccessTokenExpired(BlockAccessToken token)
+      throws IOException {
+    return AccessTokenHandler.isTokenExpired(token);
+  }
+
+  /**
+   * set access token lifetime.
+   */
+  public static void setAccessTokenLifetime(AccessTokenHandler handler,
+      long tokenLifetime) {
+    handler.setTokenLifetime(tokenLifetime);
+  }
+
+}
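
A usage sketch for the helper above, assuming a BlockAccessToken obtained from a write pipeline, as the DFS tests later in this change do: poll until the token has expired before exercising the error path.

import java.io.IOException;

import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.security.SecurityTestUtil;

public class WaitForExpirySketch {
  /** Busy-wait in 10 ms polls until the given access token has expired. */
  static void waitUntilExpired(BlockAccessToken token) throws IOException {
    while (!SecurityTestUtil.isAccessTokenExpired(token)) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException ignored) {
        // keep polling; token lifetimes are kept short in tests
      }
    }
  }
}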

+ 89 - 0
src/test/hdfs/org/apache/hadoop/hdfs/security/TestAccessToken.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import java.util.EnumSet;
+
+import org.apache.hadoop.io.TestWritable;
+
+import junit.framework.TestCase;
+
+/** Unit tests for access tokens */
+public class TestAccessToken extends TestCase {
+  long accessKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
+  long accessTokenLifetime = 2 * 60 * 1000; // 2 mins
+  long blockID1 = 0L;
+  long blockID2 = 10L;
+  long blockID3 = -108L;
+
+  /** test Writable */
+  public void testWritable() throws Exception {
+    TestWritable.testWritable(ExportedAccessKeys.DUMMY_KEYS);
+    AccessTokenHandler handler = new AccessTokenHandler(true,
+        accessKeyUpdateInterval, accessTokenLifetime);
+    ExportedAccessKeys keys = handler.exportKeys();
+    TestWritable.testWritable(keys);
+    TestWritable.testWritable(BlockAccessToken.DUMMY_TOKEN);
+    BlockAccessToken token = handler.generateToken(blockID3, EnumSet
+        .allOf(AccessTokenHandler.AccessMode.class));
+    TestWritable.testWritable(token);
+  }
+
+  private void tokenGenerationAndVerification(AccessTokenHandler master,
+      AccessTokenHandler slave) throws Exception {
+    // single-mode tokens
+    for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
+        .values()) {
+      // generated by master
+      BlockAccessToken token1 = master.generateToken(blockID1, EnumSet.of(mode));
+      assertTrue(master.checkAccess(token1, null, blockID1, mode));
+      assertTrue(slave.checkAccess(token1, null, blockID1, mode));
+      // generated by slave
+      BlockAccessToken token2 = slave.generateToken(blockID2, EnumSet.of(mode));
+      assertTrue(master.checkAccess(token2, null, blockID2, mode));
+      assertTrue(slave.checkAccess(token2, null, blockID2, mode));
+    }
+    // multi-mode tokens
+    BlockAccessToken mtoken = master.generateToken(blockID3, EnumSet
+        .allOf(AccessTokenHandler.AccessMode.class));
+    for (AccessTokenHandler.AccessMode mode : AccessTokenHandler.AccessMode
+        .values()) {
+      assertTrue(master.checkAccess(mtoken, null, blockID3, mode));
+      assertTrue(slave.checkAccess(mtoken, null, blockID3, mode));
+    }
+  }
+
+  /** test access key and token handling */
+  public void testAccessTokenHandler() throws Exception {
+    AccessTokenHandler masterHandler = new AccessTokenHandler(true,
+        accessKeyUpdateInterval, accessTokenLifetime);
+    AccessTokenHandler slaveHandler = new AccessTokenHandler(false,
+        accessKeyUpdateInterval, accessTokenLifetime);
+    ExportedAccessKeys keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    // key updating
+    masterHandler.updateKeys();
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+    keys = masterHandler.exportKeys();
+    slaveHandler.setKeys(keys);
+    tokenGenerationAndVerification(masterHandler, slaveHandler);
+  }
+
+}
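
A condensed sketch of the master/slave flow the test above exercises, assuming only the AccessTokenHandler API visible in this change (the constructor, exportKeys/setKeys, generateToken, and checkAccess); in a real cluster the NameNode plays the master role and each DataNode the slave role.

import java.util.EnumSet;

import org.apache.hadoop.hdfs.security.AccessTokenHandler;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.security.ExportedAccessKeys;

public class KeyRolloverSketch {
  public static void main(String[] args) throws Exception {
    // Master (NameNode side) mints keys; slave (DataNode side) imports them.
    AccessTokenHandler master =
        new AccessTokenHandler(true, 10 * 60 * 1000L, 2 * 60 * 1000L);
    AccessTokenHandler slave =
        new AccessTokenHandler(false, 10 * 60 * 1000L, 2 * 60 * 1000L);
    ExportedAccessKeys keys = master.exportKeys();
    slave.setKeys(keys);
    // A token minted by the master verifies on the slave.
    BlockAccessToken token = master.generateToken(0L,
        EnumSet.of(AccessTokenHandler.AccessMode.READ));
    System.out.println(slave.checkAccess(token, null, 0L,
        AccessTokenHandler.AccessMode.READ)); // expected: true
  }
}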

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -46,12 +46,12 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
 /**
  * This class tests if block replacement request to data nodes work correctly.
  */
@@ -236,7 +236,7 @@ public class TestBlockReplacement extends TestCase {
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
     sourceProxy.write(out);
-    AccessToken.DUMMY_TOKEN.write(out);
+    BlockAccessToken.DUMMY_TOKEN.write(out);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -37,11 +37,11 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.AccessToken;
 
 /** Test if a datanode can correctly handle errors during block read/write*/
 public class TestDiskError extends TestCase {
@@ -123,7 +123,7 @@ public class TestDiskError extends TestCase {
           block.getBlock().getGenerationStamp(), 1, 
           BlockConstructionStage.PIPELINE_SETUP_CREATE, 
           0L, 0L, 0L, "", null, new DatanodeInfo[0], 
-          AccessToken.DUMMY_TOKEN);
+          BlockAccessToken.DUMMY_TOKEN);
 
       // write check header
       out.writeByte( 1 );

+ 9 - 9
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java

@@ -33,6 +33,10 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
+import org.apache.hadoop.hdfs.security.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.net.NetUtils;
@@ -40,10 +44,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.InvalidAccessTokenException;
-import org.apache.hadoop.security.SecurityTestUtil;
 import org.apache.log4j.Level;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
@@ -165,7 +165,7 @@
   // get a conf for testing
   private static Configuration getConf(int numDataNodes) throws IOException {
     Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
     conf.setInt("dfs.heartbeat.interval", 1);
@@ -208,7 +208,7 @@
       /*
        * wait till token used in stm expires
       */
-      AccessToken token = DFSTestUtil.getAccessToken(stm);
+      BlockAccessToken token = DFSTestUtil.getAccessToken(stm);
       while (!SecurityTestUtil.isAccessTokenExpired(token)) {
         try {
           Thread.sleep(10);
@@ -260,7 +260,7 @@
       /*
        * wait till token used in stm expires
       */
-      AccessToken token = DFSTestUtil.getAccessToken(stm);
+      BlockAccessToken token = DFSTestUtil.getAccessToken(stm);
       while (!SecurityTestUtil.isAccessTokenExpired(token)) {
         try {
           Thread.sleep(10);
@@ -322,7 +322,7 @@
       List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block
-      AccessToken myToken = lblock.getAccessToken();
+      BlockAccessToken myToken = lblock.getAccessToken();
       // verify token is not expired
       assertFalse(SecurityTestUtil.isAccessTokenExpired(myToken));
       // read with valid token, should succeed
@@ -527,7 +527,7 @@
    */
   public void testEnd2End() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     new TestBalancer().integrationTest(conf);
   }
 }