|
@@ -27,15 +27,13 @@ import java.util.Random;
|
|
|
import org.apache.commons.logging.impl.Log4JLogger;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
|
|
|
-import org.apache.hadoop.hdfs.security.AccessTokenHandler;
|
|
|
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
|
|
|
-import org.apache.hadoop.hdfs.security.SecurityTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.security.token.block.*;
|
|
|
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
@@ -43,11 +41,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.security.token.*;
|
|
|
import org.apache.log4j.Level;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
-public class TestAccessTokenWithDFS extends TestCase {
|
|
|
+public class TestBlockTokenWithDFS extends TestCase {
|
|
|
|
|
|
private static final int BLOCK_SIZE = 1024;
|
|
|
private static final int FILE_SIZE = 2 * BLOCK_SIZE;
|
|
@@ -133,11 +132,11 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
blockReader = DFSClient.BlockReader.newBlockReader(s, targetAddr
|
|
|
.toString()
|
|
|
+ ":" + block.getBlockId(), block.getBlockId(), lblock
|
|
|
- .getAccessToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
|
|
|
+ .getBlockToken(), block.getGenerationStamp(), 0, -1, conf.getInt(
|
|
|
"io.file.buffer.size", 4096));
|
|
|
|
|
|
} catch (IOException ex) {
|
|
|
- if (ex instanceof InvalidAccessTokenException) {
|
|
|
+ if (ex instanceof InvalidBlockTokenException) {
|
|
|
assertFalse("OP_READ_BLOCK: access token is invalid, "
|
|
|
+ "when it is expected to be valid", shouldSucceed);
|
|
|
return;
|
|
@@ -163,7 +162,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// get a conf for testing
|
|
|
private static Configuration getConf(int numDataNodes) throws IOException {
|
|
|
Configuration conf = new Configuration();
|
|
|
- conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
|
|
conf.setLong("dfs.block.size", BLOCK_SIZE);
|
|
|
conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
|
|
|
conf.setInt("dfs.heartbeat.interval", 1);
|
|
@@ -187,7 +186,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
cluster.waitActive();
|
|
|
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
|
|
// set a short token lifetime (1 second)
|
|
|
- SecurityTestUtil.setAccessTokenLifetime(
|
|
|
+ SecurityTestUtil.setBlockTokenLifetime(
|
|
|
cluster.getNameNode().getNamesystem().accessTokenHandler, 1000L);
|
|
|
Path fileToAppend = new Path(FILE_TO_APPEND);
|
|
|
FileSystem fs = cluster.getFileSystem();
|
|
@@ -206,8 +205,8 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
/*
|
|
|
* wait till token used in stm expires
|
|
|
*/
|
|
|
- BlockAccessToken token = DFSTestUtil.getAccessToken(stm);
|
|
|
- while (!SecurityTestUtil.isAccessTokenExpired(token)) {
|
|
|
+ Token<BlockTokenIdentifier> token = DFSTestUtil.getAccessToken(stm);
|
|
|
+ while (!SecurityTestUtil.isBlockTokenExpired(token)) {
|
|
|
try {
|
|
|
Thread.sleep(10);
|
|
|
} catch (InterruptedException ignored) {
|
|
@@ -243,7 +242,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
cluster.waitActive();
|
|
|
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
|
|
// set a short token lifetime (1 second)
|
|
|
- SecurityTestUtil.setAccessTokenLifetime(
|
|
|
+ SecurityTestUtil.setBlockTokenLifetime(
|
|
|
cluster.getNameNode().getNamesystem().accessTokenHandler, 1000L);
|
|
|
Path fileToWrite = new Path(FILE_TO_WRITE);
|
|
|
FileSystem fs = cluster.getFileSystem();
|
|
@@ -258,8 +257,8 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
/*
|
|
|
* wait till token used in stm expires
|
|
|
*/
|
|
|
- BlockAccessToken token = DFSTestUtil.getAccessToken(stm);
|
|
|
- while (!SecurityTestUtil.isAccessTokenExpired(token)) {
|
|
|
+ Token<BlockTokenIdentifier> token = DFSTestUtil.getAccessToken(stm);
|
|
|
+ while (!SecurityTestUtil.isBlockTokenExpired(token)) {
|
|
|
try {
|
|
|
Thread.sleep(10);
|
|
|
} catch (InterruptedException ignored) {
|
|
@@ -291,7 +290,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
cluster.waitActive();
|
|
|
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
|
|
// set a short token lifetime (1 second) initially
|
|
|
- SecurityTestUtil.setAccessTokenLifetime(
|
|
|
+ SecurityTestUtil.setBlockTokenLifetime(
|
|
|
cluster.getNameNode().getNamesystem().accessTokenHandler, 1000L);
|
|
|
Path fileToRead = new Path(FILE_TO_READ);
|
|
|
FileSystem fs = cluster.getFileSystem();
|
|
@@ -320,9 +319,9 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
List<LocatedBlock> locatedBlocks = cluster.getNameNode().getBlockLocations(
|
|
|
FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
|
|
|
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
|
|
- BlockAccessToken myToken = lblock.getAccessToken();
|
|
|
+ Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
|
|
|
// verify token is not expired
|
|
|
- assertFalse(SecurityTestUtil.isAccessTokenExpired(myToken));
|
|
|
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
|
|
|
// read with valid token, should succeed
|
|
|
tryRead(conf, lblock, true);
|
|
|
|
|
@@ -330,7 +329,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
* wait till myToken and all cached tokens in in1, in2 and in3 expire
|
|
|
*/
|
|
|
|
|
|
- while (!SecurityTestUtil.isAccessTokenExpired(myToken)) {
|
|
|
+ while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
|
|
|
try {
|
|
|
Thread.sleep(10);
|
|
|
} catch (InterruptedException ignored) {
|
|
@@ -342,32 +341,33 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
*/
|
|
|
|
|
|
// verify token is expired
|
|
|
- assertTrue(SecurityTestUtil.isAccessTokenExpired(myToken));
|
|
|
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
|
|
|
// read should fail
|
|
|
tryRead(conf, lblock, false);
|
|
|
// use a valid new token
|
|
|
- lblock.setAccessToken(cluster.getNameNode().getNamesystem()
|
|
|
- .accessTokenHandler.generateToken(lblock.getBlock().getBlockId(),
|
|
|
- EnumSet.of(AccessTokenHandler.AccessMode.READ)));
|
|
|
+ lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
|
|
+ .accessTokenHandler.generateToken(lblock.getBlock(),
|
|
|
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
|
|
// read should succeed
|
|
|
tryRead(conf, lblock, true);
|
|
|
// use a token with wrong blockID
|
|
|
- lblock.setAccessToken(cluster.getNameNode().getNamesystem()
|
|
|
- .accessTokenHandler.generateToken(lblock.getBlock().getBlockId() + 1,
|
|
|
- EnumSet.of(AccessTokenHandler.AccessMode.READ)));
|
|
|
+ Block wrongBlock = new Block(lblock.getBlock().getBlockId() + 1);
|
|
|
+ lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
|
|
+ .accessTokenHandler.generateToken(wrongBlock,
|
|
|
+ EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
|
|
// read should fail
|
|
|
tryRead(conf, lblock, false);
|
|
|
// use a token with wrong access modes
|
|
|
- lblock.setAccessToken(cluster.getNameNode().getNamesystem()
|
|
|
- .accessTokenHandler.generateToken(lblock.getBlock().getBlockId(), EnumSet.of(
|
|
|
- AccessTokenHandler.AccessMode.WRITE,
|
|
|
- AccessTokenHandler.AccessMode.COPY,
|
|
|
- AccessTokenHandler.AccessMode.REPLACE)));
|
|
|
+ lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
|
|
+ .accessTokenHandler.generateToken(lblock.getBlock(), EnumSet.of(
|
|
|
+ BlockTokenSecretManager.AccessMode.WRITE,
|
|
|
+ BlockTokenSecretManager.AccessMode.COPY,
|
|
|
+ BlockTokenSecretManager.AccessMode.REPLACE)));
|
|
|
// read should fail
|
|
|
tryRead(conf, lblock, false);
|
|
|
|
|
|
// set a long token lifetime for future tokens
|
|
|
- SecurityTestUtil.setAccessTokenLifetime(
|
|
|
+ SecurityTestUtil.setBlockTokenLifetime(
|
|
|
cluster.getNameNode().getNamesystem().accessTokenHandler, 600 * 1000L);
|
|
|
|
|
|
/*
|
|
@@ -378,7 +378,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// confirm all tokens cached in in1 are expired by now
|
|
|
List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
|
|
|
for (LocatedBlock blk : lblocks) {
|
|
|
- assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
|
|
|
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
|
|
|
}
|
|
|
// verify blockSeekTo() is able to re-fetch token transparently
|
|
|
in1.seek(0);
|
|
@@ -387,7 +387,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// confirm all tokens cached in in2 are expired by now
|
|
|
List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
|
|
|
for (LocatedBlock blk : lblocks2) {
|
|
|
- assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
|
|
|
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
|
|
|
}
|
|
|
// verify blockSeekTo() is able to re-fetch token transparently (testing
|
|
|
// via another interface method)
|
|
@@ -397,7 +397,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// confirm all tokens cached in in3 are expired by now
|
|
|
List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
|
|
|
for (LocatedBlock blk : lblocks3) {
|
|
|
- assertTrue(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
|
|
|
+ assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
|
|
|
}
|
|
|
// verify fetchBlockByteRange() is able to re-fetch token transparently
|
|
|
assertTrue(checkFile2(in3));
|
|
@@ -418,7 +418,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// confirm tokens cached in in1 are still valid
|
|
|
lblocks = DFSTestUtil.getAllBlocks(in1);
|
|
|
for (LocatedBlock blk : lblocks) {
|
|
|
- assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
|
|
|
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
|
|
|
}
|
|
|
// verify blockSeekTo() still works (forced to use cached tokens)
|
|
|
in1.seek(0);
|
|
@@ -427,7 +427,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// confirm tokens cached in in2 are still valid
|
|
|
lblocks2 = DFSTestUtil.getAllBlocks(in2);
|
|
|
for (LocatedBlock blk : lblocks2) {
|
|
|
- assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
|
|
|
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
|
|
|
}
|
|
|
// verify blockSeekTo() still works (forced to use cached tokens)
|
|
|
in2.seekToNewSource(0);
|
|
@@ -436,7 +436,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
// confirm tokens cached in in3 are still valid
|
|
|
lblocks3 = DFSTestUtil.getAllBlocks(in3);
|
|
|
for (LocatedBlock blk : lblocks3) {
|
|
|
- assertFalse(SecurityTestUtil.isAccessTokenExpired(blk.getAccessToken()));
|
|
|
+ assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
|
|
|
}
|
|
|
// verify fetchBlockByteRange() still works (forced to use cached tokens)
|
|
|
assertTrue(checkFile2(in3));
|
|
@@ -525,7 +525,7 @@ public class TestAccessTokenWithDFS extends TestCase {
|
|
|
*/
|
|
|
public void testEnd2End() throws Exception {
|
|
|
Configuration conf = new Configuration();
|
|
|
- conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
|
|
new TestBalancer().integrationTest(conf);
|
|
|
}
|
|
|
}
|