
HADOOP-4097. Make hive work well with speculative execution turned on.
(Joydeep Sen Sarma via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@693524 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 years ago
parent
commit
b1f2366ae3

+ 3 - 0
CHANGES.txt

@@ -501,6 +501,9 @@ Trunk (unreleased changes)
     implementations and moves it to the JobTracker. 
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-4097. Make hive work well with speculative execution turned on.
+    (Joydeep Sen Sarma via dhruba)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

+ 1 - 1
src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java

@@ -81,7 +81,7 @@ public class FileSinkOperator extends TerminalOperator <fileSinkDesc> implements
     try {
       fs = FileSystem.get(hconf);
       finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
-      outPath = new Path(conf.getDirName(), "tmp."+Utilities.getTaskId(hconf));
+      outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
       OutputFormat outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
 
       if(outputFormat instanceof IgnoreKeyTextOutputFormat) {
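For context, the "_tmp." prefix matters because Hadoop's default output filters treat names beginning with an underscore as hidden, so half-written files from failed or speculative task attempts are never picked up as job output. A minimal sketch of the write-then-rename commit pattern FileSinkOperator relies on (class and method names here are illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: each task attempt writes to a "_tmp."-prefixed
    // file and promotes it to its final name only when it completes.
    public class TmpFileCommitSketch {
      public static void commitTaskOutput(FileSystem fs, Path dir, String taskId)
          throws IOException {
        Path tmp = new Path(dir, "_tmp." + taskId);  // written during execution
        Path fin = new Path(dir, taskId);            // final, committed name
        if (!fs.rename(tmp, fin)) {                  // rename within one directory
          throw new IOException("could not commit " + tmp + " to " + fin);
        }
      }
    }

Only the winning attempt renames its file; duplicates killed by speculative execution leave behind "_tmp." files, which the MoveTask change below sweeps away.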

+ 19 - 0
src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java

@@ -19,13 +19,17 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.io.IOException;
+
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.loadFileDesc;
 import org.apache.hadoop.hive.ql.plan.loadTableDesc;
 import org.apache.hadoop.hive.ql.plan.moveWork;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -35,6 +39,20 @@ public class MoveTask extends Task<moveWork> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
+  private void cleanseSource(FileSystem fs, Path sourcePath) throws IOException {
+    if(sourcePath == null)
+      return;
+
+    FileStatus [] srcs = fs.globStatus(sourcePath);
+    if(srcs != null) {
+      for(FileStatus one: srcs) {
+        if(Hive.needsDeletion(one)) {
+          fs.delete(one.getPath(), true);
+        }
+      }
+    }
+  }
+
   public int execute() {
 
     try {
@@ -46,6 +64,7 @@ public class MoveTask extends Task<moveWork> implements Serializable {
       for(loadFileDesc lfd: work.getLoadFileWork()) {
         Path targetPath = new Path(lfd.getTargetDir());
         Path sourcePath = new Path(lfd.getSourceDir());
+        cleanseSource(fs, sourcePath);
         if (lfd.getIsDfsDir()) {
           // Just do a rename on the URIs
           String mesg = "Moving data to: " + lfd.getTargetDir();

+ 17 - 9
src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

@@ -502,17 +502,28 @@ public class Hive {
     }
   }
 
+  public static boolean needsDeletion(FileStatus file) {
+    String name = file.getPath().getName();
+    // There is a race condition in hadoop as a result of which
+    // _task files created in the output directory while the mapper
+    // runs are still reported in the output directory even after they
+    // have actually been removed. The first check works around that.
+    // NOTE: it's not clear that this still affects recent versions of hadoop.
+
+    // The second check deals with uncommitted output files produced by hive
+    // tasks; these typically appear on task failures or due to speculation.
+    return (name.startsWith("_task") || name.startsWith("_tmp."));
+  }
+
   private void checkPaths(FileSystem fs, FileStatus [] srcs, Path destf, boolean replace) throws HiveException {
     try {
         for(int i=0; i<srcs.length; i++) {
             FileStatus [] items = fs.listStatus(srcs[i].getPath());
             for(int j=0; j<items.length; j++) {
-                // There is a race condition in hadoop as a result of which
-                // the _task files created in the output directory at the time
-                // of the mapper is reported in the output directory even though
-                // it is actually removed. The following check works around that
-                if (items[j].getPath().getName().startsWith("_task")) {
-                    continue;
+
+                if (needsDeletion(items[j])) {
+                      fs.delete(items[j].getPath(), true);
+                      continue;
                 }
                 if(items[j].isDir()) {
                     throw new HiveException("checkPaths: "+srcs[i].toString()+" has nested directory"+
@@ -584,9 +595,6 @@ public class Hive {
           for(int i=0; i<srcs.length; i++) {
               FileStatus[] items = fs.listStatus(srcs[i].getPath());
               for(int j=0; j<items.length; j++) {
-                  if (items[j].getPath().getName().startsWith("_task")) {
-                      continue;
-                  }
                   boolean b = fs.rename(items[j].getPath(), new Path(tmppath, items[j].getPath().getName()));
                   LOG.debug("Renaming:"+items[j]+",Status:"+b);
               }
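Since needsDeletion is public and static, MoveTask and Hive.checkPaths now share a single definition of "uncommitted output". A hedged sanity check of the two prefix rules (file names and sizes are made up for illustration):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    public class NeedsDeletionCheck {
      // Build a minimal FileStatus for a given file name.
      static FileStatus status(String name) {
        return new FileStatus(0, false, 1, 67108864, 0,
                              new Path("/warehouse/t", name));
      }
      public static void main(String[] args) {
        System.out.println(Hive.needsDeletion(status("_tmp.000000_1")));  // true
        System.out.println(Hive.needsDeletion(status("_task_000000_0"))); // true
        System.out.println(Hive.needsDeletion(status("000000_0")));       // false
      }
    }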

+ 1 - 1
src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java

@@ -65,7 +65,7 @@ public class MetadataTypedSerDeField implements SerDeField {
 
     ColumnSet temp = (ColumnSet)obj;
     if(temp.col.size() <= _position) {
-      System.err.println("get " + temp.col.size() + "<=" + _position);
+      //System.err.println("get " + temp.col.size() + "<=" + _position);
       return null;
     }
     try {