ソースを参照

HADOOP-552. Improved error checking when copying map output files to reduce nodes.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@452156 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年 前
コミット
7f87a01387
2 ファイル変更39 行追加14 行削除
  1. 3 0
      CHANGES.txt
  2. 36 14
      src/java/org/apache/hadoop/mapred/MapOutputLocation.java

+ 3 - 0
CHANGES.txt

@@ -100,6 +100,9 @@ Trunk (unreleased changes)
     IllegalStateException's were logged, sets content-length
     correctly, and better handles some errors.  (omalley via cutting)
 
+25. HADOOP-552.  Improved error checking when copying map output files
+    to reduce nodes.  (omalley via cutting)
+
 
 Release 0.6.2 - 2006-09-18
 

+ 36 - 14
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 
 import java.io.*;
-import java.net.URL;
+import java.net.*;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
@@ -104,24 +104,46 @@ class MapOutputLocation implements Writable {
                       Path localFilename, 
                       int reduce,
                       Progressable pingee) throws IOException {
-    URL path = new URL(toString() + "&reduce=" + reduce);
-    InputStream input = path.openConnection().getInputStream();
-    OutputStream output = fileSys.create(localFilename);
+    boolean good = false;
     long totalBytes = 0;
+    URL path = new URL(toString() + "&reduce=" + reduce);
     try {
-      byte[] buffer = new byte[64 * 1024];
-      int len = input.read(buffer);
-      while (len > 0) {
-        totalBytes += len;
-        output.write(buffer, 0 ,len);
-        if (pingee != null) {
-          pingee.progress();
+      URLConnection connection = path.openConnection();
+      InputStream input = connection.getInputStream();
+      try {
+        OutputStream output = fileSys.create(localFilename);
+        try {
+          byte[] buffer = new byte[64 * 1024];
+          int len = input.read(buffer);
+          while (len > 0) {
+            totalBytes += len;
+            output.write(buffer, 0 ,len);
+            if (pingee != null) {
+              pingee.progress();
+            }
+            len = input.read(buffer);
+          }
+        } finally {
+          output.close();
         }
-        len = input.read(buffer);
+      } finally {
+        input.close();
+      }
+      good = ((int) totalBytes) == connection.getContentLength();
+      if (!good) {
+        throw new IOException("Incomplete map output received for " + path +
+                              " (" + totalBytes + " instead of " + 
+                              connection.getContentLength() + ")");
       }
     } finally {
-      input.close();
-      output.close();
+      if (!good) {
+        try {
+          fileSys.delete(localFilename);
+          totalBytes = 0;
+        } catch (Throwable th) {
+          // IGNORED because we are cleaning up
+        }
+      }
     }
     return totalBytes;
   }