|
@@ -26,7 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.RemoteIterator;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.service.CompositeService;
|
|
|
+import org.apache.hadoop.service.ServiceOperations;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
@@ -55,6 +56,7 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.UndeclaredThrowableException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -71,12 +73,13 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
/**
|
|
|
* Plugin timeline storage to support timeline server v1.5 API. This storage
|
|
|
* uses a file system to store timeline entities in their groups.
|
|
|
*/
|
|
|
-public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
+public class EntityGroupFSTimelineStore extends CompositeService
|
|
|
implements TimelineStore {
|
|
|
|
|
|
static final String DOMAIN_LOG_PREFIX = "domainlog-";
|
|
@@ -110,6 +113,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
|
|
|
new ConcurrentHashMap<ApplicationId, AppLogs>();
|
|
|
private ScheduledThreadPoolExecutor executor;
|
|
|
+ private AtomicBoolean stopExecutors = new AtomicBoolean(false);
|
|
|
private FileSystem fs;
|
|
|
private ObjectMapper objMapper;
|
|
|
private JsonFactory jsonFactory;
|
|
@@ -128,7 +132,8 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
@Override
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
|
summaryStore = createSummaryStore();
|
|
|
- summaryStore.init(conf);
|
|
|
+ addService(summaryStore);
|
|
|
+
|
|
|
long logRetainSecs = conf.getLong(
|
|
|
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
|
|
|
YarnConfiguration
|
|
@@ -170,17 +175,28 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
});
|
|
|
cacheIdPlugins = loadPlugIns(conf);
|
|
|
// Initialize yarn client for application status
|
|
|
- yarnClient = YarnClient.createYarnClient();
|
|
|
- yarnClient.init(conf);
|
|
|
+ yarnClient = createAndInitYarnClient(conf);
|
|
|
+ // if non-null, hook its lifecycle up
|
|
|
+ addIfService(yarnClient);
|
|
|
+ activeRootPath = new Path(conf.get(
|
|
|
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
|
|
|
+ YarnConfiguration
|
|
|
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
|
|
|
+ doneRootPath = new Path(conf.get(
|
|
|
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
|
|
|
+ YarnConfiguration
|
|
|
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
|
|
|
+ fs = activeRootPath.getFileSystem(conf);
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
|
private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
|
|
|
throws RuntimeException {
|
|
|
- Collection<String> pluginNames = conf.getStringCollection(
|
|
|
+ Collection<String> pluginNames = conf.getTrimmedStringCollection(
|
|
|
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
|
|
|
List<TimelineEntityGroupPlugin> pluginList
|
|
|
= new LinkedList<TimelineEntityGroupPlugin>();
|
|
|
+ Exception caught = null;
|
|
|
for (final String name : pluginNames) {
|
|
|
LOG.debug("Trying to load plugin class {}", name);
|
|
|
TimelineEntityGroupPlugin cacheIdPlugin = null;
|
|
@@ -191,10 +207,11 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
clazz, conf);
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Error loading plugin " + name, e);
|
|
|
+ caught = e;
|
|
|
}
|
|
|
|
|
|
if (cacheIdPlugin == null) {
|
|
|
- throw new RuntimeException("No class defined for " + name);
|
|
|
+ throw new RuntimeException("No class defined for " + name, caught);
|
|
|
}
|
|
|
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
|
|
|
pluginList.add(cacheIdPlugin);
|
|
@@ -210,8 +227,9 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
+
|
|
|
+ super.serviceStart();
|
|
|
LOG.info("Starting {}", getName());
|
|
|
- yarnClient.start();
|
|
|
summaryStore.start();
|
|
|
|
|
|
Configuration conf = getConfig();
|
|
@@ -219,16 +237,10 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
aclManager.setTimelineStore(summaryStore);
|
|
|
summaryTdm = new TimelineDataManager(summaryStore, aclManager);
|
|
|
summaryTdm.init(conf);
|
|
|
- summaryTdm.start();
|
|
|
- activeRootPath = new Path(conf.get(
|
|
|
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
|
|
|
- YarnConfiguration
|
|
|
- .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
|
|
|
- doneRootPath = new Path(conf.get(
|
|
|
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
|
|
|
- YarnConfiguration
|
|
|
- .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
|
|
|
- fs = activeRootPath.getFileSystem(conf);
|
|
|
+ addService(summaryTdm);
|
|
|
+ // start child services that aren't already started
|
|
|
+ super.serviceStart();
|
|
|
+
|
|
|
if (!fs.exists(activeRootPath)) {
|
|
|
fs.mkdirs(activeRootPath);
|
|
|
fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
|
|
@@ -257,7 +269,8 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
|
|
|
YarnConfiguration
|
|
|
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
|
|
|
- LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
|
|
|
+ LOG.info("Scanning active directory {} every {} seconds", activeRootPath,
|
|
|
+ scanIntervalSecs);
|
|
|
LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
|
|
|
|
|
|
executor = new ScheduledThreadPoolExecutor(numThreads,
|
|
@@ -267,12 +280,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
TimeUnit.SECONDS);
|
|
|
executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
|
|
|
cleanerIntervalSecs, TimeUnit.SECONDS);
|
|
|
- super.serviceStart();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
LOG.info("Stopping {}", getName());
|
|
|
+ stopExecutors.set(true);
|
|
|
if (executor != null) {
|
|
|
executor.shutdown();
|
|
|
if (executor.isTerminating()) {
|
|
@@ -286,18 +299,9 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if (summaryTdm != null) {
|
|
|
- summaryTdm.stop();
|
|
|
- }
|
|
|
- if (summaryStore != null) {
|
|
|
- summaryStore.stop();
|
|
|
- }
|
|
|
- if (yarnClient != null) {
|
|
|
- yarnClient.stop();
|
|
|
- }
|
|
|
synchronized (cachedLogs) {
|
|
|
for (EntityCacheItem cacheItem : cachedLogs.values()) {
|
|
|
- cacheItem.getStore().close();
|
|
|
+ ServiceOperations.stopQuietly(cacheItem.getStore());
|
|
|
}
|
|
|
}
|
|
|
super.serviceStop();
|
|
@@ -305,17 +309,34 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
|
@VisibleForTesting
|
|
|
- void scanActiveLogs() throws IOException {
|
|
|
- RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
|
|
|
+ int scanActiveLogs() throws IOException {
|
|
|
+ RemoteIterator<FileStatus> iter = list(activeRootPath);
|
|
|
+ int logsToScanCount = 0;
|
|
|
while (iter.hasNext()) {
|
|
|
FileStatus stat = iter.next();
|
|
|
- ApplicationId appId = parseApplicationId(stat.getPath().getName());
|
|
|
+ String name = stat.getPath().getName();
|
|
|
+ ApplicationId appId = parseApplicationId(name);
|
|
|
if (appId != null) {
|
|
|
LOG.debug("scan logs for {} in {}", appId, stat.getPath());
|
|
|
+ logsToScanCount++;
|
|
|
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
|
|
|
executor.execute(new ActiveLogParser(logs));
|
|
|
+ } else {
|
|
|
+ LOG.debug("Unable to parse entry {}", name);
|
|
|
}
|
|
|
}
|
|
|
+ return logsToScanCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * List a directory, returning an iterator which will fail fast if this
|
|
|
+ * service has been stopped
|
|
|
+ * @param path path to list
|
|
|
+ * @return an iterator over the contents of the directory
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private RemoteIterator<FileStatus> list(Path path) throws IOException {
|
|
|
+ return new StoppableRemoteIterator(fs.listStatusIterator(path));
|
|
|
}
|
|
|
|
|
|
private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
|
|
@@ -377,11 +398,11 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
@VisibleForTesting
|
|
|
- static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
|
|
|
+ void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
|
|
|
throws IOException {
|
|
|
long now = Time.now();
|
|
|
// Depth first search from root directory for all application log dirs
|
|
|
- RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
|
|
|
+ RemoteIterator<FileStatus> iter = list(dirpath);
|
|
|
while (iter.hasNext()) {
|
|
|
FileStatus stat = iter.next();
|
|
|
if (stat.isDirectory()) {
|
|
@@ -456,7 +477,42 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
bucket1, bucket2, appId.toString()));
|
|
|
}
|
|
|
|
|
|
- // This method has to be synchronized to control traffic to RM
|
|
|
+ /**
|
|
|
+ * Create and initialize the YARN Client. Tests may override/mock this.
|
|
|
+ * If they return null, then {@link #getAppState(ApplicationId)} MUST
|
|
|
+ * also be overridden
|
|
|
+ * @param conf configuration
|
|
|
+ * @return the yarn client, or null.
|
|
|
+ *
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ protected YarnClient createAndInitYarnClient(Configuration conf) {
|
|
|
+ YarnClient client = YarnClient.createYarnClient();
|
|
|
+ client.init(conf);
|
|
|
+ return client;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the application state.
|
|
|
+ * @param appId application ID
|
|
|
+ * @return the state or {@link AppState#UNKNOWN} if it could not
|
|
|
+ * be determined
|
|
|
+ * @throws IOException on IO problems
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ protected AppState getAppState(ApplicationId appId) throws IOException {
|
|
|
+ return getAppState(appId, yarnClient);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ask the RM for the state of the application.
|
|
|
+ * This method has to be synchronized to control traffic to RM
|
|
|
+ * @param appId application ID
|
|
|
+ * @param yarnClient
|
|
|
+ * @return the state or {@link AppState#UNKNOWN} if it could not
|
|
|
+ * be determined
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
private static synchronized AppState getAppState(ApplicationId appId,
|
|
|
YarnClient yarnClient) throws IOException {
|
|
|
AppState appState = AppState.ACTIVE;
|
|
@@ -474,9 +530,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
return appState;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Application states,
|
|
|
+ */
|
|
|
@InterfaceAudience.Private
|
|
|
@VisibleForTesting
|
|
|
- enum AppState {
|
|
|
+ public enum AppState {
|
|
|
ACTIVE,
|
|
|
UNKNOWN,
|
|
|
COMPLETED
|
|
@@ -526,7 +585,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
if (!isDone()) {
|
|
|
LOG.debug("Try to parse summary log for log {} in {}",
|
|
|
appId, appDirPath);
|
|
|
- appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
|
|
|
+ appState = getAppState(appId);
|
|
|
long recentLogModTime = scanForLogs();
|
|
|
if (appState == AppState.UNKNOWN) {
|
|
|
if (Time.now() - recentLogModTime > unknownActiveMillis) {
|
|
@@ -559,8 +618,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
long scanForLogs() throws IOException {
|
|
|
LOG.debug("scanForLogs on {}", appDirPath);
|
|
|
long newestModTime = 0;
|
|
|
- RemoteIterator<FileStatus> iterAttempt =
|
|
|
- fs.listStatusIterator(appDirPath);
|
|
|
+ RemoteIterator<FileStatus> iterAttempt = list(appDirPath);
|
|
|
while (iterAttempt.hasNext()) {
|
|
|
FileStatus statAttempt = iterAttempt.next();
|
|
|
LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
|
|
@@ -572,8 +630,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
continue;
|
|
|
}
|
|
|
String attemptDirName = statAttempt.getPath().getName();
|
|
|
- RemoteIterator<FileStatus> iterCache
|
|
|
- = fs.listStatusIterator(statAttempt.getPath());
|
|
|
+ RemoteIterator<FileStatus> iterCache = list(statAttempt.getPath());
|
|
|
while (iterCache.hasNext()) {
|
|
|
FileStatus statCache = iterCache.next();
|
|
|
if (!statCache.isFile()) {
|
|
@@ -659,14 +716,34 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Extract any nested throwable forwarded from IPC operations.
|
|
|
+ * @param e exception
|
|
|
+ * @return either the exception passed an an argument, or any nested
|
|
|
+ * exception which was wrapped inside an {@link UndeclaredThrowableException}
|
|
|
+ */
|
|
|
+ private Throwable extract(Exception e) {
|
|
|
+ Throwable t = e;
|
|
|
+ if (e instanceof UndeclaredThrowableException && e.getCause() != null) {
|
|
|
+ t = e.getCause();
|
|
|
+ }
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+
|
|
|
private class EntityLogScanner implements Runnable {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
LOG.debug("Active scan starting");
|
|
|
try {
|
|
|
- scanActiveLogs();
|
|
|
+ int scanned = scanActiveLogs();
|
|
|
+ LOG.debug("Scanned {} active applications", scanned);
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Error scanning active files", e);
|
|
|
+ Throwable t = extract(e);
|
|
|
+ if (t instanceof InterruptedException) {
|
|
|
+ LOG.info("File scanner interrupted");
|
|
|
+ } else {
|
|
|
+ LOG.error("Error scanning active files", t);
|
|
|
+ }
|
|
|
}
|
|
|
LOG.debug("Active scan complete");
|
|
|
}
|
|
@@ -690,7 +767,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
}
|
|
|
LOG.debug("End parsing summary logs. ");
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Error processing logs for " + appLogs.getAppId(), e);
|
|
|
+ Throwable t = extract(e);
|
|
|
+ if (t instanceof InterruptedException) {
|
|
|
+ LOG.info("Log parser interrupted");
|
|
|
+ } else {
|
|
|
+ LOG.error("Error processing logs for " + appLogs.getAppId(), t);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -702,7 +784,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
try {
|
|
|
cleanLogs(doneRootPath, fs, logRetainMillis);
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Error cleaning files", e);
|
|
|
+ Throwable t = extract(e);
|
|
|
+ if (t instanceof InterruptedException) {
|
|
|
+ LOG.info("Cleaner interrupted");
|
|
|
+ } else {
|
|
|
+ LOG.error("Error cleaning files", e);
|
|
|
+ }
|
|
|
}
|
|
|
LOG.debug("Cleaner finished");
|
|
|
}
|
|
@@ -892,4 +979,29 @@ public class EntityGroupFSTimelineStore extends AbstractService
|
|
|
public void put(TimelineDomain domain) throws IOException {
|
|
|
summaryStore.put(domain);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This is a special remote iterator whose {@link #hasNext()} method
|
|
|
+ * returns false if {@link #stopExecutors} is true.
|
|
|
+ *
|
|
|
+ * This provides an implicit shutdown of all iterative file list and scan
|
|
|
+ * operations without needing to implement it in the while loops themselves.
|
|
|
+ */
|
|
|
+ private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
|
|
|
+ private final RemoteIterator<FileStatus> remote;
|
|
|
+
|
|
|
+ public StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
|
|
|
+ this.remote = remote;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean hasNext() throws IOException {
|
|
|
+ return !stopExecutors.get() && remote.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public FileStatus next() throws IOException {
|
|
|
+ return remote.next();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|