Browse Source

commit 370cb8366d95536f6c3e3fac91f7ac0b94bb4776
Author: Jakob Homan <jhoman@yahoo-inc.com>
Date: Wed Oct 14 11:26:44 2009 -0700

HADOOP-5625 from http://issues.apache.org/jira/secure/attachment/12421066/HADOOP-5625-BP-to-20.patch

+++ b/YAHOO-CHANGES.txt
+64. HADOOP-5625. Add operation duration to clienttrace. (Lei Xu
+ via cdouglas)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077015 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago
parent commit 2dec575d79

+ 6 - 2
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -747,6 +747,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     private synchronized void lastDataNodeRun() {
       long lastHeartbeat = System.currentTimeMillis();
       boolean lastPacket = false;
+      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
 
       while (running && datanode.shouldRun && !lastPacket) {
         long now = System.currentTimeMillis();
@@ -794,6 +795,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             if (pkt.lastPacketInBlock) {
               if (!receiver.finalized) {
                 receiver.close();
+                final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
                 block.setNumBytes(receiver.offsetInBlock);
                 datanode.data.finalizeBlock(block);
                 datanode.myMetrics.blocksWritten.inc();
@@ -805,7 +807,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                   ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                         receiver.inAddr, receiver.myAddr, block.getNumBytes(), 
                         "HDFS_WRITE", receiver.clientName, offset,
-                        datanode.dnRegistration.getStorageID(), block));
+                        datanode.dnRegistration.getStorageID(), block, endTime-startTime));
                 } else {
                   LOG.info("Received block " + block + 
                            " of size " + block.getNumBytes() + 
@@ -843,6 +845,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
 
       boolean lastPacketInBlock = false;
+      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
@@ -918,6 +921,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             // file and finalize the block before responding success
             if (lastPacketInBlock && !receiver.finalized) {
               receiver.close();
+              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(receiver.offsetInBlock);
               datanode.data.finalizeBlock(block);
               datanode.myMetrics.blocksWritten.inc();
@@ -929,7 +933,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                       receiver.inAddr, receiver.myAddr, block.getNumBytes(), 
                       "HDFS_WRITE", receiver.clientName, offset, 
-                      datanode.dnRegistration.getStorageID(), block));
+                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
               } else {
                 LOG.info("Received block " + block + 
                          " of size " + block.getNumBytes() + 

+ 3 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -357,6 +357,7 @@ class BlockSender implements java.io.Closeable, FSConstants {
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
+    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; 
     try {
       try {
         checksum.writeHeader(out);
@@ -412,7 +413,8 @@ class BlockSender implements java.io.Closeable, FSConstants {
       }
     } finally {
       if (clientTraceFmt != null) {
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset));
+        final long endTime = System.nanoTime();
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
       }
       close();
     }

+ 3 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -141,7 +141,9 @@ public class DataNode extends Configured
         ", cliID: %s" +  // DFSClient id
         ", offset: %s" + // offset
         ", srvID: %s" +  // DatanodeRegistration
-        ", blockid: %s"; // block id
+        ", blockid: %s" + // block id
+        ", duration: %s"; // duration time
+
   static final Log ClientTraceLog =
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
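
With ", duration: %s" appended to DN_CLIENTTRACE_FORMAT, every HDFS clienttrace line now ends with the elapsed time in nanoseconds. The leading fields of the format string are not visible in this hunk; the sketch below assumes they follow the src/dest/bytes/op order implied by the BlockReceiver call above, and all concrete values are made up for illustration:

    public class DnTraceLineSketch {
      public static void main(String[] args) {
        // Illustrative only: the field prefix and every value are assumptions, not taken from the patch.
        final String fmt = "src: %s, dest: %s, bytes: %s, op: %s, cliID: %s"
            + ", offset: %s, srvID: %s, blockid: %s, duration: %s";
        System.out.println(String.format(fmt,
            "/10.0.0.5:50010", "/10.0.0.7:41234", 67108864L, "HDFS_WRITE",
            "DFSClient_12345", 0L, "DS-1234567890", "blk_987654321_1001",
            4823591L /* endTime - startTime, nanoseconds */));
        // -> src: /10.0.0.5:50010, dest: /10.0.0.7:41234, bytes: 67108864, op: HDFS_WRITE,
        //    cliID: DFSClient_12345, offset: 0, srvID: DS-1234567890, blockid: blk_987654321_1001,
        //    duration: 4823591
      }
    }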
 

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -164,7 +164,7 @@ class DataXceiver implements Runnable, FSConstants {
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
             "%d", "HDFS_READ", clientName, "%d", 
-            datanode.dnRegistration.getStorageID(), block)
+            datanode.dnRegistration.getStorageID(), block, "%d")
         : datanode.dnRegistration + " Served block " + block + " to " +
             s.getInetAddress();
     try {
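
Note the two-stage formatting the last two hunks rely on: DataXceiver builds clientTraceFmt up front, passing the literal string "%d" for the byte count, the offset, and now the duration, so those positions survive as placeholders; BlockSender's finally block later fills them with totalRead, initialOffset, and endTime - startTime. A small sketch of the idea, with illustrative field names rather than the full DN_CLIENTTRACE_FORMAT:

    public class TwoStageFormatSketch {
      public static void main(String[] args) {
        // Stage 1 (DataXceiver side): bake in the known fields, keep "%d" for the late ones.
        String clientTraceFmt = String.format(
            "src: %s, op: %s, bytes: %s, offset: %s, duration: %s",
            "/10.0.0.7:41234", "HDFS_READ", "%d", "%d", "%d");
        // clientTraceFmt is now "src: /10.0.0.7:41234, op: HDFS_READ, bytes: %d, offset: %d, duration: %d"

        // Stage 2 (BlockSender side): fill the remaining placeholders once the transfer ends.
        long totalRead = 1048576L, initialOffset = 0L, durationNanos = 3210987L;
        System.out.println(String.format(clientTraceFmt, totalRead, initialOffset, durationNanos));
      }
    }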

+ 7 - 2
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -129,7 +129,8 @@ public class TaskTracker
         ", dest: %s" +  // dst IP
         ", bytes: %s" + // byte count
         ", op: %s" +    // operation
-        ", cliID: %s";  // task id
+        ", cliID: %s" +  // task id
+        ", duration: %s"; // duration
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
@@ -2978,8 +2979,11 @@ public class TaskTracker
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
 
+      long startTime = 0;
       try {
         shuffleMetrics.serverHandlerBusy();
+        if(ClientTraceLog.isInfoEnabled())
+          startTime = System.nanoTime();
         outStream = response.getOutputStream();
         JobConf conf = (JobConf) context.getAttribute("conf");
         LocalDirAllocator lDirAlloc = 
@@ -3071,12 +3075,13 @@ public class TaskTracker
         if (null != mapOutputIn) {
           mapOutputIn.close();
         }
+        final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {
           ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
                 request.getLocalAddr() + ":" + request.getLocalPort(),
                 request.getRemoteAddr() + ":" + request.getRemotePort(),
-                totalRead, "MAPRED_SHUFFLE", mapId));
+                totalRead, "MAPRED_SHUFFLE", mapId, endTime-startTime));
         }
       }
       outStream.close();
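
On the TaskTracker side the same duration field rides on MR_CLIENTTRACE_FORMAT: startTime is captured at the top of the shuffle handler only when the clienttrace log is enabled, and endTime just before the trace line is written, so the value covers the whole map-output transfer in nanoseconds. A formatted MAPRED_SHUFFLE line would then look roughly like the output of this sketch (the leading "src" field and all values are assumptions for illustration):

    public class ShuffleTraceLineSketch {
      public static void main(String[] args) {
        // Illustrative only: prefix and values are assumed, not taken from the patch.
        final String fmt = "src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, duration: %s";
        System.out.println(String.format(fmt,
            "10.0.0.3:50060", "10.0.0.9:35422", 1048576L, "MAPRED_SHUFFLE",
            "attempt_200910141126_0001_m_000042_0", 1875311L /* nanoseconds */));
      }
    }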