소스 검색

YARN-3879 [Storage implementation] Create HDFS backing storage implementation for ATS reads. Contributed by Abhishek Modi.

Vrushali C 6 년 전
부모
커밋
7ed627af6b

+ 50 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -38,6 +37,11 @@ import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -68,7 +72,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   private static final Logger LOG =
       LoggerFactory.getLogger(FileSystemTimelineReaderImpl.class);
 
-  private String rootPath;
+  private FileSystem fs;
+  private Path rootPath;
+  private Path entitiesPath;
   private static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files. */
@@ -94,7 +100,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
 
   @VisibleForTesting
   String getRootPath() {
-    return rootPath;
+    return rootPath.toString();
   }
 
   private static ObjectMapper mapper;
@@ -162,12 +168,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     if (clusterId == null || appId == null) {
       throw new IOException("Unable to get flow info");
     }
-    String appFlowMappingFile = rootPath + File.separator +  ENTITIES_DIR +
-        File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
+    Path clusterIdPath = new Path(entitiesPath, clusterId);
+    Path appFlowMappingFilePath = new Path(clusterIdPath,
+            APP_FLOW_MAPPING_FILE);
     try (BufferedReader reader =
              new BufferedReader(new InputStreamReader(
-                 new FileInputStream(
-                     appFlowMappingFile), Charset.forName("UTF-8")));
+                 fs.open(appFlowMappingFilePath), Charset.forName("UTF-8")));
          CSVParser parser = new CSVParser(reader, csvFormat)) {
       for (CSVRecord record : parser.getRecords()) {
         if (record.size() < 4) {
@@ -266,7 +272,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     return entity;
   }
 
-  private Set<TimelineEntity> getEntities(File dir, String entityType,
+  private Set<TimelineEntity> getEntities(Path dir, String entityType,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
     // First sort the selected entities based on created/start time.
@@ -280,15 +286,18 @@ public class FileSystemTimelineReaderImpl extends AbstractService
           }
         );
     if (dir != null) {
-      File[] files = dir.listFiles();
-      if (files != null) {
-        for (File entityFile : files) {
+      RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir,
+              false);
+      if (fileStatuses != null) {
+        while (fileStatuses.hasNext()) {
+          LocatedFileStatus locatedFileStatus = fileStatuses.next();
+          Path entityFile = locatedFileStatus.getPath();
           if (!entityFile.getName()
               .contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
             continue;
           }
           try (BufferedReader reader = new BufferedReader(
-              new InputStreamReader(new FileInputStream(entityFile),
+              new InputStreamReader(fs.open(entityFile),
                   Charset.forName("UTF-8")))) {
             TimelineEntity entity = readEntityFromFile(reader);
             if (!entity.getType().equals(entityType)) {
@@ -366,25 +375,30 @@ public class FileSystemTimelineReaderImpl extends AbstractService
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+    String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
         conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
+    rootPath = new Path(outputRoot);
+    entitiesPath = new Path(rootPath, ENTITIES_DIR);
+    fs = rootPath.getFileSystem(conf);
     super.serviceInit(conf);
   }
 
   @Override
   public TimelineEntity getEntity(TimelineReaderContext context,
       TimelineDataToRetrieve dataToRetrieve) throws IOException {
-    String flowRunPath = getFlowRunPath(context.getUserId(),
+    String flowRunPathStr = getFlowRunPath(context.getUserId(),
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
-    File dir = new File(new File(rootPath, ENTITIES_DIR),
-        context.getClusterId() + File.separator + flowRunPath + File.separator +
-        context.getAppId() + File.separator + context.getEntityType());
-    File entityFile = new File(
-        dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+    Path appIdPath = new Path(flowRunPath, context.getAppId());
+    Path entityTypePath = new Path(appIdPath, context.getEntityType());
+    Path entityFilePath = new Path(entityTypePath,
+            context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+
     try (BufferedReader reader =
              new BufferedReader(new InputStreamReader(
-                 new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+                 fs.open(entityFilePath), Charset.forName("UTF-8")))) {
       TimelineEntity entity = readEntityFromFile(reader);
       return createEntityToBeReturned(
           entity, dataToRetrieve.getFieldsToRetrieve());
@@ -399,32 +413,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   public Set<TimelineEntity> getEntities(TimelineReaderContext context,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
-    String flowRunPath = getFlowRunPath(context.getUserId(),
+    String flowRunPathStr = getFlowRunPath(context.getUserId(),
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
-    File dir =
-        new File(new File(rootPath, ENTITIES_DIR),
-            context.getClusterId() + File.separator + flowRunPath +
-            File.separator + context.getAppId() + File.separator +
-            context.getEntityType());
-    return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
+    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+    Path appIdPath = new Path(flowRunPath, context.getAppId());
+    Path entityTypePath = new Path(appIdPath, context.getEntityType());
+
+    return getEntities(entityTypePath, context.getEntityType(), filters,
+            dataToRetrieve);
   }
 
   @Override public Set<String> getEntityTypes(TimelineReaderContext context)
       throws IOException {
     Set<String> result = new TreeSet<>();
-    String flowRunPath = getFlowRunPath(context.getUserId(),
+    String flowRunPathStr = getFlowRunPath(context.getUserId(),
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
-    File dir = new File(new File(rootPath, ENTITIES_DIR),
-        context.getClusterId() + File.separator + flowRunPath
-            + File.separator + context.getAppId());
-    File[] fileList = dir.listFiles();
-    if (fileList != null) {
-      for (File f : fileList) {
-        if (f.isDirectory()) {
-          result.add(f.getName());
-        }
+    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+    Path appIdPath = new Path(flowRunPath, context.getAppId());
+    FileStatus[] fileStatuses = fs.listStatus(appIdPath);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        result.add(fileStatus.getPath().getName());
       }
     }
     return result;