Forráskód Böngészése

HADOOP-18402. S3A committer NPE in spark job abort (#4735)

jobId.toString() to only be called when the ID isn't null.

this doesn't surface in MR, but spark seems to manage it

Change-Id: I06692ef30a4af510c660d7222292932a8d4b5147
Steve Loughran 2 éve
szülő
commit
ad83e95046

+ 13 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.audit.AuditConstants;
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
@@ -49,12 +50,17 @@ public final class AuditContextUpdater {
    * @param jobContext job/task context.
    */
   public AuditContextUpdater(final JobContext jobContext) {
-    this.jobId = jobContext.getJobID().toString();
+    JobID contextJobID = jobContext.getJobID();
+    this.jobId = contextJobID != null
+            ? contextJobID.toString()
+            : null;
 
     if (jobContext instanceof TaskAttemptContext) {
       // it's a task, extract info for auditing
       final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
-      this.taskAttemptId = tid.toString();
+      this.taskAttemptId = tid != null
+          ? tid.toString()
+          : null;
     } else {
       this.taskAttemptId = null;
     }
@@ -70,7 +76,11 @@ public final class AuditContextUpdater {
    */
   public void updateCurrentAuditContext() {
     final CommonAuditContext auditCtx = currentAuditContext();
-    auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+    if (jobId != null) {
+      auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
+    } else {
+      currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
+    }
     if (taskAttemptId != null) {
       auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
     } else {

+ 7 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.JsonSerialization;
 import org.apache.hadoop.util.Preconditions;
@@ -156,7 +157,12 @@ public final class CommitContext implements Closeable {
     this.commitOperations = commitOperations;
     this.jobContext = jobContext;
     this.conf = jobContext.getConfiguration();
-    this.jobId = jobContext.getJobID().toString();
+    JobID contextJobID = jobContext.getJobID();
+    // either the job ID or make one up as it will be
+    // used for the filename of any reports.
+    this.jobId = contextJobID != null
+        ? contextJobID.toString()
+        : ("job-without-id-at-" + System.currentTimeMillis());
     this.collectIOStatistics = conf.getBoolean(
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
         S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);