|
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
|
|
|
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.ConnectException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -69,6 +70,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
+import org.apache.hadoop.yarn.util.Clock;
|
|
|
+import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
|
|
|
/**
|
|
|
* This class provides a way to interact with history files in a thread safe
|
|
@@ -464,7 +467,8 @@ public class HistoryFileManager extends AbstractService {
|
|
|
|
|
|
private JobACLsManager aclsMgr;
|
|
|
|
|
|
- private Configuration conf;
|
|
|
+ @VisibleForTesting
|
|
|
+ Configuration conf;
|
|
|
|
|
|
private String serialNumberFormat;
|
|
|
|
|
@@ -491,36 +495,10 @@ public class HistoryFileManager extends AbstractService {
|
|
|
+ (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
|
|
|
+ "d");
|
|
|
|
|
|
- String doneDirPrefix = null;
|
|
|
- doneDirPrefix = JobHistoryUtils
|
|
|
- .getConfiguredHistoryServerDoneDirPrefix(conf);
|
|
|
- try {
|
|
|
- doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
|
|
- new Path(doneDirPrefix));
|
|
|
- doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
|
|
- doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
|
|
|
- mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
|
|
|
- JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
|
|
- } catch (IOException e) {
|
|
|
- throw new YarnRuntimeException("Error creating done directory: ["
|
|
|
- + doneDirPrefixPath + "]", e);
|
|
|
- }
|
|
|
-
|
|
|
- String intermediateDoneDirPrefix = null;
|
|
|
- intermediateDoneDirPrefix = JobHistoryUtils
|
|
|
- .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
|
|
|
- try {
|
|
|
- intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
|
|
|
- new Path(intermediateDoneDirPrefix));
|
|
|
- intermediateDoneDirFc = FileContext.getFileContext(
|
|
|
- intermediateDoneDirPath.toUri(), conf);
|
|
|
- mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
|
|
|
- JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.info("error creating done directory on dfs " + e);
|
|
|
- throw new YarnRuntimeException("Error creating intermediate done directory: ["
|
|
|
- + intermediateDoneDirPath + "]", e);
|
|
|
- }
|
|
|
+ long maxFSWaitTime = conf.getLong(
|
|
|
+ JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
|
|
|
+ JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
|
|
|
+ createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
|
|
|
|
|
|
this.aclsMgr = new JobACLsManager(conf);
|
|
|
|
|
@@ -544,6 +522,107 @@ public class HistoryFileManager extends AbstractService {
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ void createHistoryDirs(Clock clock, long intervalCheckMillis,
|
|
|
+ long timeOutMillis) throws IOException {
|
|
|
+ long start = clock.getTime();
|
|
|
+ boolean done = false;
|
|
|
+ int counter = 0;
|
|
|
+ while (!done &&
|
|
|
+ ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
|
|
|
+ done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
|
|
|
+ try {
|
|
|
+ Thread.sleep(intervalCheckMillis);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ throw new YarnRuntimeException(ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!done) {
|
|
|
+ throw new YarnRuntimeException("Timed out '" + timeOutMillis+
|
|
|
+ "ms' waiting for FileSystem to become available");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * DistributedFileSystem returns a RemoteException with a message stating
|
|
|
+ * SafeModeException in it. So this is only way to check it is because of
|
|
|
+ * being in safe mode.
|
|
|
+ */
|
|
|
+ private boolean isBecauseSafeMode(Throwable ex) {
|
|
|
+ return ex.toString().contains("SafeModeException");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns TRUE if the history dirs were created, FALSE if they could not
|
|
|
+ * be created because the FileSystem is not reachable or in safe mode and
|
|
|
+ * throws and exception otherwise.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
|
|
|
+ boolean succeeded = true;
|
|
|
+ String doneDirPrefix = JobHistoryUtils.
|
|
|
+ getConfiguredHistoryServerDoneDirPrefix(conf);
|
|
|
+ try {
|
|
|
+ doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
|
|
|
+ new Path(doneDirPrefix));
|
|
|
+ doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
|
|
|
+ doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
|
|
|
+ mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
|
|
|
+ JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
|
|
|
+ } catch (ConnectException ex) {
|
|
|
+ if (logWait) {
|
|
|
+ LOG.info("Waiting for FileSystem at " +
|
|
|
+ doneDirPrefixPath.toUri().getAuthority() + "to be available");
|
|
|
+ }
|
|
|
+ succeeded = false;
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (isBecauseSafeMode(e)) {
|
|
|
+ succeeded = false;
|
|
|
+ if (logWait) {
|
|
|
+ LOG.info("Waiting for FileSystem at " +
|
|
|
+ doneDirPrefixPath.toUri().getAuthority() +
|
|
|
+ "to be out of safe mode");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new YarnRuntimeException("Error creating done directory: ["
|
|
|
+ + doneDirPrefixPath + "]", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (succeeded) {
|
|
|
+ String intermediateDoneDirPrefix = JobHistoryUtils.
|
|
|
+ getConfiguredHistoryIntermediateDoneDirPrefix(conf);
|
|
|
+ try {
|
|
|
+ intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
|
|
|
+ new Path(intermediateDoneDirPrefix));
|
|
|
+ intermediateDoneDirFc = FileContext.getFileContext(
|
|
|
+ intermediateDoneDirPath.toUri(), conf);
|
|
|
+ mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
|
|
|
+ JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
|
|
|
+ } catch (ConnectException ex) {
|
|
|
+ succeeded = false;
|
|
|
+ if (logWait) {
|
|
|
+ LOG.info("Waiting for FileSystem at " +
|
|
|
+ intermediateDoneDirPath.toUri().getAuthority() +
|
|
|
+ "to be available");
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (isBecauseSafeMode(e)) {
|
|
|
+ succeeded = false;
|
|
|
+ if (logWait) {
|
|
|
+ LOG.info("Waiting for FileSystem at " +
|
|
|
+ intermediateDoneDirPath.toUri().getAuthority() +
|
|
|
+ "to be out of safe mode");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new YarnRuntimeException(
|
|
|
+ "Error creating intermediate done directory: ["
|
|
|
+ + intermediateDoneDirPath + "]", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return succeeded;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void serviceStop() throws Exception {
|
|
|
ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
|