|
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
-import org.apache.hadoop.metrics.MetricsUtil;
|
|
|
import org.apache.hadoop.net.DNS;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
@@ -36,6 +35,7 @@ import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.apache.hadoop.dfs.BlockCommand;
|
|
|
import org.apache.hadoop.dfs.DatanodeProtocol;
|
|
|
import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
|
|
|
+import org.apache.hadoop.dfs.datanode.metrics.DataNodeMetrics;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
@@ -44,11 +44,6 @@ import java.util.concurrent.Semaphore;
|
|
|
import java.security.NoSuchAlgorithmException;
|
|
|
import java.security.SecureRandom;
|
|
|
|
|
|
-import org.apache.hadoop.metrics.MetricsContext;
|
|
|
-import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
-import org.apache.hadoop.metrics.Updater;
|
|
|
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
|
-
|
|
|
/**********************************************************
|
|
|
* DataNode is a class (and program) that stores a set of
|
|
|
* blocks for a DFS deployment. A single deployment can
|
|
@@ -150,88 +145,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
return System.currentTimeMillis();
|
|
|
}
|
|
|
|
|
|
- static class DataNodeMetrics implements Updater {
|
|
|
- private final MetricsRecord metricsRecord;
|
|
|
- private int bytesWritten = 0;
|
|
|
- private int bytesRead = 0;
|
|
|
- private int blocksWritten = 0;
|
|
|
- private int blocksRead = 0;
|
|
|
- private int blocksReplicated = 0;
|
|
|
- private int blocksRemoved = 0;
|
|
|
- private int blocksVerified = 0;
|
|
|
- private int blockVerificationFailures = 0;
|
|
|
-
|
|
|
- DataNodeMetrics(Configuration conf) {
|
|
|
- String sessionId = conf.get("session.id");
|
|
|
- // Initiate reporting of Java VM metrics
|
|
|
- JvmMetrics.init("DataNode", sessionId);
|
|
|
- // Create record for DataNode metrics
|
|
|
- MetricsContext context = MetricsUtil.getContext("dfs");
|
|
|
- metricsRecord = MetricsUtil.createRecord(context, "datanode");
|
|
|
- metricsRecord.setTag("sessionId", sessionId);
|
|
|
- context.registerUpdater(this);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Since this object is a registered updater, this method will be called
|
|
|
- * periodically, e.g. every 5 seconds.
|
|
|
- */
|
|
|
- public void doUpdates(MetricsContext unused) {
|
|
|
- synchronized (this) {
|
|
|
- metricsRecord.incrMetric("bytes_read", bytesRead);
|
|
|
- metricsRecord.incrMetric("bytes_written", bytesWritten);
|
|
|
- metricsRecord.incrMetric("blocks_read", blocksRead);
|
|
|
- metricsRecord.incrMetric("blocks_written", blocksWritten);
|
|
|
- metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
|
|
|
- metricsRecord.incrMetric("blocks_removed", blocksRemoved);
|
|
|
- metricsRecord.incrMetric("blocks_verified", blocksVerified);
|
|
|
- metricsRecord.incrMetric("block_verification_failures",
|
|
|
- blockVerificationFailures);
|
|
|
-
|
|
|
- bytesWritten = 0;
|
|
|
- bytesRead = 0;
|
|
|
- blocksWritten = 0;
|
|
|
- blocksRead = 0;
|
|
|
- blocksReplicated = 0;
|
|
|
- blocksRemoved = 0;
|
|
|
- blocksVerified = 0;
|
|
|
- blockVerificationFailures = 0;
|
|
|
- }
|
|
|
- metricsRecord.update();
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void readBytes(int nbytes) {
|
|
|
- bytesRead += nbytes;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void wroteBytes(int nbytes) {
|
|
|
- bytesWritten += nbytes;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void readBlocks(int nblocks) {
|
|
|
- blocksRead += nblocks;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void wroteBlocks(int nblocks) {
|
|
|
- blocksWritten += nblocks;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void replicatedBlocks(int nblocks) {
|
|
|
- blocksReplicated += nblocks;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void removedBlocks(int nblocks) {
|
|
|
- blocksRemoved += nblocks;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void verifiedBlocks(int nblocks) {
|
|
|
- blocksVerified += nblocks;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void verificationFailures(int failures) {
|
|
|
- blockVerificationFailures += failures;
|
|
|
- }
|
|
|
- }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Create the DataNode given a configuration and an array of dataDirs.
|
|
@@ -240,7 +156,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
DataNode(Configuration conf,
|
|
|
AbstractList<File> dataDirs) throws IOException {
|
|
|
|
|
|
- myMetrics = new DataNodeMetrics(conf);
|
|
|
datanodeObject = this;
|
|
|
|
|
|
try {
|
|
@@ -301,7 +216,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
setNewStorageID(dnRegistration);
|
|
|
dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
|
|
|
dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
|
|
|
-
|
|
|
+ // Ideally the storage would be passed as a parameter to the
|
|
|
+ // constructor below, but that requires augmenting the ReflectionUtils used below.
|
|
|
+ conf.set("StorageId", dnRegistration.getStorageID());
|
|
|
try {
|
|
|
//Equivalent of following (can't do because Simulated is in test dir)
|
|
|
// this.data = new SimulatedFSDataset(conf);
|
|
@@ -318,6 +235,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// initialize data node internal structure
|
|
|
this.data = new FSDataset(storage, conf);
|
|
|
}
|
|
|
+
|
|
|
|
|
|
// find free port
|
|
|
ServerSocket ss = new ServerSocket(tmpPort, 0, socAddr.getAddress());
|
|
@@ -384,6 +302,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
networkLoc = getNetworkLoc(conf);
|
|
|
// register datanode
|
|
|
register();
|
|
|
+ myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
|
|
|
}
|
|
|
|
|
|
private NamespaceInfo handshake() throws IOException {
|
|
@@ -578,6 +497,12 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
} catch (InterruptedException ie) {
|
|
|
}
|
|
|
}
|
|
|
+ if (data != null) {
|
|
|
+ data.shutdown();
|
|
|
+ }
|
|
|
+ if (myMetrics != null) {
|
|
|
+ myMetrics.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
@@ -638,12 +563,13 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
while (shouldRun) {
|
|
|
try {
|
|
|
- long now = System.currentTimeMillis();
|
|
|
+ long startTime = now();
|
|
|
|
|
|
//
|
|
|
// Every so often, send heartbeat or block-report
|
|
|
//
|
|
|
- if (now - lastHeartbeat > heartBeatInterval) {
|
|
|
+
|
|
|
+ if (startTime - lastHeartbeat > heartBeatInterval) {
|
|
|
//
|
|
|
// All heartbeat messages include following info:
|
|
|
// -- Datanode name
|
|
@@ -657,8 +583,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
data.getRemaining(),
|
|
|
xmitsInProgress,
|
|
|
xceiverCount.getValue());
|
|
|
+ myMetrics.heartbeats.inc(now() - startTime);
|
|
|
//LOG.info("Just sent heartbeat, with name " + localName);
|
|
|
- lastHeartbeat = now;
|
|
|
+ lastHeartbeat = startTime;
|
|
|
if (!processCommand(cmd))
|
|
|
continue;
|
|
|
}
|
|
@@ -697,7 +624,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
|
|
|
// send block report
|
|
|
- if (now - lastBlockReport > blockReportInterval) {
|
|
|
+ if (startTime - lastBlockReport > blockReportInterval) {
|
|
|
//
|
|
|
// Send latest blockinfo report if timer has expired.
|
|
|
// Get back a list of local block(s) that are obsolete
|
|
@@ -708,6 +635,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
DatanodeCommand cmd = namenode.blockReport(dnRegistration,
|
|
|
BlockListAsLongs.convertToArrayLongs(bReport));
|
|
|
long brTime = now() - brStartTime;
|
|
|
+ myMetrics.blockReports.inc(brTime);
|
|
|
LOG.info("BlockReport of " + bReport.length +
|
|
|
" blocks got processed in " + brTime + " msecs");
|
|
|
//
|
|
@@ -715,10 +643,10 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// time before we start the periodic block reports.
|
|
|
//
|
|
|
if (resetBlockReportTime) {
|
|
|
- lastBlockReport = now - new Random().nextInt((int)(blockReportInterval));
|
|
|
+ lastBlockReport = startTime - new Random().nextInt((int)(blockReportInterval));
|
|
|
resetBlockReportTime = false;
|
|
|
} else {
|
|
|
- lastBlockReport = now;
|
|
|
+ lastBlockReport = startTime;
|
|
|
}
|
|
|
processCommand(cmd);
|
|
|
}
|
|
@@ -784,7 +712,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
checkDiskError();
|
|
|
throw e;
|
|
|
}
|
|
|
- myMetrics.removedBlocks(toDelete.length);
|
|
|
+ myMetrics.blocksRemoved.inc(toDelete.length);
|
|
|
break;
|
|
|
case DatanodeProtocol.DNA_SHUTDOWN:
|
|
|
// shut down the data node
|
|
@@ -981,24 +909,37 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
if ( version != DATA_TRANFER_VERSION ) {
|
|
|
throw new IOException( "Version Mismatch" );
|
|
|
}
|
|
|
-
|
|
|
+ boolean local = s.getInetAddress().equals(s.getLocalAddress());
|
|
|
byte op = in.readByte();
|
|
|
-
|
|
|
+ long startTime = now();
|
|
|
switch ( op ) {
|
|
|
case OP_READ_BLOCK:
|
|
|
readBlock( in );
|
|
|
+ myMetrics.readBlockOp.inc(now() - startTime);
|
|
|
+ if (local)
|
|
|
+ myMetrics.readsFromLocalClient.inc();
|
|
|
+ else
|
|
|
+ myMetrics.readsFromRemoteClient.inc();
|
|
|
break;
|
|
|
case OP_WRITE_BLOCK:
|
|
|
writeBlock( in );
|
|
|
+ myMetrics.writeBlockOp.inc(now() - startTime);
|
|
|
+ if (local)
|
|
|
+ myMetrics.writesFromLocalClient.inc();
|
|
|
+ else
|
|
|
+ myMetrics.writesFromRemoteClient.inc();
|
|
|
break;
|
|
|
case OP_READ_METADATA:
|
|
|
readMetadata( in );
|
|
|
+ myMetrics.readMetadataOp.inc(now() - startTime);
|
|
|
break;
|
|
|
case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
|
|
|
replaceBlock(in);
|
|
|
+ myMetrics.replaceBlockOp.inc(now() - startTime);
|
|
|
break;
|
|
|
case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
|
|
|
copyBlock(in);
|
|
|
+ myMetrics.copyBlockOp.inc(now() - startTime);
|
|
|
break;
|
|
|
default:
|
|
|
throw new IOException("Unknown opcode " + op + " in data stream");
|
|
@@ -1056,12 +997,12 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
} catch (IOException ignored) {}
|
|
|
}
|
|
|
|
|
|
- myMetrics.readBytes((int) read);
|
|
|
- myMetrics.readBlocks(1);
|
|
|
+ myMetrics.bytesRead.inc((int) read);
|
|
|
+ myMetrics.blocksRead.inc();
|
|
|
LOG.info(dnRegistration + " Served block " + block + " to " + s.getInetAddress());
|
|
|
} catch ( SocketException ignored ) {
|
|
|
// Its ok for remote side to close the connection anytime.
|
|
|
- myMetrics.readBlocks(1);
|
|
|
+ myMetrics.blocksRead.inc();
|
|
|
} catch ( IOException ioe ) {
|
|
|
/* What exactly should we do here?
|
|
|
* Earlier version shutdown() datanode if there is disk error.
|
|
@@ -1301,8 +1242,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// then send data
|
|
|
long read = blockSender.sendBlock(targetOut, balancingThrottler);
|
|
|
|
|
|
- myMetrics.readBytes((int) read);
|
|
|
- myMetrics.readBlocks(1);
|
|
|
+ myMetrics.bytesRead.inc((int) read);
|
|
|
+ myMetrics.blocksRead.inc();
|
|
|
|
|
|
// check the response from target
|
|
|
receiveResponse(targetSock, 1);
|
|
@@ -1807,7 +1748,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
receiver.close();
|
|
|
block.setNumBytes(receiver.offsetInBlock);
|
|
|
data.finalizeBlock(block);
|
|
|
- myMetrics.wroteBlocks(1);
|
|
|
+ myMetrics.blocksWritten.inc();
|
|
|
notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
|
|
|
LOG.info("Received block " + block +
|
|
|
" of size " + block.getNumBytes() +
|
|
@@ -1891,7 +1832,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
receiver.close();
|
|
|
block.setNumBytes(receiver.offsetInBlock);
|
|
|
data.finalizeBlock(block);
|
|
|
- myMetrics.wroteBlocks(1);
|
|
|
+ myMetrics.blocksWritten.inc();
|
|
|
notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
|
|
|
LOG.info("Received block " + block +
|
|
|
" of size " + block.getNumBytes() +
|
|
@@ -2129,7 +2070,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
out.write(buf, 0, len);
|
|
|
// Write checksum
|
|
|
checksumOut.write(buf, len, checksumSize);
|
|
|
- myMetrics.wroteBytes(len);
|
|
|
+ myMetrics.bytesWritten.inc(len);
|
|
|
}
|
|
|
} catch (IOException iex) {
|
|
|
checkDiskError(iex);
|
|
@@ -2295,7 +2236,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// Finalize the block. Does this fsync()?
|
|
|
block.setNumBytes(offsetInBlock);
|
|
|
data.finalizeBlock(block);
|
|
|
- myMetrics.wroteBlocks(1);
|
|
|
+ myMetrics.blocksWritten.inc();
|
|
|
}
|
|
|
|
|
|
} catch (IOException ioe) {
|