|
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.audit.AuditConstants;
|
|
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
|
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
|
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
|
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
|
|
+import org.apache.hadoop.mapreduce.JobID;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
|
|
|
@@ -49,12 +50,17 @@ public final class AuditContextUpdater {
|
|
* @param jobContext job/task context.
|
|
* @param jobContext job/task context.
|
|
*/
|
|
*/
|
|
public AuditContextUpdater(final JobContext jobContext) {
|
|
public AuditContextUpdater(final JobContext jobContext) {
|
|
- this.jobId = jobContext.getJobID().toString();
|
|
|
|
|
|
+ JobID contextJobID = jobContext.getJobID();
|
|
|
|
+ this.jobId = contextJobID != null
|
|
|
|
+ ? contextJobID.toString()
|
|
|
|
+ : null;
|
|
|
|
|
|
if (jobContext instanceof TaskAttemptContext) {
|
|
if (jobContext instanceof TaskAttemptContext) {
|
|
// it's a task, extract info for auditing
|
|
// it's a task, extract info for auditing
|
|
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
|
|
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
|
|
- this.taskAttemptId = tid.toString();
|
|
|
|
|
|
+ this.taskAttemptId = tid != null
|
|
|
|
+ ? tid.toString()
|
|
|
|
+ : null;
|
|
} else {
|
|
} else {
|
|
this.taskAttemptId = null;
|
|
this.taskAttemptId = null;
|
|
}
|
|
}
|
|
@@ -70,7 +76,11 @@ public final class AuditContextUpdater {
|
|
*/
|
|
*/
|
|
public void updateCurrentAuditContext() {
|
|
public void updateCurrentAuditContext() {
|
|
final CommonAuditContext auditCtx = currentAuditContext();
|
|
final CommonAuditContext auditCtx = currentAuditContext();
|
|
- auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
|
|
|
|
|
|
+ if (jobId != null) {
|
|
|
|
+ auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
|
|
|
|
+ } else {
|
|
|
|
+ currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
|
|
|
|
+ }
|
|
if (taskAttemptId != null) {
|
|
if (taskAttemptId != null) {
|
|
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
|
|
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
|
|
} else {
|
|
} else {
|