
Fixing MR intermediate spills. Contributed by Arun Suresh.

(cherry picked from commit 6b710a42e00acca405e085724c89cda016cf7442)
Vinod Kumar Vavilapalli committed 10 years ago
Commit: 87862970f1
16 changed files with 155 additions and 46 deletions
  1. +10 -0 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
  2. +9 -4 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
  3. +18 -0 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
  4. +23 -1 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  5. +1 -1 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
  6. +2 -2 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
  7. +1 -1 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
  8. +25 -0 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
  9. +8 -9 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
  10. +6 -10 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
  11. +10 -0 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
  12. +4 -2 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
  13. +8 -0 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
  14. +1 -1 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
  15. +21 -9 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
  16. +8 -6 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java

@@ -83,6 +83,7 @@ public class LocalContainerLauncher extends AbstractService implements
   private final ClassLoader jobClassLoader;
   private ExecutorService taskRunner;
   private Thread eventHandler;
+  private byte[] encryptedSpillKey = new byte[] {0};
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -176,6 +177,11 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   }
 
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
 
   /*
    * Uber-AM lifecycle/ordering ("normal" case):
@@ -382,6 +388,10 @@ public class LocalContainerLauncher extends AbstractService implements
         // map to handle)
         conf.setBoolean("mapreduce.task.uberized", true);
 
+        // Check and handle Encrypted spill key
+        task.setEncryptedSpillKey(encryptedSpillKey);
+        YarnChild.setEncryptedSpillKeyIfRequired(task);
+
         // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
         // etc.), or just assume/hope the state machine(s) and uber-AM work
         // as expected?

+ 9 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -81,17 +81,21 @@ public class TaskAttemptListenerImpl extends CompositeService
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
   private Set<WrappedJvmID> launchedJVMs = Collections
-      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
-  
+      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
+
   private JobTokenSecretManager jobTokenSecretManager = null;
-  
+
+  private byte[] encryptedSpillKey;
+
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
-      RMHeartbeatHandler rmHeartbeatHandler) {
+      RMHeartbeatHandler rmHeartbeatHandler,
+      byte[] secretShuffleKey) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
+    this.encryptedSpillKey = secretShuffleKey;
   }
 
   @Override
@@ -439,6 +443,7 @@ public class TaskAttemptListenerImpl extends CompositeService
             jvmIDToActiveAttemptMap.remove(wJvmID);
         launchedJVMs.remove(wJvmID);
         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        task.setEncryptedSpillKey(encryptedSpillKey);
         jvmTask = new JvmTask(task, false);
       }
     }

+ 18 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java

@@ -159,6 +159,7 @@ class YarnChild {
         @Override
         public Object run() throws Exception {
           // use job-specified working directory
+          setEncryptedSpillKeyIfRequired(taskFinal);
           FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
           taskFinal.run(job, umbilical); // run the task
           return null;
@@ -217,6 +218,23 @@ class YarnChild {
     }
   }
 
+  /**
+   * Utility method to check if the Encrypted Spill Key needs to be set into the
+   * user credentials of the user running the Map / Reduce Task
+   * @param task The Map / Reduce task to set the Encrypted Spill information in
+   * @throws Exception
+   */
+  public static void setEncryptedSpillKeyIfRequired(Task task) throws
+          Exception {
+    if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
+            .getEncryptedSpillKey().length > 1)) {
+      Credentials creds =
+              UserGroupInformation.getCurrentUser().getCredentials();
+      TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
+      UserGroupInformation.getCurrentUser().addCredentials(creds);
+    }
+  }
+
   /**
    * Configure mapred-local dirs. This config is used by the task for finding
    * out an output directory.

+ 23 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -146,6 +148,8 @@ import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import javax.crypto.KeyGenerator;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -173,6 +177,7 @@ public class MRAppMaster extends CompositeService {
    * Priority of the MRAppMaster shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";
 
   private Clock clock;
   private final long startTime;
@@ -203,6 +208,7 @@ public class MRAppMaster extends CompositeService {
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private byte[] encryptedSpillKey;
 
   // After a task attempt completes from TaskUmbilicalProtocol's point of view,
   // it will be transitioned to finishing state.
@@ -686,8 +692,22 @@ public class MRAppMaster extends CompositeService {
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
       this.jobCredentials = ((JobConf)conf).getCredentials();
+      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+        int keyLen = conf.getInt(
+                MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+                MRJobConfig
+                        .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
+        KeyGenerator keyGen =
+                KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
+        keyGen.init(keyLen);
+        encryptedSpillKey = keyGen.generateKey().getEncoded();
+      } else {
+        encryptedSpillKey = new byte[] {0};
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new YarnRuntimeException(e);
     }
   }
 
@@ -743,7 +763,7 @@ public class MRAppMaster extends CompositeService {
   protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler());
+            getRMHeartbeatHandler(), encryptedSpillKey);
     return lis;
   }
 
@@ -910,6 +930,8 @@ public class MRAppMaster extends CompositeService {
       if (job.isUber()) {
         this.containerLauncher = new LocalContainerLauncher(context,
             (TaskUmbilicalProtocol) taskAttemptListener, jobClassLoader);
+        ((LocalContainerLauncher) this.containerLauncher)
+                .setEncryptedSpillKey(encryptedSpillKey);
       } else {
         this.containerLauncher = new ContainerLauncherImpl(context);
       }
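Editor's note: the key generation above reduces to the following standalone sketch. It is a minimal illustration, not project code; the 128-bit length is an assumed stand-in for the configured `mapreduce.job.encrypted-intermediate-data-key-size-bits` value.

    import java.security.NoSuchAlgorithmException;
    import javax.crypto.KeyGenerator;

    public class SpillKeyGenSketch {
      public static void main(String[] args) throws NoSuchAlgorithmException {
        // Same JCE calls as the AM above: one random HmacSHA1 key per job,
        // generated in the AM rather than at job-submission time.
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
        keyGen.init(128); // assumed key length; the AM reads it from config
        byte[] encryptedSpillKey = keyGen.generateKey().getEncoded();
        System.out.println("spill key: " + encryptedSpillKey.length + " bytes");
      }
    }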

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java

@@ -68,7 +68,7 @@ public class TestTaskAttemptFinishingMonitor {
     when(appCtx.getClock()).thenReturn(clock);
 
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler);
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null);
 
     listener.init(conf);
     listener.start();

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -59,14 +59,14 @@ public class TestTaskAttemptListenerImpl {
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
     }
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
       this.taskHeartbeatHandler = hbHandler;
     }
 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java

@@ -255,7 +255,7 @@ public class TestFail {
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null, null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null, null) {
         @Override
         public void startRpcServer(){};
         @Override

+ 25 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -150,6 +150,8 @@ abstract public class Task implements Writable, Configurable {
   private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
+  private byte[] encryptedSpillKey = new byte[] {0};  // Key Used to encrypt
+  // intermediate spills
   TaskStatus taskStatus;                          // current status of the task
   protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
@@ -262,6 +264,24 @@ abstract public class Task implements Writable, Configurable {
     this.tokenSecret = tokenSecret;
   }
 
+  /**
+   * Get Encrypted spill key
+   * @return encrypted spill key
+   */
+  public byte[] getEncryptedSpillKey() {
+    return encryptedSpillKey;
+  }
+
+  /**
+   * Set Encrypted spill key
+   * @param encryptedSpillKey key
+   */
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
+
   /**
    * Get the job token secret
    * @return the token secret
@@ -492,6 +512,8 @@ abstract public class Task implements Writable, Configurable {
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);
     Text.writeString(out, user);
+    out.writeInt(encryptedSpillKey.length);
+    out.write(encryptedSpillKey);
     extraData.write(out);
   }
 
@@ -517,6 +539,9 @@ abstract public class Task implements Writable, Configurable {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
     user = StringInterner.weakIntern(Text.readString(in));
+    int len = in.readInt();
+    encryptedSpillKey = new byte[len];
+    in.readFully(encryptedSpillKey);
     extraData.readFields(in);
   }
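Editor's note: the write/readFields changes above serialize the key as a length-prefixed byte array over the task umbilical. A minimal round-trip sketch of that wire format, using plain JDK streams (class name hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class SpillKeyWireFormatSketch {
      public static void main(String[] args) throws IOException {
        byte[] key = {0x01, 0x02, 0x03};           // stand-in spill key
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(key.length);                  // mirrors Task.write()
        out.write(key);

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        byte[] read = new byte[in.readInt()];      // mirrors Task.readFields()
        in.readFully(read);
        System.out.println(Arrays.equals(key, read)); // true
      }
    }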

+ 8 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LimitInputStream;
@@ -50,7 +49,7 @@ public class CryptoUtils {
 
   private static final Log LOG = LogFactory.getLog(CryptoUtils.class);
 
-  public static boolean isShuffleEncrypted(Configuration conf) {
+  public static boolean isEncryptedSpillEnabled(Configuration conf) {
     return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
         MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
   }
@@ -64,7 +63,7 @@ public class CryptoUtils {
    */
   public static byte[] createIV(Configuration conf) throws IOException {
     CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
       cryptoCodec.generateSecureRandom(iv);
       return iv;
@@ -75,13 +74,13 @@ public class CryptoUtils {
 
   public static int cryptoPadding(Configuration conf) {
     // Sizeof(IV) + long(start-offset)
-    return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
+    return isEncryptedSpillEnabled(conf) ? CryptoCodec.getInstance(conf)
         .getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
   }
 
   private static byte[] getEncryptionKey() throws IOException {
-    return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
-        .getCredentials());
+    return TokenCache.getEncryptedSpillKey(UserGroupInformation.getCurrentUser()
+            .getCredentials());
   }
 
   private static int getBufferSize(Configuration conf) {
@@ -102,7 +101,7 @@ public class CryptoUtils {
    */
   public static FSDataOutputStream wrapIfNecessary(Configuration conf,
       FSDataOutputStream out) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
       byte[] iv = createIV(conf);
       out.write(iv);
@@ -137,7 +136,7 @@ public class CryptoUtils {
    */
   public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
       long length) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       int bufferSize = getBufferSize(conf);
       if (length > -1) {
         in = new LimitInputStream(in, length);
@@ -174,7 +173,7 @@ public class CryptoUtils {
    */
   public static FSDataInputStream wrapIfNecessary(Configuration conf,
       FSDataInputStream in) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
       int bufferSize = getBufferSize(conf);
       // Not going to be used... but still has to be read...
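Editor's note: for orientation, a hedged sketch of how a spill writer and reader pair up through these helpers, on the local filesystem with a hypothetical path. With encryption disabled the wrappers are pass-throughs; with it enabled they prepend the start offset and IV (cryptoPadding(conf) bytes) and require the spill key in the current user's credentials.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class SpillStreamSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path spill = new Path("/tmp/spill.out"); // hypothetical spill file

        // Writer side: wrap before writing.
        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(conf, fs.create(spill));
        out.writeUTF("intermediate data");
        out.close();

        // Reader side: wrap the raw stream the same way before reading.
        FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(spill));
        System.out.println(in.readUTF());
        in.close();
      }
    }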

+ 6 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -18,12 +18,10 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
@@ -52,7 +49,6 @@ import org.apache.hadoop.mapred.QueueACL;
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
@@ -176,13 +172,8 @@ class JobSubmitter {
       if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
         KeyGenerator keyGen;
         try {
-
-          int keyLen = CryptoUtils.isShuffleEncrypted(conf)
-              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
-                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
-              : SHUFFLE_KEY_LENGTH;
           keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-          keyGen.init(keyLen);
+          keyGen.init(SHUFFLE_KEY_LENGTH);
         } catch (NoSuchAlgorithmException e) {
           throw new IOException("Error generating shuffle secret key", e);
         }
@@ -190,6 +181,11 @@ class JobSubmitter {
         TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
             job.getCredentials());
       }
+      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
+        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
+                "data spill is enabled");
+      }
 
       copyAndConfigureFiles(job, submitJobDir);
 

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java

@@ -176,6 +176,7 @@ public class TokenCache {
   public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
   private static final Text JOB_TOKEN = new Text("JobToken");
   private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+  private static final Text ENC_SPILL_KEY = new Text("MapReduceEncryptedSpillKey");
 
   /**
    * load job token from a file
@@ -244,6 +245,15 @@ public class TokenCache {
     return getSecretKey(credentials, SHUFFLE_TOKEN);
   }
 
+  @InterfaceAudience.Private
+  public static void setEncryptedSpillKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(ENC_SPILL_KEY, key);
+  }
+
+  @InterfaceAudience.Private
+  public static byte[] getEncryptedSpillKey(Credentials credentials) {
+    return getSecretKey(credentials, ENC_SPILL_KEY);
+  }
   /**
    * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
    * instead, this method is included for compatibility against Hadoop-1
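Editor's note: the two helpers above are thin aliases over Credentials; a minimal sketch of the same mechanism using the public Credentials API directly (alias string taken from the diff, class name hypothetical):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;

    public class SpillKeyCredentialsSketch {
      public static void main(String[] args) {
        Credentials creds = new Credentials();
        Text alias = new Text("MapReduceEncryptedSpillKey");
        creds.addSecretKey(alias, new byte[] {1, 2, 3}); // what setEncryptedSpillKey does
        byte[] key = creds.getSecretKey(alias);          // what getEncryptedSpillKey does
        System.out.println(key.length);                  // 3
      }
    }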

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java

@@ -127,6 +127,9 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     long compressedLength = ir.partLength;
     long decompressedLength = ir.rawLength;
 
+    compressedLength -= CryptoUtils.cryptoPadding(job);
+    decompressedLength -= CryptoUtils.cryptoPadding(job);
+
     // Get the location for the map output - either in-memory or on-disk
     MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
         id);
@@ -150,8 +153,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     inStream = CryptoUtils.wrapIfNecessary(job, inStream);
 
     try {
-      inStream.seek(ir.startOffset);
-
+      inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
     } finally {
       try {

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md

@@ -253,3 +253,11 @@ You can do this on a per-job basis, or by means of a cluster-wide setting in the
 To set this property in NodeManager, set it in the `yarn-env.sh` file:
 
       YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all"
+
+Encrypted Intermediate Data Spill files
+---------------------------------------
+
+This capability allows encryption of the intermediate files generated during the merge and shuffle phases.
+It can be enabled by setting the `mapreduce.job.encrypted-intermediate-data` job property to `true`.
+
+**NOTE:** Currently, enabling encrypted intermediate data spills would restrict the number of attempts of the job to 1.
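Editor's note: as a usage illustration for the documentation added above, a hedged sketch of enabling the property on a job; the property name comes from the doc, everything else is illustrative.

    import org.apache.hadoop.mapred.JobConf;

    public class EnableSpillEncryptionSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Per-job switch documented above; the submitter will then also
        // cap the job's AM attempts at 1, as the NOTE warns.
        job.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
      }
    }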

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java

@@ -87,7 +87,7 @@ public class TestMerger {
     jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    TokenCache.setEncryptedSpillKey(new byte[16], credentials);
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
     testInMemoryAndOnDiskMerger();
   }

+ 21 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java

@@ -52,24 +52,31 @@ public class TestMRIntermediateDataEncryption {
 
   @Test
   public void testSingleReducer() throws Exception {
-    doEncryptionTest(3, 1, 2);
+    doEncryptionTest(3, 1, 2, false);
+  }
+
+  @Test
+  public void testUberMode() throws Exception {
+    doEncryptionTest(3, 1, 2, true);
   }
 
   @Test
   public void testMultipleMapsPerNode() throws Exception {
-    doEncryptionTest(8, 1, 2);
+    doEncryptionTest(8, 1, 2, false);
   }
 
   @Test
   public void testMultipleReducers() throws Exception {
-    doEncryptionTest(2, 4, 2);
+    doEncryptionTest(2, 4, 2, false);
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
-    doEncryptionTest(numMappers, numReducers, numNodes, 1000);
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+                               boolean isUber) throws Exception {
+    doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+                               int numLines, boolean isUber) throws Exception {
     MiniDFSCluster dfsCluster = null;
     MiniMRClientCluster mrCluster = null;
     FileSystem fileSystem = null;
@@ -85,7 +92,8 @@ public class TestMRIntermediateDataEncryption {
       // Generate input.
       createInput(fileSystem, numMappers, numLines);
       // Run the test.
-      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
+      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
+              numMappers, numReducers, numLines, isUber);
     } finally {
       if (dfsCluster != null) {
         dfsCluster.shutdown();
@@ -111,8 +119,9 @@ public class TestMRIntermediateDataEncryption {
     }
   }
 
-  private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
-    throws Exception {
+  private void runMergeTest(JobConf job, FileSystem fileSystem, int
+          numMappers, int numReducers, int numLines, boolean isUber)
+          throws Exception {
     fileSystem.delete(OUTPUT, true);
     job.setJobName("Test");
     JobClient client = new JobClient(job);
@@ -133,6 +142,9 @@ public class TestMRIntermediateDataEncryption {
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
     job.setInt("mapred.test.num_lines", numLines);
+    if (isUber) {
+      job.setBoolean("mapreduce.job.ubertask.enable", true);
+    }
     job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     try {
       submittedJob = client.submitJob(job);

+ 8 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java

@@ -118,12 +118,14 @@ public class TestMapProgress extends TestCase {
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskId);
-      buf.append(" making progress to ");
-      buf.append(taskStatus.getProgress());
-      String state = taskStatus.getStateString();
-      if (state != null) {
-        buf.append(" and state of ");
-        buf.append(state);
+      if (taskStatus != null) {
+        buf.append(" making progress to ");
+        buf.append(taskStatus.getProgress());
+        String state = taskStatus.getStateString();
+        if (state != null) {
+          buf.append(" and state of ");
+          buf.append(state);
+        }
       }
       LOG.info(buf.toString());
       // ignore phase