|
@@ -86,6 +86,7 @@ import org.apache.hadoop.util.DataChecksum;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.protobuf.ByteString;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
|
|
|
|
|
|
/**
|
|
@@ -480,7 +481,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
final boolean sendChecksum,
|
|
|
final CachingStrategy cachingStrategy) throws IOException {
|
|
|
previousOpClientName = clientName;
|
|
|
-
|
|
|
+ long read = 0;
|
|
|
OutputStream baseStream = getOutputStream();
|
|
|
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
|
|
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
|
@@ -515,8 +516,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
// send op status
|
|
|
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
|
|
|
|
|
|
- long read = blockSender.sendBlock(out, baseStream, null); // send data
|
|
|
-
|
|
|
+ long beginRead = Time.monotonicNow();
|
|
|
+ read = blockSender.sendBlock(out, baseStream, null); // send data
|
|
|
+ long duration = Time.monotonicNow() - beginRead;
|
|
|
if (blockSender.didSendEntireByteRange()) {
|
|
|
// If we sent the entire range, then we should expect the client
|
|
|
// to respond with a Status enum.
|
|
@@ -539,6 +541,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
}
|
|
|
datanode.metrics.incrBytesRead((int) read);
|
|
|
datanode.metrics.incrBlocksRead();
|
|
|
+ datanode.metrics.incrTotalReadTime(duration);
|
|
|
} catch ( SocketException ignored ) {
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
|
|
@@ -563,7 +566,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
|
|
|
//update metrics
|
|
|
datanode.metrics.addReadBlockOp(elapsed());
|
|
|
- datanode.metrics.incrReadsFromClient(peer.isLocal());
|
|
|
+ datanode.metrics.incrReadsFromClient(peer.isLocal(), read);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -590,7 +593,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
final boolean isClient = !isDatanode;
|
|
|
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|
|
|
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
|
|
-
|
|
|
+ long size = 0;
|
|
|
// check single target for transfer-RBW/Finalized
|
|
|
if (isTransfer && targets.length > 0) {
|
|
|
throw new IOException(stage + " does not support multiple targets "
|
|
@@ -796,7 +799,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
+ localAddress + " of size " + block.getNumBytes());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ if(isClient) {
|
|
|
+ size = block.getNumBytes();
|
|
|
+ }
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.info("opWriteBlock " + block + " received exception " + ioe);
|
|
|
incrDatanodeNetworkErrors();
|
|
@@ -813,7 +818,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
|
|
|
//update metrics
|
|
|
datanode.metrics.addWriteBlockOp(elapsed());
|
|
|
- datanode.metrics.incrWritesFromClient(peer.isLocal());
|
|
|
+ datanode.metrics.incrWritesFromClient(peer.isLocal(), size);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -993,12 +998,15 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
|
|
|
// send status first
|
|
|
writeSuccessWithChecksumInfo(blockSender, reply);
|
|
|
+
|
|
|
+ long beginRead = Time.monotonicNow();
|
|
|
// send block content to the target
|
|
|
- long read = blockSender.sendBlock(reply, baseStream,
|
|
|
+ long read = blockSender.sendBlock(reply, baseStream,
|
|
|
dataXceiverServer.balanceThrottler);
|
|
|
-
|
|
|
+ long duration = Time.monotonicNow() - beginRead;
|
|
|
datanode.metrics.incrBytesRead((int) read);
|
|
|
datanode.metrics.incrBlocksRead();
|
|
|
+ datanode.metrics.incrTotalReadTime(duration);
|
|
|
|
|
|
LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
|
|
|
} catch (IOException ioe) {
|