Explorar el Código

HADOOP-5625. Add operation duration to clienttrace. Contributed by Lei Xu

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@767067 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas hace 16 años
padre
commit
d8c85ab788

+ 2 - 0
CHANGES.txt

@@ -246,6 +246,8 @@ Trunk (unreleased changes)
     HADOOP-5697. Change org.apache.hadoop.examples.Grep to use new 
     HADOOP-5697. Change org.apache.hadoop.examples.Grep to use new 
     mapreduce api. (Amareshwari Sriramadasu via sharad)
     mapreduce api. (Amareshwari Sriramadasu via sharad)
 
 
+    HADOOP-5625. Add operation duration to clienttrace. (Lei Xu via cdouglas)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 6 - 2
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -747,6 +747,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     private synchronized void lastDataNodeRun() {
     private synchronized void lastDataNodeRun() {
       long lastHeartbeat = System.currentTimeMillis();
       long lastHeartbeat = System.currentTimeMillis();
       boolean lastPacket = false;
       boolean lastPacket = false;
+      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
 
 
       while (running && datanode.shouldRun && !lastPacket) {
       while (running && datanode.shouldRun && !lastPacket) {
         long now = System.currentTimeMillis();
         long now = System.currentTimeMillis();
@@ -794,6 +795,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             if (pkt.lastPacketInBlock) {
             if (pkt.lastPacketInBlock) {
               if (!receiver.finalized) {
               if (!receiver.finalized) {
                 receiver.close();
                 receiver.close();
+                final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
                 block.setNumBytes(receiver.offsetInBlock);
                 block.setNumBytes(receiver.offsetInBlock);
                 datanode.data.finalizeBlock(block);
                 datanode.data.finalizeBlock(block);
                 datanode.myMetrics.blocksWritten.inc();
                 datanode.myMetrics.blocksWritten.inc();
@@ -805,7 +807,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                   ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                   ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                         receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                         receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                         "HDFS_WRITE", receiver.clientName, offset,
                         "HDFS_WRITE", receiver.clientName, offset,
-                        datanode.dnRegistration.getStorageID(), block));
+                        datanode.dnRegistration.getStorageID(), block, endTime-startTime));
                 } else {
                 } else {
                   LOG.info("Received block " + block + 
                   LOG.info("Received block " + block + 
                            " of size " + block.getNumBytes() + 
                            " of size " + block.getNumBytes() + 
@@ -843,6 +845,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
       }
 
 
       boolean lastPacketInBlock = false;
       boolean lastPacketInBlock = false;
+      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
 
         try {
         try {
@@ -918,6 +921,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             // file and finalize the block before responding success
             // file and finalize the block before responding success
             if (lastPacketInBlock && !receiver.finalized) {
             if (lastPacketInBlock && !receiver.finalized) {
               receiver.close();
               receiver.close();
+              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(receiver.offsetInBlock);
               block.setNumBytes(receiver.offsetInBlock);
               datanode.data.finalizeBlock(block);
               datanode.data.finalizeBlock(block);
               datanode.myMetrics.blocksWritten.inc();
               datanode.myMetrics.blocksWritten.inc();
@@ -929,7 +933,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                       receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                       receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                       "HDFS_WRITE", receiver.clientName, offset,
                       "HDFS_WRITE", receiver.clientName, offset,
-                      datanode.dnRegistration.getStorageID(), block));
+                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
               } else {
               } else {
                 LOG.info("Received block " + block + 
                 LOG.info("Received block " + block + 
                          " of size " + block.getNumBytes() + 
                          " of size " + block.getNumBytes() + 

+ 3 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -357,6 +357,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
     long totalRead = 0;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     OutputStream streamForSendChunks = out;
     
     
+    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
     try {
       try {
       try {
         checksum.writeHeader(out);
         checksum.writeHeader(out);
@@ -412,7 +413,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
       }
       }
     } finally {
     } finally {
       if (clientTraceFmt != null) {
       if (clientTraceFmt != null) {
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset));
+        final long endTime = System.nanoTime();
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
       }
       }
       close();
       close();
     }
     }

+ 3 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -142,7 +142,9 @@ public class DataNode extends Configured
         ", cliID: %s" +  // DFSClient id
         ", cliID: %s" +  // DFSClient id
         ", offset: %s" + // offset
         ", offset: %s" + // offset
         ", srvID: %s" +  // DatanodeRegistration
         ", srvID: %s" +  // DatanodeRegistration
-        ", blockid: %s"; // block id
+        ", blockid: %s" + // block id
+        ", duration: %s";  // duration time
+        
   static final Log ClientTraceLog =
   static final Log ClientTraceLog =
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
 
 

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -164,7 +164,7 @@ class DataXceiver implements Runnable, FSConstants {
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
             "%d", "HDFS_READ", clientName, "%d",
             "%d", "HDFS_READ", clientName, "%d",
-            datanode.dnRegistration.getStorageID(), block)
+            datanode.dnRegistration.getStorageID(), block, "%d")
         : datanode.dnRegistration + " Served block " + block + " to " +
         : datanode.dnRegistration + " Served block " + block + " to " +
             s.getInetAddress();
             s.getInetAddress();
     try {
     try {

+ 7 - 2
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -116,7 +116,8 @@ public class TaskTracker
         ", dest: %s" +  // dst IP
         ", dest: %s" +  // dst IP
         ", bytes: %s" + // byte count
         ", bytes: %s" + // byte count
         ", op: %s" +    // operation
         ", op: %s" +    // operation
-        ", cliID: %s";  // task id
+        ", cliID: %s" + // task id
+        ", duration: %s"; // duration
   public static final Log ClientTraceLog =
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
 
@@ -2960,8 +2961,11 @@ public class TaskTracker
       TaskTracker tracker = 
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
         (TaskTracker) context.getAttribute("task.tracker");
 
 
+      long startTime = 0;
       try {
       try {
         shuffleMetrics.serverHandlerBusy();
         shuffleMetrics.serverHandlerBusy();
+        if(ClientTraceLog.isInfoEnabled())
+          startTime = System.nanoTime();
         outStream = response.getOutputStream();
         outStream = response.getOutputStream();
         JobConf conf = (JobConf) context.getAttribute("conf");
         JobConf conf = (JobConf) context.getAttribute("conf");
         LocalDirAllocator lDirAlloc = 
         LocalDirAllocator lDirAlloc = 
@@ -3045,12 +3049,13 @@ public class TaskTracker
         if (null != mapOutputIn) {
         if (null != mapOutputIn) {
           mapOutputIn.close();
           mapOutputIn.close();
         }
         }
+        final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
         shuffleMetrics.serverHandlerFree();
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {
         if (ClientTraceLog.isInfoEnabled()) {
           ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
           ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
                 request.getLocalAddr() + ":" + request.getLocalPort(),
                 request.getLocalAddr() + ":" + request.getLocalPort(),
                 request.getRemoteAddr() + ":" + request.getRemotePort(),
                 request.getRemoteAddr() + ":" + request.getRemotePort(),
-                totalRead, "MAPRED_SHUFFLE", mapId));
+                totalRead, "MAPRED_SHUFFLE", mapId, endTime-startTime));
         }
         }
       }
       }
       outStream.close();
       outStream.close();