|
@@ -44,6 +44,7 @@ import java.net.SocketException;
|
|
import java.nio.channels.ClosedChannelException;
|
|
import java.nio.channels.ClosedChannelException;
|
|
import java.security.MessageDigest;
|
|
import java.security.MessageDigest;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
|
@@ -84,6 +85,7 @@ import org.apache.hadoop.net.unix.DomainSocket;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.protobuf.ByteString;
|
|
import com.google.protobuf.ByteString;
|
|
@@ -273,6 +275,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
|
|
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
|
|
throws IOException {
|
|
throws IOException {
|
|
updateCurrentThreadName("Passing file descriptors for block " + blk);
|
|
updateCurrentThreadName("Passing file descriptors for block " + blk);
|
|
|
|
+ DataOutputStream out = getBufferedOutputStream();
|
|
|
|
+ checkAccess(out, true, blk, token,
|
|
|
|
+ Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenSecretManager.AccessMode.READ);
|
|
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
|
|
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
|
|
FileInputStream fis[] = null;
|
|
FileInputStream fis[] = null;
|
|
SlotId registeredSlotId = null;
|
|
SlotId registeredSlotId = null;
|
|
@@ -301,9 +306,6 @@ class DataXceiver extends Receiver implements Runnable {
|
|
} catch (ShortCircuitFdsUnsupportedException e) {
|
|
} catch (ShortCircuitFdsUnsupportedException e) {
|
|
bld.setStatus(ERROR_UNSUPPORTED);
|
|
bld.setStatus(ERROR_UNSUPPORTED);
|
|
bld.setMessage(e.getMessage());
|
|
bld.setMessage(e.getMessage());
|
|
- } catch (InvalidToken e) {
|
|
|
|
- bld.setStatus(ERROR_ACCESS_TOKEN);
|
|
|
|
- bld.setMessage(e.getMessage());
|
|
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
bld.setStatus(ERROR);
|
|
bld.setStatus(ERROR);
|
|
bld.setMessage(e.getMessage());
|
|
bld.setMessage(e.getMessage());
|
|
@@ -489,10 +491,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|
final boolean sendChecksum,
|
|
final boolean sendChecksum,
|
|
final CachingStrategy cachingStrategy) throws IOException {
|
|
final CachingStrategy cachingStrategy) throws IOException {
|
|
previousOpClientName = clientName;
|
|
previousOpClientName = clientName;
|
|
-
|
|
|
|
|
|
+ updateCurrentThreadName("Sending block " + block);
|
|
OutputStream baseStream = getOutputStream();
|
|
OutputStream baseStream = getOutputStream();
|
|
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
|
|
|
- baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
|
|
|
|
|
+ DataOutputStream out = getBufferedOutputStream();
|
|
checkAccess(out, true, block, blockToken,
|
|
checkAccess(out, true, block, blockToken,
|
|
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
|
|
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
|
|
|
|
|
|
@@ -508,7 +509,6 @@ class DataXceiver extends Receiver implements Runnable {
|
|
: dnR + " Served block " + block + " to " +
|
|
: dnR + " Served block " + block + " to " +
|
|
remoteAddress;
|
|
remoteAddress;
|
|
|
|
|
|
- updateCurrentThreadName("Sending block " + block);
|
|
|
|
try {
|
|
try {
|
|
try {
|
|
try {
|
|
blockSender = new BlockSender(block, blockOffset, length,
|
|
blockSender = new BlockSender(block, blockOffset, length,
|
|
@@ -594,6 +594,10 @@ class DataXceiver extends Receiver implements Runnable {
|
|
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|
|
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|
|
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
|
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
|
|
|
|
|
|
|
+ // reply to upstream datanode or client
|
|
|
|
+ final DataOutputStream replyOut = getBufferedOutputStream();
|
|
|
|
+ checkAccess(replyOut, isClient, block, blockToken,
|
|
|
|
+ Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
|
|
// check single target for transfer-RBW/Finalized
|
|
// check single target for transfer-RBW/Finalized
|
|
if (isTransfer && targets.length > 0) {
|
|
if (isTransfer && targets.length > 0) {
|
|
throw new IOException(stage + " does not support multiple targets "
|
|
throw new IOException(stage + " does not support multiple targets "
|
|
@@ -624,14 +628,6 @@ class DataXceiver extends Receiver implements Runnable {
|
|
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
|
|
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
|
|
+ localAddress);
|
|
+ localAddress);
|
|
|
|
|
|
- // reply to upstream datanode or client
|
|
|
|
- final DataOutputStream replyOut = new DataOutputStream(
|
|
|
|
- new BufferedOutputStream(
|
|
|
|
- getOutputStream(),
|
|
|
|
- HdfsConstants.SMALL_BUFFER_SIZE));
|
|
|
|
- checkAccess(replyOut, isClient, block, blockToken,
|
|
|
|
- Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
|
-
|
|
|
|
DataOutputStream mirrorOut = null; // stream to next target
|
|
DataOutputStream mirrorOut = null; // stream to next target
|
|
DataInputStream mirrorIn = null; // reply from next target
|
|
DataInputStream mirrorIn = null; // reply from next target
|
|
Socket mirrorSock = null; // socket to next target
|
|
Socket mirrorSock = null; // socket to next target
|
|
@@ -812,13 +808,13 @@ class DataXceiver extends Receiver implements Runnable {
|
|
final String clientName,
|
|
final String clientName,
|
|
final DatanodeInfo[] targets,
|
|
final DatanodeInfo[] targets,
|
|
final StorageType[] targetStorageTypes) throws IOException {
|
|
final StorageType[] targetStorageTypes) throws IOException {
|
|
- checkAccess(socketOut, true, blk, blockToken,
|
|
|
|
- Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
|
|
|
|
previousOpClientName = clientName;
|
|
previousOpClientName = clientName;
|
|
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
|
|
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
|
|
|
|
|
|
final DataOutputStream out = new DataOutputStream(
|
|
final DataOutputStream out = new DataOutputStream(
|
|
getOutputStream());
|
|
getOutputStream());
|
|
|
|
+ checkAccess(out, true, blk, blockToken,
|
|
|
|
+ Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
|
|
try {
|
|
try {
|
|
datanode.transferReplicaForPipelineRecovery(blk, targets,
|
|
datanode.transferReplicaForPipelineRecovery(blk, targets,
|
|
targetStorageTypes, clientName);
|
|
targetStorageTypes, clientName);
|
|
@@ -868,6 +864,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
@Override
|
|
@Override
|
|
public void blockChecksum(final ExtendedBlock block,
|
|
public void blockChecksum(final ExtendedBlock block,
|
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
|
|
+ updateCurrentThreadName("Getting checksum for block " + block);
|
|
final DataOutputStream out = new DataOutputStream(
|
|
final DataOutputStream out = new DataOutputStream(
|
|
getOutputStream());
|
|
getOutputStream());
|
|
checkAccess(out, true, block, blockToken,
|
|
checkAccess(out, true, block, blockToken,
|
|
@@ -878,13 +875,11 @@ class DataXceiver extends Receiver implements Runnable {
|
|
long visibleLength = datanode.data.getReplicaVisibleLength(block);
|
|
long visibleLength = datanode.data.getReplicaVisibleLength(block);
|
|
boolean partialBlk = requestLength < visibleLength;
|
|
boolean partialBlk = requestLength < visibleLength;
|
|
|
|
|
|
- updateCurrentThreadName("Reading metadata for block " + block);
|
|
|
|
final LengthInputStream metadataIn = datanode.data
|
|
final LengthInputStream metadataIn = datanode.data
|
|
.getMetaDataInputStream(block);
|
|
.getMetaDataInputStream(block);
|
|
|
|
|
|
final DataInputStream checksumIn = new DataInputStream(
|
|
final DataInputStream checksumIn = new DataInputStream(
|
|
new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
|
new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
|
- updateCurrentThreadName("Getting checksum for block " + block);
|
|
|
|
try {
|
|
try {
|
|
//read metadata file
|
|
//read metadata file
|
|
final BlockMetadataHeader header = BlockMetadataHeader
|
|
final BlockMetadataHeader header = BlockMetadataHeader
|
|
@@ -928,20 +923,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|
public void copyBlock(final ExtendedBlock block,
|
|
public void copyBlock(final ExtendedBlock block,
|
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
updateCurrentThreadName("Copying block " + block);
|
|
updateCurrentThreadName("Copying block " + block);
|
|
- // Read in the header
|
|
|
|
- if (datanode.isBlockTokenEnabled) {
|
|
|
|
- try {
|
|
|
|
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
|
|
|
- BlockTokenSecretManager.AccessMode.COPY);
|
|
|
|
- } catch (InvalidToken e) {
|
|
|
|
- LOG.warn("Invalid access token in request from " + remoteAddress
|
|
|
|
- + " for OP_COPY_BLOCK for block " + block + " : "
|
|
|
|
- + e.getLocalizedMessage());
|
|
|
|
- sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
|
|
+ DataOutputStream reply = getBufferedOutputStream();
|
|
|
|
+ checkAccess(reply, true, block, blockToken,
|
|
|
|
+ Op.COPY_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
|
|
|
|
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
String msg = "Not able to copy block " + block.getBlockId() + " " +
|
|
String msg = "Not able to copy block " + block.getBlockId() + " " +
|
|
@@ -953,7 +937,6 @@ class DataXceiver extends Receiver implements Runnable {
|
|
}
|
|
}
|
|
|
|
|
|
BlockSender blockSender = null;
|
|
BlockSender blockSender = null;
|
|
- DataOutputStream reply = null;
|
|
|
|
boolean isOpSuccess = true;
|
|
boolean isOpSuccess = true;
|
|
|
|
|
|
try {
|
|
try {
|
|
@@ -961,10 +944,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
|
|
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
|
|
null, CachingStrategy.newDropBehind());
|
|
null, CachingStrategy.newDropBehind());
|
|
|
|
|
|
- // set up response stream
|
|
|
|
OutputStream baseStream = getOutputStream();
|
|
OutputStream baseStream = getOutputStream();
|
|
- reply = new DataOutputStream(new BufferedOutputStream(
|
|
|
|
- baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
|
|
|
|
|
|
|
// send status first
|
|
// send status first
|
|
writeSuccessWithChecksumInfo(blockSender, reply);
|
|
writeSuccessWithChecksumInfo(blockSender, reply);
|
|
@@ -1004,21 +984,12 @@ class DataXceiver extends Receiver implements Runnable {
|
|
final String delHint,
|
|
final String delHint,
|
|
final DatanodeInfo proxySource) throws IOException {
|
|
final DatanodeInfo proxySource) throws IOException {
|
|
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
|
|
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
|
|
|
|
+ DataOutputStream replyOut = new DataOutputStream(getOutputStream());
|
|
|
|
+ checkAccess(replyOut, true, block, blockToken,
|
|
|
|
+ Op.REPLACE_BLOCK, BlockTokenSecretManager.AccessMode.REPLACE);
|
|
|
|
|
|
/* read header */
|
|
/* read header */
|
|
block.setNumBytes(dataXceiverServer.estimateBlockSize);
|
|
block.setNumBytes(dataXceiverServer.estimateBlockSize);
|
|
- if (datanode.isBlockTokenEnabled) {
|
|
|
|
- try {
|
|
|
|
- datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
|
|
|
- BlockTokenSecretManager.AccessMode.REPLACE);
|
|
|
|
- } catch (InvalidToken e) {
|
|
|
|
- LOG.warn("Invalid access token in request from " + remoteAddress
|
|
|
|
- + " for OP_REPLACE_BLOCK for block " + block + " : "
|
|
|
|
- + e.getLocalizedMessage());
|
|
|
|
- sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
String msg = "Not able to receive block " + block.getBlockId() +
|
|
String msg = "Not able to receive block " + block.getBlockId() +
|
|
@@ -1035,7 +1006,6 @@ class DataXceiver extends Receiver implements Runnable {
|
|
String errMsg = null;
|
|
String errMsg = null;
|
|
BlockReceiver blockReceiver = null;
|
|
BlockReceiver blockReceiver = null;
|
|
DataInputStream proxyReply = null;
|
|
DataInputStream proxyReply = null;
|
|
- DataOutputStream replyOut = new DataOutputStream(getOutputStream());
|
|
|
|
try {
|
|
try {
|
|
// get the output stream to the proxy
|
|
// get the output stream to the proxy
|
|
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
|
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
|
@@ -1135,6 +1105,16 @@ class DataXceiver extends Receiver implements Runnable {
|
|
datanode.metrics.addReplaceBlockOp(elapsed());
|
|
datanode.metrics.addReplaceBlockOp(elapsed());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Separated for testing.
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ DataOutputStream getBufferedOutputStream() {
|
|
|
|
+ return new DataOutputStream(
|
|
|
|
+ new BufferedOutputStream(getOutputStream(),
|
|
|
|
+ HdfsConstants.SMALL_BUFFER_SIZE));
|
|
|
|
+ }
|
|
|
|
+
|
|
private long elapsed() {
|
|
private long elapsed() {
|
|
return now() - opStartTime;
|
|
return now() - opStartTime;
|
|
}
|
|
}
|
|
@@ -1178,11 +1158,51 @@ class DataXceiver extends Receiver implements Runnable {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Wait until the BP is registered, upto the configured amount of time.
|
|
|
|
+ * Throws an exception if times out, which should fail the client request.
|
|
|
|
+ * @param the requested block
|
|
|
|
+ */
|
|
|
|
+ void checkAndWaitForBP(final ExtendedBlock block)
|
|
|
|
+ throws IOException {
|
|
|
|
+ String bpId = block.getBlockPoolId();
|
|
|
|
+
|
|
|
|
+ // The registration is only missing in relatively short time window.
|
|
|
|
+ // Optimistically perform this first.
|
|
|
|
+ try {
|
|
|
|
+ datanode.getDNRegistrationForBP(bpId);
|
|
|
|
+ return;
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // not registered
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // retry
|
|
|
|
+ long bpReadyTimeout = dnConf.getBpReadyTimeout() * 1000;
|
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
|
+ while (Time.monotonicNow() - startTime <= bpReadyTimeout) {
|
|
|
|
+ try {
|
|
|
|
+ datanode.getDNRegistrationForBP(bpId);
|
|
|
|
+ return;
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // not registered
|
|
|
|
+ }
|
|
|
|
+ // sleep before trying again
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(1000);
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ throw new IOException("Interrupted while serving request. Aborting.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // failed to obtain registration.
|
|
|
|
+ throw new IOException("Not ready to serve the block pool, " + bpId + ".");
|
|
|
|
+ }
|
|
|
|
+
|
|
private void checkAccess(OutputStream out, final boolean reply,
|
|
private void checkAccess(OutputStream out, final boolean reply,
|
|
final ExtendedBlock blk,
|
|
final ExtendedBlock blk,
|
|
final Token<BlockTokenIdentifier> t,
|
|
final Token<BlockTokenIdentifier> t,
|
|
final Op op,
|
|
final Op op,
|
|
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
|
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
|
|
|
+ checkAndWaitForBP(blk);
|
|
if (datanode.isBlockTokenEnabled) {
|
|
if (datanode.isBlockTokenEnabled) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Checking block access token for block '" + blk.getBlockId()
|
|
LOG.debug("Checking block access token for block '" + blk.getBlockId()
|