Browse Source

Reverted changes from 384385, which removed local backup copy of block & removed most timeouts. That worked well when all hosts are healthy, but when a few are very slow it caused too many tasks to timeout and loads to balloon on slow hosts. So the local backup is back, but no longer in /tmp, rather in dfs.data.dir, and timeouts are back. I also added connect timeouts, so that dfs connects also don't get hung up by slow hosts.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@384665 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
a2619cc401

+ 20 - 0
src/java/org/apache/hadoop/conf/Configuration.java

@@ -263,6 +263,26 @@ public class Configuration {
     set(propertyName, theClass.getName());
   }
 
+  /** Returns a file name under a directory named in <i>dirsProp</i> with the
+   * given <i>path</i>.  If <i>dirsProp</i> contains multiple directories, then
+   * one is chosen based on <i>path</i>'s hash code.  If the selected directory
+   * does not exist, an attempt is made to create it.
+   */
+  public File getFile(String dirsProp, String path) throws IOException {
+    String[] dirs = getStrings(dirsProp);
+    int hashCode = path.hashCode();
+    for (int i = 0; i < dirs.length; i++) {  // try each local dir
+      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
+      File file = new File(dirs[index], path);
+      File dir = file.getParentFile();
+      if (dir.exists() || dir.mkdirs()) {
+        return file;
+      }
+    }
+    throw new IOException("No valid local directories in property: "+dirsProp);
+  }
+
+
   /** Returns the URL for the named resource. */
   public URL getResource(String name) {
     return classLoader.getResource(name);

+ 115 - 46
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -47,12 +47,13 @@ class DFSClient implements FSConstants {
     Random r = new Random();
     String clientName;
     Daemon leaseChecker;
-
+    private Configuration conf;
 
     /** 
      * Create a new DFSClient connected to the given namenode server.
      */
     public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {
+        this.conf = conf;
         this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);
         try {
             this.localName = InetAddress.getLocalHost().getHostName();
@@ -255,6 +256,7 @@ class DFSClient implements FSConstants {
      * negotiation of the namenode and various datanodes as necessary.
      ****************************************************************/
     class DFSInputStream extends FSInputStream {
+        private Socket s = null;
         boolean closed = false;
 
         private String src;
@@ -316,9 +318,9 @@ class DFSClient implements FSConstants {
                 throw new IOException("Attempted to read past end of file");
             }
 
-            if (blockStream != null) {
-                blockStream.close();
-                partnerStream.close();
+            if (s != null) {
+                s.close();
+                s = null;
             }
 
             //
@@ -348,7 +350,6 @@ class DFSClient implements FSConstants {
             //
             int failures = 0;
             InetSocketAddress targetAddr = null;
-            Socket s = null;
             TreeSet deadNodes = new TreeSet();
             while (s == null) {
                 DatanodeInfo chosenNode;
@@ -376,8 +377,9 @@ class DFSClient implements FSConstants {
                     continue;
                 }
                 try {
-                    s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
-                    //s.setSoTimeout(READ_TIMEOUT);
+                    s = new Socket();
+                    s.connect(targetAddr, READ_TIMEOUT);
+                    s.setSoTimeout(READ_TIMEOUT);
 
                     //
                     // Xmit header info to datanode
@@ -428,10 +430,10 @@ class DFSClient implements FSConstants {
                 throw new IOException("Stream closed");
             }
 
-            if (blockStream != null) {
+            if (s != null) {
                 blockStream.close();
-                blockStream = null;
-                partnerStream.close();
+                s.close();
+                s = null;
             }
             super.close();
             closed = true;
@@ -520,6 +522,7 @@ class DFSClient implements FSConstants {
      * DFSOutputStream creates files from a stream of bytes.
      ****************************************************************/
     class DFSOutputStream extends FSOutputStream {
+        private Socket s;
         boolean closed = false;
 
         private byte outBuf[] = new byte[BUFFER_SIZE];
@@ -528,8 +531,11 @@ class DFSClient implements FSConstants {
         private UTF8 src;
         boolean closingDown = false;
         private boolean overwrite;
+        private boolean blockStreamWorking;
         private DataOutputStream blockStream;
         private DataInputStream blockReplyStream;
+        private File backupFile;
+        private OutputStream backupStream;
         private Block block;
         private DatanodeInfo targets[]; 
         private long filePos = 0;
@@ -543,16 +549,31 @@ class DFSClient implements FSConstants {
             this.overwrite = overwrite;
             this.blockStream = null;
             this.blockReplyStream = null;
+            this.blockStreamWorking = false;
+            this.backupFile = newBackupFile();
 
+            this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
             nextBlockOutputStream(true);
         }
 
+        private File newBackupFile() throws IOException {
+          return conf.getFile("dfs.data.dir",
+                              "tmp"+File.separator+
+                              "client-"+Math.abs(r.nextLong()));
+        }
+
         /**
          * Open a DataOutputStream to a DataNode so that it can be written to.
          * This happens when a file is created and each time a new block is allocated.
          * Must get block ID and the IDs of the destinations from the namenode.
          */
         private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
+            if (! firstTime && blockStreamWorking) {
+                blockStream.flush();
+                s.close();
+                blockStreamWorking = false;
+            }
+
             boolean retry = false;
             long start = System.currentTimeMillis();
             do {
@@ -588,10 +609,10 @@ class DFSClient implements FSConstants {
                 // Connect to first DataNode in the list.  Abort if this fails.
                 //
                 InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());
-                Socket s = null;
                 try {
-                    s = new Socket(target.getAddress(), target.getPort());
-                    //s.setSoTimeout(READ_TIMEOUT);
+                    s = new Socket();
+                    s.connect(target, READ_TIMEOUT);
+                    s.setSoTimeout(READ_TIMEOUT);
                 } catch (IOException ie) {
                     // Connection failed.  Let's wait a little bit and retry
                     try {
@@ -625,6 +646,7 @@ class DFSClient implements FSConstants {
                 bytesWrittenToBlock = 0;
                 blockStream = out;
                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+                blockStreamWorking = true;
             } while (retry);
         }
 
@@ -705,21 +727,18 @@ class DFSClient implements FSConstants {
                 //
                 // To the blockStream, write length, then bytes
                 //
-                try {
-                  blockStream.writeLong(workingPos);
-                  blockStream.write(outBuf, 0, workingPos);
-                } catch (IOException ie) {
-                  try {
-                    blockStream.close();
-                  } catch (IOException ie2) {
-                  }
-                  try {
-                    blockReplyStream.close();
-                  } catch (IOException ie2) {
-                  }
-                  namenode.abandonBlock(block, src.toString());
-                  throw ie;
+                if (blockStreamWorking) {
+                    try {
+                        blockStream.writeLong(workingPos);
+                        blockStream.write(outBuf, 0, workingPos);
+                    } catch (IOException ie) {
+                        handleSocketException(ie);
+                    }
                 }
+                //
+                // To the local block backup, write just the bytes
+                //
+                backupStream.write(outBuf, 0, workingPos);
 
                 //
                 // Track position
@@ -734,20 +753,64 @@ class DFSClient implements FSConstants {
          * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
-            try {
-              internalClose();
-            } catch (IOException ie) {
-              namenode.abandonBlock(block, src.toString());
-              throw ie;
+            boolean mustRecover = ! blockStreamWorking;
+
+            //
+            // A zero-length set of data indicates the end of the block
+            //
+            if (blockStreamWorking) {
+                try {
+                    internalClose();
+                } catch (IOException ie) {
+                    handleSocketException(ie);
+                    mustRecover = true;
+                } finally {
+                    blockStreamWorking = false;
+                }
             }
+
+            //
+            // Done with local copy
+            //
+            backupStream.close();
+
+            //
+            // If necessary, recover from a failed datanode connection.
+            //
+            while (mustRecover) {
+                nextBlockOutputStream(false);
+                InputStream in = new FileInputStream(backupFile);
+                try {
+                    byte buf[] = new byte[BUFFER_SIZE];
+                    int bytesRead = in.read(buf);
+                    while (bytesRead >= 0) {
+                        blockStream.writeLong((long) bytesRead);
+                        blockStream.write(buf, 0, bytesRead);
+                        bytesRead = in.read(buf);
+                    }
+                    internalClose();
+                    LOG.info("Recovered from failed datanode connection");
+                    mustRecover = false;
+                } catch (IOException ie) {
+                    handleSocketException(ie);
+                } finally {
+                  in.close();
+                }
+            }
+
+            //
+            // Delete local backup, start new one
+            //
+            backupFile.delete();
+            backupFile = newBackupFile();
+            backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
         }
 
         /**
-         * Close down stream to remote datanode.
+         * Close down stream to remote datanode.  Called from two places
+         * in endBlock();
          */
         private synchronized void internalClose() throws IOException {
-          try {
-            // A zero-length set of data indicates the end of the block
             blockStream.writeLong(0);
             blockStream.flush();
 
@@ -761,16 +824,18 @@ class DFSClient implements FSConstants {
             lb.readFields(blockReplyStream);
             namenode.reportWrittenBlock(lb);
 
-          } finally {
-            try {
-              blockStream.close();
-            } catch (IOException ie2) {
-            }
-            try {
-              blockReplyStream.close();
-            } catch (IOException ie2) {
-            }
+            s.close();
+        }
+
+        private void handleSocketException(IOException ie) throws IOException {
+          LOG.log(Level.WARNING, "Error while writing.", ie);
+          try {
+            s.close();
+          } catch (IOException ie2) {
+            LOG.log(Level.WARNING, "Error closing socket.", ie2);
           }
+          blockStreamWorking = false;
+          namenode.abandonBlock(block, src.toString());
         }
 
         /**
@@ -786,9 +851,13 @@ class DFSClient implements FSConstants {
             flush();
             endBlock();
 
-            blockStream.close();                
-            blockReplyStream.close();
+            backupStream.close();
+            backupFile.delete();
 
+            if (blockStreamWorking) {
+                s.close();
+                blockStreamWorking = false;
+            }
             super.close();
 
             long localstart = System.currentTimeMillis();

+ 7 - 4
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -368,8 +368,9 @@ public class DataNode implements FSConstants, Runnable {
                                     // Connect to backup machine
                                     mirrorTarget = createSocketAddr(targets[1].getName().toString());
                                     try {
-                                        Socket s2 = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
-                                        //s2.setSoTimeout(READ_TIMEOUT);
+                                        Socket s2 = new Socket();
+                                        s2.connect(mirrorTarget, READ_TIMEOUT);
+                                        s2.setSoTimeout(READ_TIMEOUT);
                                         out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
                                         in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
 
@@ -507,7 +508,6 @@ public class DataNode implements FSConstants, Runnable {
                             mirrors.add(curTarget);
                             LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
                             newLB.write(reply);
-                            reply.flush();
                         } finally {
                             reply.close();
                         }
@@ -638,7 +638,9 @@ public class DataNode implements FSConstants, Runnable {
         public void run() {
 	    xmitsInProgress++;
             try {
-                Socket s = new Socket(curTarget.getAddress(), curTarget.getPort());
+                Socket s = new Socket();
+                s.connect(curTarget, READ_TIMEOUT);
+                s.setSoTimeout(READ_TIMEOUT);
                 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                 try {
                     long filelen = data.getLength(b);
@@ -673,6 +675,7 @@ public class DataNode implements FSConstants, Runnable {
                 }
                 LOG.info("Transmitted block " + b + " to " + curTarget);
             } catch (IOException ie) {
+              LOG.log(Level.WARNING, "Failed to transfer "+b+" to "+curTarget, ie);
             } finally {
 		xmitsInProgress--;
 	    }

+ 2 - 2
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -783,7 +783,7 @@ class FSNamesystem implements FSConstants {
      * 1) Record the heartbeat, so the datanode isn't timed out
      * 2) Adjust usage stats for future block allocation
      */
-    public void gotHeartbeat(UTF8 name, long capacity, long remaining) {
+    public synchronized void gotHeartbeat(UTF8 name, long capacity, long remaining) {
         synchronized (heartbeats) {
             synchronized (datanodeMap) {
                 long capacityDiff = 0;
@@ -1285,7 +1285,7 @@ class FSNamesystem implements FSConstants {
             }
         }
         Collections.shuffle(targetList);
-
+        
         //
         // Now pick one
         //

+ 3 - 1
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -184,7 +184,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
      * The client needs to give up on the block.
      */
     public void abandonBlock(Block b, String src) throws IOException {
-        namesystem.abandonBlock(b, new UTF8(src));
+        if (! namesystem.abandonBlock(b, new UTF8(src))) {
+            throw new IOException("Cannot abandon block during write to " + src);
+        }
     }
     /**
      */

+ 1 - 29
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -124,35 +124,7 @@ public class JobConf extends Configuration {
   /** Constructs a local file name.  Files are distributed among configured
    * local directories.*/
   public File getLocalFile(String subdir, String name) throws IOException {
-      String param[] = new String[1];
-      param[0] = name;
-      return getLocalFile(subdir, param, "", false);
-  }
-  // REMIND - mjc - rename this!  getLocalFile() is not quite the same.
-  public File getLocalFile(String subdir, String names[], String ending) throws IOException {
-      return getLocalFile(subdir, names, ending, true);
-  }
-  File getLocalFile(String subdir, String names[], String ending, boolean existingFileTest) throws IOException {
-    String[] localDirs = getLocalDirs();
-    for (int k = 0; k < names.length; k++) {
-        String path = subdir + File.separator + names[k] + ending;
-        int hashCode = path.hashCode();
-        for (int i = 0; i < localDirs.length; i++) {  // try each local dir
-            int index = (hashCode+i & Integer.MAX_VALUE) % localDirs.length;
-            File file = new File(localDirs[index], path);
-            File dir = file.getParentFile();
-            if (existingFileTest) {
-                if (file.exists()) {
-                    return file;
-                }
-            } else {
-                if (dir.exists() || dir.mkdirs()) {
-                    return file;
-                }
-            }
-        }
-    }
-    throw new IOException("No valid local directories.");
+    return getFile("mapred.local.dir", name + File.separator + subdir);
   }
 
   public void setInputDir(File dir) { set("mapred.input.dir", dir); }

+ 7 - 2
src/java/org/apache/hadoop/mapred/MapOutputFile.java

@@ -67,9 +67,14 @@ class MapOutputFile implements Writable, Configurable {
     throws IOException {
     return this.jobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
   }
-  public File getInputFile(String mapTaskId[], String reduceTaskId)
+  public File getInputFile(String mapTaskIds[], String reduceTaskId)
     throws IOException {
-    return this.jobConf.getLocalFile(reduceTaskId, mapTaskId, ".out");
+    for (int i = 0; i < mapTaskIds.length; i++) {
+      File file = jobConf.getLocalFile(reduceTaskId, mapTaskIds[i]+".out");
+      if (file.exists())
+        return file;
+    }
+    throw new IOException("Input file not found!");
   }
 
   /** Removes all of the files related to a task. */