@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+
+/**
+ * Persists and retrieves the job info of completed jobs into/from DFS.
+ * <p/>
+ * Persistence must be enabled with the
+ * 'mapred.job.tracker.persist.jobstatus.active' configuration variable;
+ * if the retain time is zero, jobs are not persisted.
+ * <p/>
+ * A daemon thread cleans up job info files older than the retain time.
+ * <p/>
+ * The retain time can be set with the 'mapred.job.tracker.persist.jobstatus.hours'
+ * configuration variable (it is in hours).
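+ * <p/>
+ * For illustration only (values here are examples, not the defaults), a
+ * configuration enabling persistence might set:
+ * <pre>
+ *   mapred.job.tracker.persist.jobstatus.active = true
+ *   mapred.job.tracker.persist.jobstatus.hours  = 24
+ *   mapred.job.tracker.persist.jobstatus.dir    = /jobtracker/jobsInfo
+ * </pre>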
+ */
+public class CompletedJobStatusStore implements Runnable {
+  private boolean active;
+  private String jobInfoDir;
+  private long retainTime;
+  private FileSystem fs;
+  private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
+
+  public static final Log LOG =
+          LogFactory.getLog(CompletedJobStatusStore.class);
+
+  private static final long HOUR = 1000 * 60 * 60;
+  private static final long SLEEP_TIME = 1 * HOUR;
+
+  /**
+   * Creates the store, reading the persistence settings from the given
+   * configuration and, when persistence is active, creating the job info
+   * directory if it does not exist.
+   *
+   * @param conf configuration holding the persistence settings
+   * @throws IOException if DFS or the job info directory cannot be accessed
+   */
+  CompletedJobStatusStore(Configuration conf) throws IOException {
+    active =
+      conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
+
+    if (active) {
+      fs = FileSystem.get(conf);
+      retainTime =
+        conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
+
+      jobInfoDir =
+        conf.get("mapred.job.tracker.persist.jobstatus.dir", JOB_INFO_STORE_DIR);
+
+      Path path = new Path(jobInfoDir);
+      if (!fs.exists(path)) {
+        fs.mkdirs(path);
+      }
+
+      if (retainTime == 0) {
+        // retain time is zero, so delete all previously stored job statuses
+        deleteJobStatusDirs();
+      }
+    }
+  }
+
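+  // A minimal usage sketch (illustrative; not part of this patch): the owner
+  // of the store, such as the JobTracker, would construct it and, when it is
+  // active, drive the housekeeping loop from a daemon thread:
+  //
+  //   CompletedJobStatusStore statusStore = new CompletedJobStatusStore(conf);
+  //   if (statusStore.isActive()) {
+  //     Thread housekeeper = new Thread(statusStore, "completedJobsStore-housekeeper");
+  //     housekeeper.setDaemon(true);
+  //     housekeeper.start();
+  //   }
+  //   ...
+  //   statusStore.store(job);                               // when a job is retired
+  //   JobStatus status = statusStore.readJobStatus(jobId);  // on later queries
+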
+  /**
+   * Indicates if job status persistence is active or not.
+   *
+   * @return true if active, false otherwise.
+   */
+  public boolean isActive() {
+    return active;
+  }
+
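+  /**
+   * Housekeeping loop, intended to run in a daemon thread: as long as the
+   * retain time is positive, deletes job info files older than the retain
+   * time and sleeps between passes, until the thread is interrupted.
+   */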
+  public void run() {
+    if (retainTime > 0) {
+      while (true) {
+        deleteJobStatusDirs();
+        try {
+          Thread.sleep(SLEEP_TIME);
+        }
+        catch (InterruptedException ex) {
+          break;
+        }
+      }
+    }
+  }
+
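+  // Deletes every file in the job info directory whose modification time is
+  // older than the retain time; per-file failures are logged and skipped.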
+  private void deleteJobStatusDirs() {
+    try {
+      long currentTime = System.currentTimeMillis();
+      Path[] jobInfoFiles = fs.listPaths(
+              new Path[]{new Path(jobInfoDir)});
+
+      for (Path jobInfo : jobInfoFiles) {
+        try {
+          FileStatus status = fs.getFileStatus(jobInfo);
+          if ((currentTime - status.getModificationTime()) > retainTime) {
+            fs.delete(jobInfo);
+          }
+        }
+        catch (IOException ie) {
+          LOG.warn("Could not do housekeeping for [" +
+                   jobInfo + "] job info : " + ie.getMessage(), ie);
+        }
+      }
+    }
+    catch (IOException ie) {
+      LOG.warn("Could not obtain job info files : " + ie.getMessage(), ie);
+    }
+  }
+
+  private Path getInfoFilePath(String jobId) {
+    return new Path(jobInfoDir, jobId + ".info");
+  }
+
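+  // The <jobId>.info file written by store() is a sequence of Writables in
+  // this order: JobStatus, JobProfile, Counters, an int with the number of
+  // TaskCompletionEvents, then the events themselves. The read methods below
+  // consume the stream in the same order.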
+  /**
+   * Persists a job in DFS. Does nothing if persistence is inactive or the
+   * retain time is zero.
+   *
+   * @param job the job about to be 'retired'
+   */
+  public void store(JobInProgress job) {
+    if (active && retainTime > 0) {
+      String jobId = job.getStatus().getJobId();
+      Path jobStatusFile = getInfoFilePath(jobId);
+      try {
+        FSDataOutputStream dataOut = fs.create(jobStatusFile);
+
+        job.getStatus().write(dataOut);
+
+        job.getProfile().write(dataOut);
+
+        job.getCounters().write(dataOut);
+
+        TaskCompletionEvent[] events =
+                job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
+        dataOut.writeInt(events.length);
+        for (TaskCompletionEvent event : events) {
+          event.write(dataOut);
+        }
+
+        dataOut.close();
+      } catch (IOException ex) {
+        LOG.warn("Could not store [" + jobId + "] job info : " +
+                 ex.getMessage(), ex);
+        try {
+          fs.delete(jobStatusFile);
+        }
+        catch (IOException ex1) {
+          //ignore
+        }
+      }
+    }
+  }
+
+  private FSDataInputStream getJobInfoFile(String jobId) throws IOException {
+    Path jobStatusFile = getInfoFilePath(jobId);
+    return (fs.exists(jobStatusFile)) ? fs.open(jobStatusFile) : null;
+  }
+
+  private JobStatus readJobStatus(FSDataInputStream dataIn) throws IOException {
+    JobStatus jobStatus = new JobStatus();
+    jobStatus.readFields(dataIn);
+    return jobStatus;
+  }
+
+  private JobProfile readJobProfile(FSDataInputStream dataIn)
+    throws IOException {
+    JobProfile jobProfile = new JobProfile();
+    jobProfile.readFields(dataIn);
+    return jobProfile;
+  }
+
+  private Counters readCounters(FSDataInputStream dataIn) throws IOException {
+    Counters counters = new Counters();
+    counters.readFields(dataIn);
+    return counters;
+  }
+
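+  // Reads the persisted event count, clamps the requested [offset, offset + len)
+  // window against it, then reads events sequentially, keeping only those at
+  // or beyond the offset.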
+  private TaskCompletionEvent[] readEvents(FSDataInputStream dataIn,
+                                           int offset, int len)
+    throws IOException {
+    int size = dataIn.readInt();
+    if (offset > size) {
+      return TaskCompletionEvent.EMPTY_ARRAY;
+    }
+    if (offset + len > size) {
+      len = size - offset;
+    }
+    TaskCompletionEvent[] events = new TaskCompletionEvent[len];
+    for (int i = 0; i < (offset + len); i++) {
+      TaskCompletionEvent event = new TaskCompletionEvent();
+      event.readFields(dataIn);
+      if (i >= offset) {
+        events[i - offset] = event;
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Retrieves the JobStatus information persisted in DFS by the store method.
+   *
+   * @param jobId the jobId for which the JobStatus is queried
+   * @return the JobStatus object, or null if it could not be retrieved
+   */
+  public JobStatus readJobStatus(String jobId) {
+    JobStatus jobStatus = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          jobStatus = readJobStatus(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job status : " + ex, ex);
+      }
+    }
+    return jobStatus;
+  }
+
+  /**
+   * Retrieves the JobProfile information persisted in DFS by the store method.
+   *
+   * @param jobId the jobId for which the JobProfile is queried
+   * @return the JobProfile object, or null if it could not be retrieved
+   */
+  public JobProfile readJobProfile(String jobId) {
+    JobProfile jobProfile = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);   // skip over the JobStatus record
+          jobProfile = readJobProfile(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job profile : " + ex, ex);
+      }
+    }
+    return jobProfile;
+  }
+
+  /**
+   * Retrieves the Counters information persisted in DFS by the store method.
+   *
+   * @param jobId the jobId for which the Counters are queried
+   * @return the Counters object, or null if it could not be retrieved
+   */
+  public Counters readCounters(String jobId) {
+    Counters counters = null;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);   // skip over the JobStatus record
+          readJobProfile(dataIn);  // skip over the JobProfile record
+          counters = readCounters(dataIn);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
+      }
+    }
+    return counters;
+  }
+
+  /**
+   * Retrieves the TaskCompletionEvents persisted in DFS by the store method.
+   *
+   * @param jobId       the jobId for which TaskCompletionEvents are queried
+   * @param fromEventId offset of the first event to return
+   * @param maxEvents   maximum number of events to return
+   * @return a TaskCompletionEvent[], empty if the events could not be retrieved
+   */
+  public TaskCompletionEvent[] readJobTaskCompletionEvents(String jobId,
+                                                           int fromEventId,
+                                                           int maxEvents) {
+    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+    if (active) {
+      try {
+        FSDataInputStream dataIn = getJobInfoFile(jobId);
+        if (dataIn != null) {
+          readJobStatus(dataIn);   // skip over the preceding records
+          readJobProfile(dataIn);
+          readCounters(dataIn);
+          events = readEvents(dataIn, fromEventId, maxEvents);
+          dataIn.close();
+        }
+      } catch (IOException ex) {
+        LOG.warn("Could not read [" + jobId + "] job events : " + ex, ex);
+      }
+    }
+    return events;
+  }
+
+}