Bläddra i källkod

HADOOP-16381. The JSON License is included in binary tarball via azure-documentdb:1.16.2. Contributed by Sushil Ks.

Akira Ajisaka 5 år sedan
förälder
incheckning
ccaa99c923
9 ändrade filer med 459 tillägg och 149 borttagningar
  1. 34 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
  2. 18 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
  3. 1 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
  4. 0 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
  5. 68 47
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
  6. 128 75
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
  7. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java
  8. 47 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java
  9. 96 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java

+ 34 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml

@@ -28,7 +28,8 @@
   <properties>
     <!-- Needed for generating FindBugs warnings using parent pom -->
     <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
-    <azure.documentdb.version>1.16.2</azure.documentdb.version>
+    <azure.documentdb.version>2.4.5</azure.documentdb.version>
+    <rxjava.version>1.3.8</rxjava.version>
   </properties>
 
   <dependencies>
@@ -44,11 +45,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.microsoft.azure</groupId>
-      <artifactId>azure-documentdb</artifactId>
-      <version>${azure.documentdb.version}</version>
-    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -84,6 +80,38 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>io.reactivex</groupId>
+      <artifactId>rxjava</artifactId>
+      <version>${rxjava.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.microsoft.azure</groupId>
+      <artifactId>azure-cosmosdb</artifactId>
+      <version>${azure.documentdb.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-handler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.reactivex</groupId>
+          <artifactId>rxjava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-handler-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>

+ 18 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java

@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.documentstore;
 
-import com.microsoft.azure.documentdb.ConnectionPolicy;
-import com.microsoft.azure.documentdb.ConsistencyLevel;
-import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.cosmosdb.ConnectionPolicy;
+import com.microsoft.azure.cosmosdb.ConsistencyLevel;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
@@ -134,6 +134,10 @@ public final class DocumentStoreUtils {
    * @return false if any of the string is null or empty else true
    */
   public static boolean isNullOrEmpty(String...values) {
+    if (values == null || values.length == 0) {
+      return true;
+    }
+
     for (String value : values) {
       if (value == null || value.isEmpty()) {
         return true;
@@ -143,15 +147,20 @@ public final class DocumentStoreUtils {
   }
 
   /**
-   * Creates CosmosDB Document Client.
+   * Creates CosmosDB Async Document Client.
    * @param conf
    *          to retrieve cosmos db endpoint and key
    * @return async document client for CosmosDB
    */
-  public static DocumentClient createCosmosDBClient(Configuration conf){
-    return new DocumentClient(DocumentStoreUtils.getCosmosDBEndpoint(conf),
-        DocumentStoreUtils.getCosmosDBMasterKey(conf),
-        ConnectionPolicy.GetDefault(), ConsistencyLevel.Session);
+  public static AsyncDocumentClient createCosmosDBAsyncClient(
+      Configuration conf){
+    return new AsyncDocumentClient.Builder()
+      .withServiceEndpoint(DocumentStoreUtils.getCosmosDBEndpoint(conf))
+      .withMasterKeyOrResourceToken(
+          DocumentStoreUtils.getCosmosDBMasterKey(conf))
+      .withConnectionPolicy(ConnectionPolicy.GetDefault())
+      .withConsistencyLevel(ConsistencyLevel.Session)
+      .build();
   }
 
   /**
@@ -486,4 +495,4 @@ public final class DocumentStoreUtils {
       return false;
     }
   }
-}
+}

+ 1 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java

@@ -30,7 +30,6 @@ import java.util.Map;
 public class TimelineEventSubDoc {
 
   private final TimelineEvent timelineEvent;
-  private boolean valid;
 
   public TimelineEventSubDoc() {
     timelineEvent = new TimelineEvent();
@@ -51,11 +50,7 @@ public class TimelineEventSubDoc {
   public boolean isValid() {
     return timelineEvent.isValid();
   }
-
-  public void setValid(boolean valid) {
-    this.valid = valid;
-  }
-
+  
   public long getTimestamp() {
     return timelineEvent.getTimestamp();
   }

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java

@@ -31,7 +31,6 @@ import java.util.TreeMap;
 public class TimelineMetricSubDoc {
 
   private final TimelineMetric timelineMetric;
-  private boolean valid;
   private long singleDataTimestamp;
   private Number singleDataValue = 0;
 
@@ -41,7 +40,6 @@ public class TimelineMetricSubDoc {
 
   public TimelineMetricSubDoc(TimelineMetric timelineMetric) {
     this.timelineMetric = timelineMetric;
-    this.valid = timelineMetric.isValid();
     if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE &&
         timelineMetric.getValues().size() > 0) {
       this.singleDataTimestamp = timelineMetric.getSingleDataTimestamp();
@@ -130,10 +128,6 @@ public class TimelineMetricSubDoc {
     timelineMetric.setType(metricType);
   }
 
-  public void setValid(boolean valid) {
-    this.valid = valid;
-  }
-
   public boolean isValid() {
     return (timelineMetric.getId() != null);
   }

+ 68 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
 
-import com.microsoft.azure.documentdb.Document;
-import com.microsoft.azure.documentdb.DocumentClient;
-import com.microsoft.azure.documentdb.FeedOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.microsoft.azure.cosmosdb.FeedOptions;
+import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
@@ -30,12 +32,14 @@ import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentS
 import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 
 /**
@@ -49,7 +53,7 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
       .getLogger(CosmosDBDocumentStoreReader.class);
   private static final int DEFAULT_DOCUMENTS_SIZE = 1;
 
-  private static volatile DocumentClient client;
+  private static AsyncDocumentClient client;
   private final String databaseName;
   private final static String COLLECTION_LINK = "/dbs/%s/colls/%s";
   private final static String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * " +
@@ -66,17 +70,24 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
       "\"%s\") ";
   private final static String ORDER_BY_CLAUSE = " ORDER BY c.createdTime";
 
+  // creating thread pool of size, half of the total available threads from JVM
+  private static ExecutorService executorService = Executors.newFixedThreadPool(
+      Runtime.getRuntime().availableProcessors() / 2);
+  private static Scheduler schedulerForBlockingWork =
+      Schedulers.from(executorService);
+
   public CosmosDBDocumentStoreReader(Configuration conf) {
     LOG.info("Initializing Cosmos DB DocumentStoreReader...");
     databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
-    // making CosmosDB Client Singleton
+    initCosmosDBClient(conf);
+  }
+
+  private synchronized void initCosmosDBClient(Configuration conf) {
+    // making CosmosDB Async Client Singleton
     if (client == null) {
-      synchronized (this) {
-        if (client == null) {
-          LOG.info("Creating Cosmos DB Client...");
-          client = DocumentStoreUtils.createCosmosDBClient(conf);
-        }
-      }
+      LOG.info("Creating Cosmos DB Reader Async Client...");
+      client = DocumentStoreUtils.createCosmosDBAsyncClient(conf);
+      addShutdownHook();
     }
   }
 
@@ -104,15 +115,16 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
     LOG.debug("Querying Collection : {} , with query {}", collectionName,
         sqlQuery);
 
-    Set<String> entityTypes = new HashSet<>();
-    Iterator<Document> documentIterator = client.queryDocuments(
+    return Sets.newHashSet(client.queryDocuments(
         String.format(COLLECTION_LINK, databaseName, collectionName),
-        sqlQuery, null).getQueryIterator();
-    while (documentIterator.hasNext()) {
-      Document document = documentIterator.next();
-      entityTypes.add(document.getString(ENTITY_TYPE_COLUMN));
-    }
-    return entityTypes;
+        sqlQuery, new FeedOptions())
+        .map(FeedResponse::getResults) // Map the page to the list of documents
+        .concatMap(Observable::from)
+        .map(document -> String.valueOf(document.get(ENTITY_TYPE_COLUMN)))
+        .toList()
+        .subscribeOn(schedulerForBlockingWork)
+        .toBlocking()
+        .single());
   }
 
   @Override
@@ -133,25 +145,25 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
       final long maxDocumentsSize) {
     final String sqlQuery = buildQueryWithPredicates(context, collectionName,
         maxDocumentsSize);
-    List<TimelineDoc> timelineDocs = new ArrayList<>();
     LOG.debug("Querying Collection : {} , with query {}", collectionName,
         sqlQuery);
 
-    FeedOptions feedOptions = new FeedOptions();
-    feedOptions.setPageSize((int) maxDocumentsSize);
-    Iterator<Document> documentIterator = client.queryDocuments(
-        String.format(COLLECTION_LINK, databaseName, collectionName),
-        sqlQuery, feedOptions).getQueryIterator();
-    while (documentIterator.hasNext()) {
-      Document document = documentIterator.next();
-      TimelineDoc resultDoc = document.toObject(docClass);
-      if (resultDoc.getCreatedTime() == 0 &&
-          document.getTimestamp() != null) {
-        resultDoc.setCreatedTime(document.getTimestamp().getTime());
-      }
-      timelineDocs.add(resultDoc);
-    }
-    return timelineDocs;
+    return client.queryDocuments(String.format(COLLECTION_LINK,
+        databaseName, collectionName), sqlQuery, new FeedOptions())
+        .map(FeedResponse::getResults) // Map the page to the list of documents
+        .concatMap(Observable::from)
+        .map(document -> {
+          TimelineDoc resultDoc = document.toObject(docClass);
+          if (resultDoc.getCreatedTime() == 0 &&
+              document.getTimestamp() != null) {
+            resultDoc.setCreatedTime(document.getTimestamp().getTime());
+          }
+          return resultDoc;
+        })
+        .toList()
+        .subscribeOn(schedulerForBlockingWork)
+        .toBlocking()
+        .single();
   }
 
   private String buildQueryWithPredicates(TimelineReaderContext context,
@@ -168,33 +180,34 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
     return addPredicates(context, collectionName, queryStrBuilder);
   }
 
-  private String addPredicates(TimelineReaderContext context,
+  @VisibleForTesting
+  String addPredicates(TimelineReaderContext context,
       String collectionName, StringBuilder queryStrBuilder) {
     boolean hasPredicate = false;
 
     queryStrBuilder.append(WHERE_CLAUSE);
 
-    if (context.getClusterId() != null) {
+    if (!DocumentStoreUtils.isNullOrEmpty(context.getClusterId())) {
       hasPredicate = true;
       queryStrBuilder.append(String.format(CONTAINS_FUNC_FOR_ID,
           context.getClusterId()));
     }
-    if (context.getUserId() != null) {
+    if (!DocumentStoreUtils.isNullOrEmpty(context.getUserId())) {
       hasPredicate = true;
       queryStrBuilder.append(AND_OPERATOR)
           .append(String.format(CONTAINS_FUNC_FOR_ID, context.getUserId()));
     }
-    if (context.getFlowName() != null) {
+    if (!DocumentStoreUtils.isNullOrEmpty(context.getFlowName())) {
       hasPredicate = true;
       queryStrBuilder.append(AND_OPERATOR)
           .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowName()));
     }
-    if (context.getAppId() != null) {
+    if (!DocumentStoreUtils.isNullOrEmpty(context.getAppId())) {
       hasPredicate = true;
       queryStrBuilder.append(AND_OPERATOR)
           .append(String.format(CONTAINS_FUNC_FOR_ID, context.getAppId()));
     }
-    if (context.getEntityId() != null) {
+    if (!DocumentStoreUtils.isNullOrEmpty(context.getEntityId())) {
       hasPredicate = true;
       queryStrBuilder.append(AND_OPERATOR)
           .append(String.format(CONTAINS_FUNC_FOR_ID, context.getEntityId()));
@@ -204,7 +217,7 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
       queryStrBuilder.append(AND_OPERATOR)
           .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowRunId()));
     }
-    if (context.getEntityType() != null){
+    if (!DocumentStoreUtils.isNullOrEmpty(context.getEntityType())){
       hasPredicate = true;
       queryStrBuilder.append(AND_OPERATOR)
           .append(String.format(CONTAINS_FUNC_FOR_TYPE,
@@ -224,9 +237,17 @@ public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
   @Override
   public synchronized void close() {
     if (client != null) {
-      LOG.info("Closing Cosmos DB Client...");
+      LOG.info("Closing Cosmos DB Reader Async Client...");
       client.close();
       client = null;
     }
   }
-}
+
+  private void addShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }));
+  }
+}

+ 128 - 75
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java

@@ -19,14 +19,20 @@
 package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
 
 
-import com.microsoft.azure.documentdb.AccessCondition;
-import com.microsoft.azure.documentdb.AccessConditionType;
-import com.microsoft.azure.documentdb.Database;
-import com.microsoft.azure.documentdb.Document;
-import com.microsoft.azure.documentdb.DocumentClient;
-import com.microsoft.azure.documentdb.DocumentClientException;
-import com.microsoft.azure.documentdb.DocumentCollection;
-import com.microsoft.azure.documentdb.RequestOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.azure.cosmosdb.AccessCondition;
+import com.microsoft.azure.cosmosdb.AccessConditionType;
+import com.microsoft.azure.cosmosdb.Database;
+import com.microsoft.azure.cosmosdb.Document;
+import com.microsoft.azure.cosmosdb.DocumentClientException;
+import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.RequestOptions;
+import com.microsoft.azure.cosmosdb.ResourceResponse;
+import com.microsoft.azure.cosmosdb.SqlParameter;
+import com.microsoft.azure.cosmosdb.SqlParameterCollection;
+import com.microsoft.azure.cosmosdb.SqlQuerySpec;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
@@ -40,6 +46,13 @@ import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentS
 import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * This is the Document Store Writer implementation for
@@ -51,79 +64,102 @@ public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
   private static final Logger LOG = LoggerFactory
       .getLogger(CosmosDBDocumentStoreWriter.class);
 
-  private static volatile DocumentClient client;
   private final String databaseName;
   private static final PerNodeAggTimelineCollectorMetrics METRICS =
       PerNodeAggTimelineCollectorMetrics.getInstance();
+
+  private static AsyncDocumentClient client;
+  // creating thread pool of size equal to number of collection types
+  private ExecutorService executorService =
+      Executors.newFixedThreadPool(CollectionType.values().length);
+  private Scheduler schedulerForBlockingWork =
+      Schedulers.from(executorService);
+
   private static final String DATABASE_LINK = "/dbs/%s";
   private static final String COLLECTION_LINK = DATABASE_LINK + "/colls/%s";
   private static final String DOCUMENT_LINK = COLLECTION_LINK + "/docs/%s";
+  private static final String ID = "@id";
+  private static final String QUERY_COLLECTION_IF_EXISTS = "SELECT * FROM r " +
+      "where r.id = " + ID;
 
   public CosmosDBDocumentStoreWriter(Configuration conf) {
     LOG.info("Initializing Cosmos DB DocumentStoreWriter...");
     databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
-    // making CosmosDB Client Singleton
+    initCosmosDBClient(conf);
+  }
+
+  private synchronized void initCosmosDBClient(Configuration conf) {
+    // making CosmosDB Async Client Singleton
     if (client == null) {
-      synchronized (this) {
-        if (client == null) {
-          LOG.info("Creating Cosmos DB Client...");
-          client = DocumentStoreUtils.createCosmosDBClient(conf);
-        }
-      }
+      LOG.info("Creating Cosmos DB Writer Async Client...");
+      client = DocumentStoreUtils.createCosmosDBAsyncClient(conf);
+      addShutdownHook();
     }
   }
 
   @Override
   public void createDatabase() {
-    try {
-      client.readDatabase(String.format(
-          DATABASE_LINK, databaseName), new RequestOptions());
-      LOG.info("Database {} already exists.", databaseName);
-    } catch (DocumentClientException docExceptionOnRead) {
-      if (docExceptionOnRead.getStatusCode() ==  404) {
-        LOG.info("Creating new Database : {}", databaseName);
-        Database databaseDefinition = new Database();
-        databaseDefinition.setId(databaseName);
-        try {
-          client.createDatabase(databaseDefinition, new RequestOptions());
-        } catch (DocumentClientException docExceptionOnCreate) {
-          LOG.error("Unable to create new Database : {}", databaseName,
-              docExceptionOnCreate);
-        }
-      } else {
-        LOG.error("Error while reading Database : {}", databaseName,
-            docExceptionOnRead);
-      }
-    }
+    Observable<ResourceResponse<Database>> databaseReadObs =
+        client.readDatabase(String.format(DATABASE_LINK, databaseName), null);
+
+    Observable<ResourceResponse<Database>> databaseExistenceObs =
+        databaseReadObs
+            .doOnNext(databaseResourceResponse ->
+                LOG.info("Database {} already exists.", databaseName))
+            .onErrorResumeNext(throwable -> {
+              // if the database doesn't exists
+              // readDatabase() will result in 404 error
+              if (throwable instanceof DocumentClientException) {
+                DocumentClientException de =
+                    (DocumentClientException) throwable;
+                if (de.getStatusCode() == 404) {
+                  // if the database doesn't exist, create it.
+                  LOG.info("Creating new Database : {}", databaseName);
+
+                  Database dbDefinition = new Database();
+                  dbDefinition.setId(databaseName);
+
+                  return client.createDatabase(dbDefinition, null);
+                }
+              }
+              // some unexpected failure in reading database happened.
+              // pass the error up.
+              LOG.error("Reading database : {} if it exists failed.",
+                  databaseName, throwable);
+              return Observable.error(throwable);
+            });
+    // wait for completion
+    databaseExistenceObs.toCompletable().await();
   }
 
   @Override
   public void createCollection(final String collectionName) {
     LOG.info("Creating Timeline Collection : {} for Database : {}",
         collectionName, databaseName);
-    try {
-      client.readCollection(String.format(COLLECTION_LINK, databaseName,
-          collectionName), new RequestOptions());
-      LOG.info("Collection {} already exists.", collectionName);
-    } catch (DocumentClientException docExceptionOnRead) {
-      if (docExceptionOnRead.getStatusCode() == 404) {
-        DocumentCollection collection = new DocumentCollection();
-        collection.setId(collectionName);
-        LOG.info("Creating collection {} under Database {}",
-            collectionName, databaseName);
-        try {
-          client.createCollection(
-              String.format(DATABASE_LINK, databaseName),
-              collection, new RequestOptions());
-        } catch (DocumentClientException docExceptionOnCreate) {
-          LOG.error("Unable to create Collection : {} under Database : {}",
-              collectionName, databaseName, docExceptionOnCreate);
-        }
-      } else {
-        LOG.error("Error while reading Collection : {} under Database : {}",
-            collectionName, databaseName, docExceptionOnRead);
-      }
-    }
+    client.queryCollections(String.format(DATABASE_LINK, databaseName),
+        new SqlQuerySpec(QUERY_COLLECTION_IF_EXISTS,
+            new SqlParameterCollection(
+                new SqlParameter(ID, collectionName))), null)
+        .single() // there should be single page of result
+        .flatMap((Func1<FeedResponse<DocumentCollection>, Observable<?>>)
+            page -> {
+            if (page.getResults().isEmpty()) {
+              // if there is no matching collection create one.
+              DocumentCollection collection = new DocumentCollection();
+              collection.setId(collectionName);
+              LOG.info("Creating collection {}", collectionName);
+              return client.createCollection(
+                  String.format(DATABASE_LINK, databaseName),
+                  collection, null);
+            } else {
+              // collection already exists, nothing else to be done.
+              LOG.info("Collection {} already exists.", collectionName);
+              return Observable.empty();
+            }
+          })
+        .doOnError(throwable -> LOG.error("Unable to create collection : {}",
+            collectionName, throwable))
+        .toCompletable().await();
   }
 
   @Override
@@ -156,32 +192,40 @@ public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
     AccessCondition accessCondition = new AccessCondition();
     StringBuilder eTagStrBuilder = new StringBuilder();
 
-    TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType,
+    final TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType,
         timelineDoc, eTagStrBuilder);
 
     accessCondition.setCondition(eTagStrBuilder.toString());
     accessCondition.setType(AccessConditionType.IfMatch);
     requestOptions.setAccessCondition(accessCondition);
 
-    try {
-      client.upsertDocument(collectionLink, updatedTimelineDoc,
-          requestOptions, true);
+    ResourceResponse<Document> resourceResponse =
+        client.upsertDocument(collectionLink, updatedTimelineDoc,
+            requestOptions, true)
+            .subscribeOn(schedulerForBlockingWork)
+            .doOnError(throwable ->
+                LOG.error("Error while upserting Collection : {} " +
+                    "with Doc Id : {} under Database : {}",
+                collectionType.getCollectionName(),
+                updatedTimelineDoc.getId(), databaseName, throwable))
+            .toBlocking()
+            .single();
+
+    if (resourceResponse.getStatusCode() == 409) {
+      LOG.warn("There was a conflict while upserting, hence retrying...",
+          resourceResponse);
+      upsertDocument(collectionType, updatedTimelineDoc);
+    } else if (resourceResponse.getStatusCode() >= 200 && resourceResponse
+        .getStatusCode() < 300) {
       LOG.debug("Successfully wrote doc with id : {} and type : {} under " +
           "Database : {}", timelineDoc.getId(), timelineDoc.getType(),
           databaseName);
-    } catch (DocumentClientException e) {
-      if (e.getStatusCode() == 409) {
-        LOG.warn("There was a conflict while upserting, hence retrying...", e);
-        upsertDocument(collectionType, updatedTimelineDoc);
-      }
-      LOG.error("Error while upserting Collection : {} with Doc Id : {} under" +
-          " Database : {}", collectionType.getCollectionName(),
-          updatedTimelineDoc.getId(), databaseName, e);
     }
   }
 
+  @VisibleForTesting
   @SuppressWarnings("unchecked")
-  private TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType,
+  TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType,
       TimelineDoc timelineDoc, StringBuilder eTagStrBuilder) {
     TimelineDoc prevDocument = fetchLatestDoc(collectionType,
         timelineDoc.getId(), eTagStrBuilder);
@@ -192,14 +236,15 @@ public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
     return timelineDoc;
   }
 
+  @VisibleForTesting
   @SuppressWarnings("unchecked")
-  private TimelineDoc fetchLatestDoc(final CollectionType collectionType,
+  TimelineDoc fetchLatestDoc(final CollectionType collectionType,
       final String documentId, StringBuilder eTagStrBuilder) {
     final String documentLink = String.format(DOCUMENT_LINK, databaseName,
         collectionType.getCollectionName(), documentId);
     try {
       Document latestDocument = client.readDocument(documentLink, new
-          RequestOptions()).getResource();
+          RequestOptions()).toBlocking().single().getResource();
       TimelineDoc timelineDoc;
       switch (collectionType) {
       case FLOW_RUN:
@@ -227,9 +272,17 @@ public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
   @Override
   public synchronized void close() {
     if (client != null) {
-      LOG.info("Closing Cosmos DB Client...");
+      LOG.info("Closing Cosmos DB Writer Async Client...");
       client.close();
       client = null;
     }
   }
+
+  private void addShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }));
+  }
 }

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
+
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test case for {@link CosmosDBDocumentStoreReader}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreUtils.class)
+public class TestCosmosDBDocumentStoreReader {
+
+  @Before
+  public void setUp(){
+    AsyncDocumentClient asyncDocumentClient =
+        Mockito.mock(AsyncDocumentClient.class);
+    PowerMockito.mockStatic(DocumentStoreUtils.class);
+    PowerMockito.when(DocumentStoreUtils.getCosmosDBDatabaseName(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn("FooBar");
+    PowerMockito.when(DocumentStoreUtils.createCosmosDBAsyncClient(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(asyncDocumentClient);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailureFOnEmptyPredicates() {
+    PowerMockito.when(DocumentStoreUtils.isNullOrEmpty(
+        ArgumentMatchers.any()))
+        .thenReturn(Boolean.TRUE);
+
+    CosmosDBDocumentStoreReader cosmosDBDocumentStoreReader =
+        new CosmosDBDocumentStoreReader(null);
+    cosmosDBDocumentStoreReader.addPredicates(
+        new TimelineReaderContext(null, "", "",
+            null, "", "", null),
+        "DummyCollection", new StringBuilder());
+  }
+}

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+
+import java.io.IOException;
+
+/**
+ * This is a mocked class for {@link CosmosDBDocumentStoreWriter}.
+ */
+public class MockedCosmosDBDocumentStoreWriter
+    extends CosmosDBDocumentStoreWriter {
+
+  public MockedCosmosDBDocumentStoreWriter(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  TimelineDocument fetchLatestDoc(CollectionType collectionType,
+      String documentId, StringBuilder eTagStrBuilder) {
+    try {
+      return DocumentStoreTestUtils.bakeTimelineEntityDoc();
+    } catch (IOException e) {
+      return null;
+    }
+  }
+}

+ 96 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+
+/**
+ * Test case for {@link CosmosDBDocumentStoreWriter}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreUtils.class)
+public class TestCosmosDBDocumentStoreWriter {
+
+  @Before
+  public void setUp() {
+    AsyncDocumentClient asyncDocumentClient =
+        Mockito.mock(AsyncDocumentClient.class);
+    PowerMockito.mockStatic(DocumentStoreUtils.class);
+    PowerMockito.when(DocumentStoreUtils.getCosmosDBDatabaseName(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn("FooBar");
+    PowerMockito.when(DocumentStoreUtils.createCosmosDBAsyncClient(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(asyncDocumentClient);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void applyingUpdatesOnPrevDocTest() throws IOException {
+    MockedCosmosDBDocumentStoreWriter documentStoreWriter =
+        new MockedCosmosDBDocumentStoreWriter(null);
+
+    TimelineEntityDocument actualEntityDoc =
+        new TimelineEntityDocument();
+    TimelineEntityDocument expectedEntityDoc =
+        DocumentStoreTestUtils.bakeTimelineEntityDoc();
+
+    Assert.assertEquals(1, actualEntityDoc.getInfo().size());
+    Assert.assertEquals(0, actualEntityDoc.getMetrics().size());
+    Assert.assertEquals(0, actualEntityDoc.getEvents().size());
+    Assert.assertEquals(0, actualEntityDoc.getConfigs().size());
+    Assert.assertEquals(0,
+        actualEntityDoc.getIsRelatedToEntities().size());
+    Assert.assertEquals(0, actualEntityDoc.
+        getRelatesToEntities().size());
+
+    actualEntityDoc = (TimelineEntityDocument) documentStoreWriter
+        .applyUpdatesOnPrevDoc(CollectionType.ENTITY,
+            actualEntityDoc, null);
+
+    Assert.assertEquals(expectedEntityDoc.getInfo().size(),
+        actualEntityDoc.getInfo().size());
+    Assert.assertEquals(expectedEntityDoc.getMetrics().size(),
+        actualEntityDoc.getMetrics().size());
+    Assert.assertEquals(expectedEntityDoc.getEvents().size(),
+        actualEntityDoc.getEvents().size());
+    Assert.assertEquals(expectedEntityDoc.getConfigs().size(),
+        actualEntityDoc.getConfigs().size());
+    Assert.assertEquals(expectedEntityDoc.getRelatesToEntities().size(),
+        actualEntityDoc.getIsRelatedToEntities().size());
+    Assert.assertEquals(expectedEntityDoc.getRelatesToEntities().size(),
+        actualEntityDoc.getRelatesToEntities().size());
+  }
+}