|
@@ -153,24 +153,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
datanode.socketWriteTimeout);
|
|
|
DataOutputStream out = new DataOutputStream(
|
|
|
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
|
|
|
-
|
|
|
- if (datanode.isBlockTokenEnabled) {
|
|
|
- try {
|
|
|
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
|
|
- BlockTokenSecretManager.AccessMode.READ);
|
|
|
- } catch (InvalidToken e) {
|
|
|
- try {
|
|
|
- ERROR_ACCESS_TOKEN.write(out);
|
|
|
- out.flush();
|
|
|
- LOG.warn("Block token verification failed, for client "
|
|
|
- + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
|
|
|
- + e.getLocalizedMessage());
|
|
|
- throw e;
|
|
|
- } finally {
|
|
|
- IOUtils.closeStream(out);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ checkAccess(out, block, blockToken,
|
|
|
+ DataTransferProtocol.Op.READ_BLOCK,
|
|
|
+ BlockTokenSecretManager.AccessMode.READ);
|
|
|
|
|
|
// send the block
|
|
|
BlockSender blockSender = null;
|
|
@@ -250,32 +235,14 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
" src: " + remoteAddress +
|
|
|
" dest: " + localAddress);
|
|
|
|
|
|
- DataOutputStream replyOut = null; // stream to prev target
|
|
|
- replyOut = new DataOutputStream(new BufferedOutputStream(
|
|
|
- NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
|
|
|
- SMALL_BUFFER_SIZE));
|
|
|
- DatanodeRegistration dnR =
|
|
|
- datanode.getDNRegistrationForBP(block.getBlockPoolId());
|
|
|
- if (datanode.isBlockTokenEnabled) {
|
|
|
- try {
|
|
|
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
|
|
- BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
- } catch (InvalidToken e) {
|
|
|
- try {
|
|
|
- if (isClient) {
|
|
|
- ERROR_ACCESS_TOKEN.write(replyOut);
|
|
|
- Text.writeString(replyOut, dnR.getName());
|
|
|
- replyOut.flush();
|
|
|
- }
|
|
|
- LOG.warn("Block token verification failed, for client "
|
|
|
- + remoteAddress + " for OP_WRITE_BLOCK for block " + block
|
|
|
- + " : " + e.getLocalizedMessage());
|
|
|
- throw e;
|
|
|
- } finally {
|
|
|
- IOUtils.closeStream(replyOut);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ // reply to upstream datanode or client
|
|
|
+ final DataOutputStream replyOut = new DataOutputStream(
|
|
|
+ new BufferedOutputStream(
|
|
|
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
|
|
|
+ SMALL_BUFFER_SIZE));
|
|
|
+ checkAccess(isClient? replyOut: null, block, blockToken,
|
|
|
+ DataTransferProtocol.Op.WRITE_BLOCK,
|
|
|
+ BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
|
|
|
DataOutputStream mirrorOut = null; // stream to next target
|
|
|
DataInputStream mirrorIn = null; // reply from next target
|
|
@@ -298,8 +265,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
}
|
|
|
|
|
|
//
|
|
|
- // Open network conn to backup machine, if
|
|
|
- // appropriate
|
|
|
+ // Connect to downstream machine, if appropriate
|
|
|
//
|
|
|
if (targets.length > 0) {
|
|
|
InetSocketAddress mirrorTarget = null;
|
|
@@ -321,7 +287,6 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
SMALL_BUFFER_SIZE));
|
|
|
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
|
|
|
|
|
|
- // Write header: Copied from DFSClient.java!
|
|
|
DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
|
|
|
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
|
|
|
srcDataNode, targets, blockToken);
|
|
@@ -358,7 +323,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
if (isClient) {
|
|
|
throw e;
|
|
|
} else {
|
|
|
- LOG.info(dnR + ":Exception transfering block " +
|
|
|
+ LOG.info(datanode + ":Exception transfering block " +
|
|
|
block + " to mirror " + mirrorNode +
|
|
|
". continuing without the mirror.\n" +
|
|
|
StringUtils.stringifyException(e));
|
|
@@ -430,26 +395,11 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
@Override
|
|
|
protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
|
|
|
Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
|
- DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
|
|
|
- datanode.socketWriteTimeout));
|
|
|
- if (datanode.isBlockTokenEnabled) {
|
|
|
- try {
|
|
|
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null,
|
|
|
- block, BlockTokenSecretManager.AccessMode.READ);
|
|
|
- } catch (InvalidToken e) {
|
|
|
- try {
|
|
|
- ERROR_ACCESS_TOKEN.write(out);
|
|
|
- out.flush();
|
|
|
- LOG.warn("Block token verification failed, for client "
|
|
|
- + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
|
|
|
- + " : " + e.getLocalizedMessage());
|
|
|
- throw e;
|
|
|
- } finally {
|
|
|
- IOUtils.closeStream(out);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ final DataOutputStream out = new DataOutputStream(
|
|
|
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
|
|
+ checkAccess(out, block, blockToken,
|
|
|
+ DataTransferProtocol.Op.BLOCK_CHECKSUM,
|
|
|
+ BlockTokenSecretManager.AccessMode.READ);
|
|
|
updateCurrentThreadName("Reading metadata for block " + block);
|
|
|
final MetaDataInputStream metadataIn =
|
|
|
datanode.data.getMetaDataInputStream(block);
|
|
@@ -703,4 +653,34 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
IOUtils.closeStream(reply);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void checkAccess(final DataOutputStream out,
|
|
|
+ final ExtendedBlock blk,
|
|
|
+ final Token<BlockTokenIdentifier> t,
|
|
|
+ final DataTransferProtocol.Op op,
|
|
|
+ final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
|
|
+ if (datanode.isBlockTokenEnabled) {
|
|
|
+ try {
|
|
|
+ datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
|
|
|
+ } catch(InvalidToken e) {
|
|
|
+ try {
|
|
|
+ if (out != null) {
|
|
|
+ ERROR_ACCESS_TOKEN.write(out);
|
|
|
+ if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
|
|
|
+ DatanodeRegistration dnR =
|
|
|
+ datanode.getDNRegistrationForBP(blk.getBlockPoolId());
|
|
|
+ Text.writeString(out, dnR.getName());
|
|
|
+ }
|
|
|
+ out.flush();
|
|
|
+ }
|
|
|
+ LOG.warn("Block token verification failed: op=" + op
|
|
|
+ + ", remoteAddress=" + remoteAddress
|
|
|
+ + ", message=" + e.getLocalizedMessage());
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|