Browse Source

HADOOP-2018. The source datanode of a data transfer waits for
a response from the target datanode before closing the data stream.
(Hairong Kuang via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@583958 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 18 years ago
parent
commit
173b9c1b28
2 changed files with 64 additions and 33 deletions
  1. 4 0
      CHANGES.txt
  2. 60 33
      src/java/org/apache/hadoop/dfs/DataNode.java

+ 4 - 0
CHANGES.txt

@@ -276,6 +276,10 @@ Trunk (unreleased changes)
     properly i.e. the map TIP is incorrectly left marked as 'complete' and it
     properly i.e. the map TIP is incorrectly left marked as 'complete' and it
     is never rescheduled elsewhere, leading to hung reduces.
     is never rescheduled elsewhere, leading to hung reduces.
     (Devaraj Das via acmurthy)
     (Devaraj Das via acmurthy)
+
+    HADOOP-2018. The source datanode of a data transfer waits for
+    a response from the target datanode before closing the data stream.
+    (Hairong Kuang via dhruba)
                                 
                                 
   IMPROVEMENTS
   IMPROVEMENTS
 
 

+ 60 - 33
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -692,7 +692,48 @@ public class DataNode implements FSConstants, Runnable {
       }
       }
     }
     }
   }
   }
-    
+
+  /* Utility function for receiving a response.
+   * Reads a single short status code from the input stream of socket s
+   * and throws an IOException that names s.getInetAddress() when the
+   * code is anything other than OP_STATUS_SUCCESS. The wrapping stream
+   * is passed to IOUtils.closeStream() whether or not the read succeeds.
+   * NOTE(review): closing a stream obtained from Socket.getInputStream()
+   * also closes the socket, so s is presumably unusable after this call;
+   * confirm that IOUtils.closeStream() really invokes close().
+   */
+  private static void receiveResponse(Socket s) throws IOException {
+    // check the response
+    DataInputStream reply = new DataInputStream(new BufferedInputStream(
+        s.getInputStream(), BUFFER_SIZE));
+    try {
+      short opStatus = reply.readShort();
+      if(opStatus != OP_STATUS_SUCCESS) {
+        throw new IOException("operation failed at "+
+            s.getInetAddress());
+      } 
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
+  /* Utility function for sending a response.
+   * Writes opStatus as a single short to the output stream of socket s
+   * and flushes it. The wrapping stream is passed to
+   * IOUtils.closeStream() whether or not the write succeeds.
+   * NOTE(review): closing a stream obtained from Socket.getOutputStream()
+   * also closes the socket, so s is presumably unusable after this call;
+   * confirm that IOUtils.closeStream() really invokes close().
+   */
+  private static void sendResponse(Socket s, short opStatus) throws IOException {
+    DataOutputStream reply = new DataOutputStream(s.getOutputStream());
+    try {
+      reply.writeShort(opStatus);
+      reply.flush();
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
+  /*
+   * Queues a received block for reporting: adds the block to
+   * receivedBlockList while holding that list's monitor, then wakes every
+   * thread waiting on it. This method only enqueues and returns; which
+   * thread drains the list is not visible here (presumably the one that
+   * reports to the namenode; verify).
+   *
+   * Informing the name node could take a long long time! Should we wait
+   * till namenode is informed before responding with success to the
+   * client? For now we don't.
+   */
+  private void notifyNamenodeReceivedBlock(Block block) {
+    synchronized (receivedBlockList) {
+      receivedBlockList.add(block);
+      receivedBlockList.notifyAll();
+    }
+  }
+
+
+
   /**
   /**
    * Server used for receiving/sending a block of data.
    * Server used for receiving/sending a block of data.
    * This is created to listen for requests from clients or 
    * This is created to listen for requests from clients or 
@@ -854,10 +895,11 @@ public class DataNode implements FSConstants, Runnable {
       DataOutputStream mirrorOut = null;  // stream to next target
       DataOutputStream mirrorOut = null;  // stream to next target
       Socket mirrorSock = null;           // socket to next target
       Socket mirrorSock = null;           // socket to next target
       BlockReceiver blockReceiver = null; // responsible for data handling
       BlockReceiver blockReceiver = null; // responsible for data handling
+      String mirrorNode = null;           // the name:port of next target
       try {
       try {
         // open a block receiver and check if the block does not exist
         // open a block receiver and check if the block does not exist
         blockReceiver = new BlockReceiver(block, in, 
         blockReceiver = new BlockReceiver(block, in, 
-            s.getRemoteSocketAddress().toString());
+            s.getInetAddress().toString());
 
 
         //
         //
         // Open network conn to backup machine, if 
         // Open network conn to backup machine, if 
@@ -865,7 +907,6 @@ public class DataNode implements FSConstants, Runnable {
         //
         //
         if (targets.length > 0) {
         if (targets.length > 0) {
           InetSocketAddress mirrorTarget = null;
           InetSocketAddress mirrorTarget = null;
-          String mirrorNode = null;
           // Connect to backup machine
           // Connect to backup machine
           mirrorNode = targets[0].getName();
           mirrorNode = targets[0].getName();
           mirrorTarget = createSocketAddr(mirrorNode);
           mirrorTarget = createSocketAddr(mirrorNode);
@@ -892,40 +933,25 @@ public class DataNode implements FSConstants, Runnable {
           }
           }
         }
         }
 
 
-        String mirrorAddr = (mirrorSock == null) ? null : 
-            mirrorSock.getRemoteSocketAddress().toString();
+        // receive the block and mirror to the next target
+        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
         blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
 
 
-        /*
-         * Informing the name node could take a long long time! Should we wait
-         * till namenode is informed before responding with success to the
-         * client? For now we don't.
-         */
-        synchronized (receivedBlockList) {
-          receivedBlockList.add(block);
-          receivedBlockList.notifyAll();
-        }
+        // notify name node
+        notifyNamenodeReceivedBlock(block);
 
 
         String msg = "Received block " + block + " from " +
         String msg = "Received block " + block + " from " +
-                     s.getRemoteSocketAddress();
+                     s.getInetAddress();
 
 
         /* read response from next target in the pipeline. 
         /* read response from next target in the pipeline. 
          * ignore the response for now. Will fix it in HADOOP-1927
          * ignore the response for now. Will fix it in HADOOP-1927
          */
          */
         if( mirrorSock != null ) {
         if( mirrorSock != null ) {
-          short result = OP_STATUS_ERROR;
-          DataInputStream mirrorIn = null;
           try {
           try {
-            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
-            result = mirrorIn.readShort();
+            receiveResponse(mirrorSock);
           } catch (IOException ignored) {
           } catch (IOException ignored) {
-          } finally {
-            IOUtils.closeStream(mirrorIn);
-          }
-
-          msg += " and " +  (( result != OP_STATUS_SUCCESS ) ?
-                               "failed to mirror to " : " mirrored to ") +
-                 mirrorAddr;
+            msg += " and " +  ignored.getMessage();
+          } 
         }
         }
 
 
         LOG.info(msg);
         LOG.info(msg);
@@ -934,17 +960,14 @@ public class DataNode implements FSConstants, Runnable {
         throw ioe;
         throw ioe;
       } finally {
       } finally {
         // send back reply
         // send back reply
-        DataOutputStream reply = new DataOutputStream(s.getOutputStream());
         try {
         try {
-          reply.writeShort(opStatus);
-          reply.flush();
+          sendResponse(s, opStatus);
         } catch (IOException ioe) {
         } catch (IOException ioe) {
-          LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()
-              + "for writing block " + block );
-          LOG.warn(StringUtils.stringifyException(ioe));
+          LOG.warn("Error writing reply back to " + s.getInetAddress() +
+              " for writing block " + block +"\n" +
+              StringUtils.stringifyException(ioe));
         }
         }
         // close all opened streams
         // close all opened streams
-        IOUtils.closeStream(reply);
         IOUtils.closeStream(mirrorOut);
         IOUtils.closeStream(mirrorOut);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeStream(blockReceiver);
         IOUtils.closeStream(blockReceiver);
@@ -1435,6 +1458,10 @@ public class DataNode implements FSConstants, Runnable {
         }
         }
         // send data & checksum
         // send data & checksum
         blockSender.sendBlock(out, null);
         blockSender.sendBlock(out, null);
+        
+        // check the response
+        receiveResponse(sock);
+
         LOG.info("Transmitted block " + b + " to " + curTarget);
         LOG.info("Transmitted block " + b + " to " + curTarget);
       } catch (IOException ie) {
       } catch (IOException ie) {
         LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()
         LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()