Browse Source

Fix for HADOOP-107. Previously, while DFS blocks were being written, they were both trickled to a datanode and tee'd to a temp file (in case the connection to the datanode failed). Now they are written only to the temp file, and no connection to the datanode is made until the block is complete. This reduces the number of long-lived, mostly-idle connections to datanodes, which was causing problems. It also simplifies the DFSClient code significantly.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@390262 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 năm trước cách đây
mục cha
commit
03c953c0dc
1 tập tin đã thay đổi với 22 bổ sung và 60 xóa
  1. 22 60
      src/java/org/apache/hadoop/dfs/DFSClient.java

+ 22 - 60
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -526,9 +526,8 @@ class DFSClient implements FSConstants {
         private int pos = 0;
 
         private UTF8 src;
-        boolean closingDown = false;
         private boolean overwrite;
-        private boolean blockStreamWorking;
+        private boolean firstTime = true;
         private DataOutputStream blockStream;
         private DataInputStream blockReplyStream;
         private File backupFile;
@@ -543,13 +542,8 @@ class DFSClient implements FSConstants {
         public DFSOutputStream(UTF8 src, boolean overwrite) throws IOException {
             this.src = src;
             this.overwrite = overwrite;
-            this.blockStream = null;
-            this.blockReplyStream = null;
-            this.blockStreamWorking = false;
             this.backupFile = newBackupFile();
-
-            this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
-            nextBlockOutputStream(true);
+            this.backupStream = new FileOutputStream(backupFile);
         }
 
         private File newBackupFile() throws IOException {
@@ -565,13 +559,7 @@ class DFSClient implements FSConstants {
          * This happens when a file is created and each time a new block is allocated.
          * Must get block ID and the IDs of the destinations from the namenode.
          */
-        private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
-            if (! firstTime && blockStreamWorking) {
-                blockStream.flush();
-                s.close();
-                blockStreamWorking = false;
-            }
-
+        private synchronized void nextBlockOutputStream() throws IOException {
             boolean retry = false;
             long start = System.currentTimeMillis();
             do {
@@ -644,8 +632,8 @@ class DFSClient implements FSConstants {
                 bytesWrittenToBlock = 0;
                 blockStream = out;
                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-                blockStreamWorking = true;
             } while (retry);
+            firstTime = false;
         }
 
         /**
@@ -708,7 +696,6 @@ class DFSClient implements FSConstants {
             }
             if (bytesWrittenToBlock == BLOCK_SIZE) {
                 endBlock();
-                nextBlockOutputStream(false);
             }
             flushData(pos);
         }
@@ -720,19 +707,7 @@ class DFSClient implements FSConstants {
         private synchronized void flushData(int maxPos) throws IOException {
             int workingPos = Math.min(pos, maxPos);
             
-            if (workingPos > 0 || 
-                (workingPos == 0 && closingDown)) {
-                //
-                // To the blockStream, write length, then bytes
-                //
-                if (blockStreamWorking) {
-                    try {
-                        blockStream.writeLong(workingPos);
-                        blockStream.write(outBuf, 0, workingPos);
-                    } catch (IOException ie) {
-                        handleSocketException(ie);
-                    }
-                }
+            if (workingPos > 0) {
                 //
                 // To the local block backup, write just the bytes
                 //
@@ -751,43 +726,27 @@ class DFSClient implements FSConstants {
          * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
-            boolean mustRecover = ! blockStreamWorking;
-
-            //
-            // A zero-length set of data indicates the end of the block
-            //
-            if (blockStreamWorking) {
-                try {
-                    internalClose();
-                } catch (IOException ie) {
-                    handleSocketException(ie);
-                    mustRecover = true;
-                } finally {
-                    blockStreamWorking = false;
-                }
-            }
-
             //
             // Done with local copy
             //
             backupStream.close();
 
             //
-            // If necessary, recover from a failed datanode connection.
+            // Send it to datanode
             //
+            boolean mustRecover = true;
             while (mustRecover) {
-                nextBlockOutputStream(false);
+                nextBlockOutputStream();
                 InputStream in = new FileInputStream(backupFile);
                 try {
                     byte buf[] = new byte[BUFFER_SIZE];
                     int bytesRead = in.read(buf);
-                    while (bytesRead >= 0) {
+                    while (bytesRead > 0) {
                         blockStream.writeLong((long) bytesRead);
                         blockStream.write(buf, 0, bytesRead);
                         bytesRead = in.read(buf);
                     }
                     internalClose();
-                    LOG.info("Recovered from failed datanode connection");
                     mustRecover = false;
                 } catch (IOException ie) {
                     handleSocketException(ie);
@@ -801,12 +760,12 @@ class DFSClient implements FSConstants {
             //
             backupFile.delete();
             backupFile = newBackupFile();
-            backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+            backupStream = new FileOutputStream(backupFile);
+            bytesWrittenToBlock = 0;
         }
 
         /**
-         * Close down stream to remote datanode.  Called from two places
-         * in endBlock();
+         * Close down stream to remote datanode.
          */
         private synchronized void internalClose() throws IOException {
             blockStream.writeLong(0);
@@ -823,16 +782,19 @@ class DFSClient implements FSConstants {
             namenode.reportWrittenBlock(lb);
 
             s.close();
+            s = null;
         }
 
         private void handleSocketException(IOException ie) throws IOException {
           LOG.log(Level.WARNING, "Error while writing.", ie);
           try {
-            s.close();
+            if (s != null) {
+              s.close();
+              s = null;
+            }
           } catch (IOException ie2) {
             LOG.log(Level.WARNING, "Error closing socket.", ie2);
           }
-          blockStreamWorking = false;
           namenode.abandonBlock(block, src.toString());
         }
 
@@ -845,16 +807,17 @@ class DFSClient implements FSConstants {
                 throw new IOException("Stream closed");
             }
 
-            closingDown = true;
             flush();
-            endBlock();
+            if (filePos == 0 || bytesWrittenToBlock != 0) {
+              endBlock();
+            }
 
             backupStream.close();
             backupFile.delete();
 
-            if (blockStreamWorking) {
+            if (s != null) {
                 s.close();
-                blockStreamWorking = false;
+                s = null;
             }
             super.close();
 
@@ -873,7 +836,6 @@ class DFSClient implements FSConstants {
                 }
             }
             closed = true;
-            closingDown = false;
         }
     }
 }