Parcourir la source

HDDS-1501 : Create a Recon task interface to update internal DB on updates from OM. (#819)

avijayanhwx il y a 6 ans
Parent
commit
4b099b8b89
30 fichiers modifiés avec 1814 ajouts et 159 suppressions
  1. 14 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
  2. 1 7
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java
  3. 30 10
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
  4. 8 0
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  5. 0 43
      hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
  6. 10 10
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
  7. 2 1
      hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/ReconSchemaGenerationModule.java
  8. 65 0
      hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ReconInternalSchemaDefinition.java
  9. 5 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
  10. 18 7
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
  11. 4 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
  12. 0 1
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java
  13. 16 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java
  14. 11 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
  15. 116 38
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
  16. 150 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
  17. 220 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
  18. 69 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
  19. 66 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
  20. 46 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
  21. 198 0
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
  22. 5 2
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
  23. 4 1
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractSqlDatabaseTest.java
  24. 143 0
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestReconInternalSchemaDefinition.java
  25. 0 17
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java
  26. 25 0
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java
  27. 77 0
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
  28. 133 22
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
  29. 207 0
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
  30. 171 0
      hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java

+ 14 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.utils.db;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -158,4 +159,17 @@ public interface DBStore extends AutoCloseable {
    * @return DB file location.
    */
   File getDbLocation();
+
+  /**
+   * Get List of Index to Table Names.
+   * (For decoding table from column family index)
+   * @return Map of Index -> TableName
+   */
+  Map<Integer, String> getTableNames();
+
+  /**
+   * Get Codec registry.
+   * @return codec registry.
+   */
+  CodecRegistry getCodecRegistry();
 }

+ 1 - 7
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java

@@ -57,7 +57,6 @@ public final class DBStoreBuilder {
   private List<String> tableNames;
   private Configuration configuration;
   private CodecRegistry registry;
-  private boolean readOnly = false;
 
   private DBStoreBuilder(Configuration configuration) {
     tables = new HashSet<>();
@@ -114,11 +113,6 @@ public final class DBStoreBuilder {
     return this;
   }
 
-  public DBStoreBuilder setReadOnly(boolean rdOnly) {
-    readOnly = rdOnly;
-    return this;
-  }
-
   /**
    * Builds a DBStore instance and returns that.
    *
@@ -137,7 +131,7 @@ public final class DBStoreBuilder {
     if (!dbFile.getParentFile().exists()) {
       throw new IOException("The DB destination directory should exist.");
     }
-    return new RDBStore(dbFile, options, tables, registry, readOnly);
+    return new RDBStore(dbFile, options, tables, registry);
   }
 
   /**

+ 30 - 10
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java

@@ -64,15 +64,16 @@ public class RDBStore implements DBStore {
   private ObjectName statMBeanName;
   private RDBCheckpointManager checkPointManager;
   private String checkpointsParentDir;
+  private List<ColumnFamilyHandle> columnFamilyHandles;
 
   @VisibleForTesting
   public RDBStore(File dbFile, DBOptions options,
                   Set<TableConfig> families) throws IOException {
-    this(dbFile, options, families, new CodecRegistry(), false);
+    this(dbFile, options, families, new CodecRegistry());
   }
 
   public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families,
-                  CodecRegistry registry, boolean readOnly)
+                  CodecRegistry registry)
       throws IOException {
     Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
     Preconditions.checkNotNull(families);
@@ -81,7 +82,7 @@ public class RDBStore implements DBStore {
     codecRegistry = registry;
     final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
         new ArrayList<>();
-    final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+    columnFamilyHandles = new ArrayList<>();
 
     for (TableConfig family : families) {
       columnFamilyDescriptors.add(family.getDescriptor());
@@ -93,13 +94,8 @@ public class RDBStore implements DBStore {
     writeOptions = new WriteOptions();
 
     try {
-      if (readOnly) {
-        db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(),
-            columnFamilyDescriptors, columnFamilyHandles);
-      } else {
-        db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
-            columnFamilyDescriptors, columnFamilyHandles);
-      }
+      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
+          columnFamilyDescriptors, columnFamilyHandles);
 
       for (int x = 0; x < columnFamilyHandles.size(); x++) {
         handleTable.put(
@@ -299,7 +295,31 @@ public class RDBStore implements DBStore {
     return dbLocation;
   }
 
+  @Override
+  public Map<Integer, String> getTableNames() {
+    Map<Integer, String> tableNames = new HashMap<>();
+    StringCodec stringCodec = new StringCodec();
+
+    for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+      try {
+        tableNames.put(columnFamilyHandle.getID(), stringCodec
+            .fromPersistedFormat(columnFamilyHandle.getName()));
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Unexpected exception while reading column family handle " +
+            "name", e);
+      }
+    }
+    return tableNames;
+  }
+
+  @Override
   public CodecRegistry getCodecRegistry() {
     return codecRegistry;
   }
+
+  @VisibleForTesting
+  public RocksDB getDb() {
+    return db;
+  }
+
 }

+ 8 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -2472,4 +2472,12 @@
       connections.
     </description>
   </property>
+  <property>
+    <name>ozone.recon.task.thread.count</name>
+    <value>1</value>
+    <tag>OZONE, RECON</tag>
+    <description>
+      The number of Recon Tasks that are waiting on updates from OM.
+    </description>
+  </property>
 </configuration>

+ 0 - 43
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.utils.db;
 
 import javax.management.MBeanServer;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.charset.StandardCharsets;
@@ -290,46 +289,4 @@ public class TestRDBStore {
           checkpoint.getCheckpointLocation()));
     }
   }
-
-  @Test
-  public void testReadOnlyRocksDB() throws Exception {
-    File dbFile = folder.newFolder();
-    byte[] key = "Key1".getBytes();
-    byte[] value = "Value1".getBytes();
-
-    //Create Rdb and write some data into it.
-    RDBStore newStore = new RDBStore(dbFile, options, configSet);
-    Assert.assertNotNull("DB Store cannot be null", newStore);
-    Table firstTable = newStore.getTable(families.get(0));
-    Assert.assertNotNull("Table cannot be null", firstTable);
-    firstTable.put(key, value);
-
-    RocksDBCheckpoint checkpoint = (RocksDBCheckpoint) newStore.getCheckpoint(
-        true);
-
-    //Create Read Only DB from snapshot of first DB.
-    RDBStore snapshotStore = new RDBStore(checkpoint.getCheckpointLocation()
-        .toFile(), options, configSet, new CodecRegistry(), true);
-
-    Assert.assertNotNull("DB Store cannot be null", newStore);
-
-    //Verify read is allowed.
-    firstTable = snapshotStore.getTable(families.get(0));
-    Assert.assertNotNull("Table cannot be null", firstTable);
-    Assert.assertTrue(Arrays.equals(((byte[])firstTable.get(key)), value));
-
-    //Verify write is not allowed.
-    byte[] key2 =
-        RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
-    byte[] value2 =
-        RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
-    try {
-      firstTable.put(key2, value2);
-      Assert.fail();
-    } catch (IOException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains("Not supported operation in read only mode"));
-    }
-    checkpoint.cleanupCheckpoint();
-  }
 }

+ 10 - 10
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java

@@ -100,16 +100,16 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * |-------------------------------------------------------------------|
    */
 
-  private static final String USER_TABLE = "userTable";
-  private static final String VOLUME_TABLE = "volumeTable";
-  private static final String BUCKET_TABLE = "bucketTable";
-  private static final String KEY_TABLE = "keyTable";
-  private static final String DELETED_TABLE = "deletedTable";
-  private static final String OPEN_KEY_TABLE = "openKeyTable";
-  private static final String S3_TABLE = "s3Table";
-  private static final String MULTIPARTINFO_TABLE = "multipartInfoTable";
-  private static final String S3_SECRET_TABLE = "s3SecretTable";
-  private static final String DELEGATION_TOKEN_TABLE = "dTokenTable";
+  public static final String USER_TABLE = "userTable";
+  public static final String VOLUME_TABLE = "volumeTable";
+  public static final String BUCKET_TABLE = "bucketTable";
+  public static final String KEY_TABLE = "keyTable";
+  public static final String DELETED_TABLE = "deletedTable";
+  public static final String OPEN_KEY_TABLE = "openKeyTable";
+  public static final String S3_TABLE = "s3Table";
+  public static final String MULTIPARTINFO_TABLE = "multipartInfoTable";
+  public static final String S3_SECRET_TABLE = "s3SecretTable";
+  public static final String DELEGATION_TOKEN_TABLE = "dTokenTable";
 
   private DBStore store;
 

+ 2 - 1
hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/ReconSchemaGenerationModule.java

@@ -17,6 +17,7 @@
  */
 package org.hadoop.ozone.recon.codegen;
 
+import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
 import org.hadoop.ozone.recon.schema.ReconSchemaDefinition;
 import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
 
@@ -34,6 +35,6 @@ public class ReconSchemaGenerationModule extends AbstractModule {
     Multibinder<ReconSchemaDefinition> schemaBinder =
         Multibinder.newSetBinder(binder(), ReconSchemaDefinition.class);
     schemaBinder.addBinding().to(UtilizationSchemaDefinition.class);
-
+    schemaBinder.addBinding().to(ReconInternalSchemaDefinition.class);
   }
 }

+ 65 - 0
hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ReconInternalSchemaDefinition.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.hadoop.ozone.recon.schema;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+
+import com.google.inject.Inject;
+
+/**
+ * Class used to create tables that are required for Recon's internal
+ * management.
+ */
+public class ReconInternalSchemaDefinition implements ReconSchemaDefinition {
+
+  public static final String RECON_TASK_STATUS_TABLE_NAME =
+      "recon_task_status";
+  private final DataSource dataSource;
+
+  @Inject
+  ReconInternalSchemaDefinition(DataSource dataSource) {
+    this.dataSource = dataSource;
+  }
+
+  @Override
+  public void initializeSchema() throws SQLException {
+    Connection conn = dataSource.getConnection();
+    createReconTaskStatus(conn);
+  }
+
+  /**
+   * Create the Recon Task Status table.
+   * @param conn connection
+   */
+  private void createReconTaskStatus(Connection conn) {
+    DSL.using(conn).createTableIfNotExists(RECON_TASK_STATUS_TABLE_NAME)
+        .column("task_name", SQLDataType.VARCHAR(1024))
+        .column("last_updated_timestamp", SQLDataType.BIGINT)
+        .column("last_updated_seq_number", SQLDataType.BIGINT)
+        .constraint(DSL.constraint("pk_task_name")
+            .primaryKey("task_name"))
+        .execute();
+  }
+}

+ 5 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
 import org.apache.hadoop.utils.db.DBStore;
 
 import com.google.inject.AbstractModule;
@@ -65,6 +67,9 @@ public class ReconControllerModule extends AbstractModule {
     // Persistence - inject configuration provider
     install(new JooqPersistenceModule(
         getProvider(DataSourceConfiguration.class)));
+
+    bind(ReconTaskController.class)
+        .to(ReconTaskControllerImpl.class).in(Singleton.class);
   }
 
   @Provides

+ 18 - 7
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java

@@ -23,6 +23,7 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPS
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
 
+import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -67,7 +68,7 @@ public class ReconServer extends GenericCli {
           @Override
           protected void configureServlets() {
             rest("/api/*")
-                .packages("org.apache.hadoop.ozone.recon.api");
+              .packages("org.apache.hadoop.ozone.recon.api");
           }
         });
 
@@ -100,10 +101,6 @@ public class ReconServer extends GenericCli {
     OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
         .getInstance(OzoneManagerServiceProvider.class);
 
-    // Schedule the task to read OM DB and write the reverse mapping to Recon
-    // container DB.
-    ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
-        ozoneManagerServiceProvider, containerDBServiceProvider);
     long initialDelay = configuration.getTimeDuration(
         RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
         RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
@@ -113,8 +110,22 @@ public class ReconServer extends GenericCli {
         RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
 
-    scheduler.scheduleWithFixedDelay(containerKeyMapperTask, initialDelay,
-        interval, TimeUnit.MILLISECONDS);
+
+    scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
+        // Schedule the task to read OM DB and write the reverse mapping to
+        // Recon container DB.
+        ContainerKeyMapperTask containerKeyMapperTask =
+            new ContainerKeyMapperTask(containerDBServiceProvider,
+                ozoneManagerServiceProvider.getOMMetadataManagerInstance());
+        containerKeyMapperTask.reprocess(
+            ozoneManagerServiceProvider.getOMMetadataManagerInstance());
+      } catch (IOException e) {
+        LOG.error("Unable to get OM " +
+            "Snapshot", e);
+      }
+    }, initialDelay, interval, TimeUnit.MILLISECONDS);
   }
 
   void stop() throws Exception {

+ 4 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java

@@ -112,6 +112,10 @@ public final class ReconServerConfigKeys {
   public static final String OZONE_RECON_SQL_MAX_IDLE_CONNECTION_TEST_STMT =
       "ozone.recon.sql.db.conn.idle.test";
 
+  public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
+      "ozone.recon.task.thread.count";
+  public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 1;
+
   /**
    * Private constructor for utility class.
    */

+ 0 - 1
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java

@@ -64,7 +64,6 @@ public class ReconOmMetadataManagerImpl extends OmMetadataManagerImpl
     try {
       DBStoreBuilder dbStoreBuilder =
           DBStoreBuilder.newBuilder(ozoneConfiguration)
-          .setReadOnly(true)
           .setName(dbFile.getName())
           .setPath(dbFile.toPath().getParent());
       addOMTablesAndCodecs(dbStoreBuilder);

+ 16 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java

@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
+import org.apache.hadoop.utils.db.TableIterator;
 
 /**
  * The Recon Container DB Service interface.
@@ -75,4 +76,19 @@ public interface ContainerDBServiceProvider {
    * @throws IOException
    */
   Map<Long, ContainerMetadata> getContainers() throws IOException;
+
+  /**
+   * Delete an entry in the container DB.
+   * @param containerKeyPrefix container key prefix to be deleted.
+   * @throws IOException exception.
+   */
+  void deleteContainerMapping(ContainerKeyPrefix containerKeyPrefix)
+      throws IOException;
+
+  /**
+   * Get iterator to the entire container DB.
+   * @return TableIterator
+   * @throws IOException exception
+   */
+  TableIterator getContainerTableIterator() throws IOException;
 }

+ 11 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java

@@ -191,4 +191,15 @@ public class ContainerDBServiceProviderImpl
     }
     return containers;
   }
+
+  @Override
+  public void deleteContainerMapping(ContainerKeyPrefix containerKeyPrefix)
+      throws IOException {
+    containerKeyTable.delete(containerKeyPrefix);
+  }
+
+  @Override
+  public TableIterator getContainerTableIterator() throws IOException {
+    return containerKeyTable.iterator();
+  }
 }

+ 116 - 38
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java

@@ -21,14 +21,20 @@ package org.apache.hadoop.ozone.recon.tasks;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
-import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.utils.db.Table;
 import org.apache.hadoop.utils.db.TableIterator;
 import org.slf4j.Logger;
@@ -38,19 +44,24 @@ import org.slf4j.LoggerFactory;
  * Class to iterate over the OM DB and populate the Recon container DB with
  * the container -> Key reverse mapping.
  */
-public class ContainerKeyMapperTask implements Runnable {
+public class ContainerKeyMapperTask extends ReconDBUpdateTask {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerKeyMapperTask.class);
 
-  private OzoneManagerServiceProvider ozoneManagerServiceProvider;
   private ContainerDBServiceProvider containerDBServiceProvider;
+  private Collection<String> tables = new ArrayList<>();
 
-  public ContainerKeyMapperTask(
-      OzoneManagerServiceProvider ozoneManagerServiceProvider,
-      ContainerDBServiceProvider containerDBServiceProvider) {
-    this.ozoneManagerServiceProvider = ozoneManagerServiceProvider;
+  public ContainerKeyMapperTask(ContainerDBServiceProvider
+                                    containerDBServiceProvider,
+                                OMMetadataManager omMetadataManager) {
+    super("ContainerKeyMapperTask");
     this.containerDBServiceProvider = containerDBServiceProvider;
+    try {
+      tables.add(omMetadataManager.getKeyTable().getName());
+    } catch (IOException ioEx) {
+      LOG.error("Unable to listen on Key Table updates ", ioEx);
+    }
   }
 
   /**
@@ -58,55 +69,122 @@ public class ContainerKeyMapperTask implements Runnable {
    * (container, key) -> count to Recon Container DB.
    */
   @Override
-  public void run() {
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     int omKeyCount = 0;
-    int containerCount = 0;
     try {
-      LOG.info("Starting a run of ContainerKeyMapperTask.");
+      LOG.info("Starting a 'reprocess' run of ContainerKeyMapperTask.");
       Instant start = Instant.now();
 
-      //Update OM DB Snapshot.
-      ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
-
-      OMMetadataManager omMetadataManager = ozoneManagerServiceProvider
-          .getOMMetadataManagerInstance();
       Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
                keyIter = omKeyInfoTable.iterator()) {
         while (keyIter.hasNext()) {
           Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-          StringBuilder key = new StringBuilder(kv.getKey());
           OmKeyInfo omKeyInfo = kv.getValue();
-          for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
-              .getKeyLocationVersions()) {
-            long keyVersion = omKeyLocationInfoGroup.getVersion();
-            for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
-                .getLocationList()) {
-              long containerId = omKeyLocationInfo.getContainerID();
-              ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
-                  containerId, key.toString(), keyVersion);
-              if (containerDBServiceProvider.getCountForForContainerKeyPrefix(
-                  containerKeyPrefix) == 0) {
-                // Save on writes. No need to save same container-key prefix
-                // mapping again.
-                containerDBServiceProvider.storeContainerKeyMapping(
-                    containerKeyPrefix, 1);
-              }
-              containerCount++;
-            }
-          }
+          writeOMKeyToContainerDB(kv.getKey(), omKeyInfo);
           omKeyCount++;
         }
       }
-      LOG.info("Completed the run of ContainerKeyMapperTask.");
+      LOG.info("Completed 'reprocess' of ContainerKeyMapperTask.");
       Instant end = Instant.now();
       long duration = Duration.between(start, end).toMillis();
-      LOG.info("It took me " + (double)duration / 1000.0 + " seconds to " +
-          "process " + omKeyCount + " keys and " + containerCount + " " +
-          "containers.");
+      LOG.info("It took me " + (double) duration / 1000.0 + " seconds to " +
+          "process " + omKeyCount + " keys.");
     } catch (IOException ioEx) {
       LOG.error("Unable to populate Container Key Prefix data in Recon DB. ",
           ioEx);
+      return new ImmutablePair<>(getTaskName(), false);
+    }
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+
+  @Override
+  protected Collection<String> getTaskTables() {
+    return tables;
+  }
+
+  @Override
+  Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
+      String updatedKey = omdbUpdateEvent.getKey();
+      OmKeyInfo updatedKeyValue = omdbUpdateEvent.getValue();
+      try {
+        switch (omdbUpdateEvent.getAction()) {
+        case PUT:
+          writeOMKeyToContainerDB(updatedKey, updatedKeyValue);
+          break;
+
+        case DELETE:
+          deleteOMKeyFromContainerDB(updatedKey);
+          break;
+
+        default: LOG.debug("Skipping DB update event : " + omdbUpdateEvent
+            .getAction());
+        }
+      } catch (IOException e) {
+        LOG.error("Unexpected exception while updating key data : {} ", e);
+        return new ImmutablePair<>(getTaskName(), false);
+      }
     }
+    return new ImmutablePair<>(getTaskName(), true);
   }
+
+  /**
+   * Delete an OM Key from Container DB.
+   * @param key key String.
+   * @throws IOException If Unable to write to container DB.
+   */
+  private void  deleteOMKeyFromContainerDB(String key)
+      throws IOException {
+
+    TableIterator<ContainerKeyPrefix, ? extends
+        Table.KeyValue<ContainerKeyPrefix, Integer>> containerIterator =
+        containerDBServiceProvider.getContainerTableIterator();
+
+    Set<ContainerKeyPrefix> keysToDeDeleted = new HashSet<>();
+
+    while (containerIterator.hasNext()) {
+      Table.KeyValue<ContainerKeyPrefix, Integer> keyValue =
+          containerIterator.next();
+      String keyPrefix = keyValue.getKey().getKeyPrefix();
+      if (keyPrefix.equals(key)) {
+        keysToDeDeleted.add(keyValue.getKey());
+      }
+    }
+
+    for (ContainerKeyPrefix containerKeyPrefix : keysToDeDeleted) {
+      containerDBServiceProvider.deleteContainerMapping(containerKeyPrefix);
+    }
+  }
+
+  /**
+   * Write an OM key to container DB.
+   * @param key key String
+   * @param omKeyInfo omKeyInfo value
+   * @throws IOException if unable to write to recon DB.
+   */
+  private void  writeOMKeyToContainerDB(String key, OmKeyInfo omKeyInfo)
+      throws IOException {
+    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
+        .getKeyLocationVersions()) {
+      long keyVersion = omKeyLocationInfoGroup.getVersion();
+      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
+          .getLocationList()) {
+        long containerId = omKeyLocationInfo.getContainerID();
+        ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
+            containerId, key, keyVersion);
+        if (containerDBServiceProvider.getCountForForContainerKeyPrefix(
+            containerKeyPrefix) == 0) {
+          // Save on writes. No need to save same container-key prefix
+          // mapping again.
+          containerDBServiceProvider.storeContainerKeyMapping(
+              containerKeyPrefix, 1);
+        }
+      }
+    }
+  }
+
 }

+ 150 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java

@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+/**
+ * A class used to encapsulate a single OM DB update event.
+ * Currently only PUT and DELETE are supported.
+ * @param <KEY> Type of Key.
+ * @param <VALUE> Type of Value.
+ */
+public final class OMDBUpdateEvent<KEY, VALUE> {
+
+  private final OMDBUpdateAction action;
+  private final String table;
+  private final KEY updatedKey;
+  private final VALUE updatedValue;
+  private final EventInfo eventInfo;
+
+  private OMDBUpdateEvent(OMDBUpdateAction action,
+                          String table,
+                          KEY updatedKey,
+                          VALUE updatedValue,
+                          EventInfo eventInfo) {
+    this.action = action;
+    this.table = table;
+    this.updatedKey = updatedKey;
+    this.updatedValue = updatedValue;
+    this.eventInfo = eventInfo;
+  }
+
+  public OMDBUpdateAction getAction() {
+    return action;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public KEY getKey() {
+    return updatedKey;
+  }
+
+  public VALUE getValue() {
+    return updatedValue;
+  }
+
+  public EventInfo getEventInfo() {
+    return eventInfo;
+  }
+
+  /**
+   * Builder used to construct an OM DB Update event.
+   * @param <KEY> Key type.
+   * @param <VALUE> Value type.
+   */
+  public static class OMUpdateEventBuilder<KEY, VALUE> {
+
+    private OMDBUpdateAction action;
+    private String table;
+    private KEY updatedKey;
+    private VALUE updatedValue;
+    private EventInfo eventInfo;
+
+    OMUpdateEventBuilder setAction(OMDBUpdateAction omdbUpdateAction) {
+      this.action = omdbUpdateAction;
+      return this;
+    }
+
+    OMUpdateEventBuilder setTable(String tableName) {
+      this.table = tableName;
+      return this;
+    }
+
+    OMUpdateEventBuilder setKey(KEY key) {
+      this.updatedKey = key;
+      return this;
+    }
+
+    OMUpdateEventBuilder setValue(VALUE value) {
+      this.updatedValue = value;
+      return this;
+    }
+
+    OMUpdateEventBuilder setEventInfo(long sequenceNumber,
+                                      long eventTimestampMillis) {
+      this.eventInfo = new EventInfo(sequenceNumber,
+          eventTimestampMillis);
+      return this;
+    }
+
+    /**
+     * Build an OM update event.
+     * @return OMDBUpdateEvent
+     */
+    public OMDBUpdateEvent build() {
+      return new OMDBUpdateEvent<KEY, VALUE>(
+          action,
+          table,
+          updatedKey,
+          updatedValue,
+          eventInfo);
+    }
+  }
+
+  /**
+   * Class used to hold timing information for an event. (Seq number and
+   * timestamp)
+   */
+  public static class EventInfo {
+    private long sequenceNumber;
+    private long eventTimestampMillis;
+
+    public EventInfo(long sequenceNumber,
+                     long eventTimestampMillis) {
+      this.sequenceNumber = sequenceNumber;
+      this.eventTimestampMillis = eventTimestampMillis;
+    }
+
+    public long getSequenceNumber() {
+      return sequenceNumber;
+    }
+
+    public long getEventTimestampMillis() {
+      return eventTimestampMillis;
+    }
+  }
+
+  /**
+   * Supported Actions - PUT, DELETE.
+   */
+  public enum OMDBUpdateAction {
+    PUT, DELETE
+  }
+}

+ 220 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java

@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.utils.db.CodecRegistry;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class used to listen on OM RocksDB updates.
+ */
+public class OMDBUpdatesHandler extends WriteBatch.Handler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMDBUpdatesHandler.class);
+
+  private Map<Integer, String> tablesNames;
+  private CodecRegistry codecRegistry;
+  private List<OMDBUpdateEvent> omdbUpdateEvents = new ArrayList<>();
+
+  public OMDBUpdatesHandler(OMMetadataManager omMetadataManager) {
+    tablesNames = omMetadataManager.getStore().getTableNames();
+    codecRegistry = omMetadataManager.getStore().getCodecRegistry();
+  }
+
+  @Override
+  public void put(int cfIndex, byte[] keyBytes, byte[] valueBytes) throws
+      RocksDBException {
+    try {
+      processEvent(cfIndex, keyBytes, valueBytes,
+          OMDBUpdateEvent.OMDBUpdateAction.PUT);
+    } catch (IOException ioEx) {
+      LOG.error("Exception when reading key : " + ioEx);
+    }
+  }
+
+  @Override
+  public void delete(int cfIndex, byte[] keyBytes) throws RocksDBException {
+    try {
+      processEvent(cfIndex, keyBytes, null,
+          OMDBUpdateEvent.OMDBUpdateAction.DELETE);
+    } catch (IOException ioEx) {
+      LOG.error("Exception when reading key : " + ioEx);
+    }
+  }
+
+  /**
+   *
+   */
+  private void processEvent(int cfIndex, byte[] keyBytes, byte[]
+      valueBytes, OMDBUpdateEvent.OMDBUpdateAction action)
+      throws IOException {
+    String tableName = tablesNames.get(cfIndex);
+    Class keyType = getKeyType(tableName);
+    Class valueType = getValueType(tableName);
+    if (valueType != null) {
+      OMDBUpdateEvent.OMUpdateEventBuilder builder =
+          new OMDBUpdateEvent.OMUpdateEventBuilder<>();
+      builder.setTable(tableName);
+
+      Object key = codecRegistry.asObject(keyBytes, keyType);
+      builder.setKey(key);
+
+      if (!action.equals(OMDBUpdateEvent.OMDBUpdateAction.DELETE)) {
+        Object value = codecRegistry.asObject(valueBytes, valueType);
+        builder.setValue(value);
+      }
+
+      builder.setAction(action);
+      OMDBUpdateEvent event = builder.build();
+      LOG.info("Generated OM update Event for table : " + event.getTable()
+          + ", Key = " + event.getKey());
+      // Temporarily adding to an event buffer for testing. In subsequent JIRAs,
+      // a Recon side class will be implemented that requests delta updates
+      // from OM and calls on this handler. In that case, we will fill up
+      // this buffer and pass it on to the ReconTaskController which has
+      // tasks waiting on OM events.
+      omdbUpdateEvents.add(event);
+    }
+  }
+
+  // There are no use cases yet for the remaining methods in Recon. These
+  // will be implemented as and when need arises.
+
+  @Override
+  public void put(byte[] bytes, byte[] bytes1) {
+
+  }
+
+  @Override
+  public void merge(int i, byte[] bytes, byte[] bytes1)
+      throws RocksDBException {
+  }
+
+  @Override
+  public void merge(byte[] bytes, byte[] bytes1) {
+  }
+
+  @Override
+  public void delete(byte[] bytes) {
+  }
+
+  @Override
+  public void singleDelete(int i, byte[] bytes) throws RocksDBException {
+  }
+
+  @Override
+  public void singleDelete(byte[] bytes) {
+  }
+
+  @Override
+  public void deleteRange(int i, byte[] bytes, byte[] bytes1)
+      throws RocksDBException {
+  }
+
+  @Override
+  public void deleteRange(byte[] bytes, byte[] bytes1) {
+
+  }
+
+  @Override
+  public void logData(byte[] bytes) {
+
+  }
+
+  @Override
+  public void putBlobIndex(int i, byte[] bytes, byte[] bytes1)
+      throws RocksDBException {
+  }
+
+  @Override
+  public void markBeginPrepare() throws RocksDBException {
+
+  }
+
+  @Override
+  public void markEndPrepare(byte[] bytes) throws RocksDBException {
+
+  }
+
+  @Override
+  public void markNoop(boolean b) throws RocksDBException {
+
+  }
+
+  @Override
+  public void markRollback(byte[] bytes) throws RocksDBException {
+
+  }
+
+  @Override
+  public void markCommit(byte[] bytes) throws RocksDBException {
+
+  }
+
+  /**
+   * Return Key type class for a given table name.
+   * @param name table name.
+   * @return String.class by default.
+   */
+  private Class getKeyType(String name) {
+    return String.class;
+  }
+
+  /**
+   * Return Value type class for a given table.
+   * @param name table name
+   * @return Value type based on table name.
+   */
+  @VisibleForTesting
+  protected Class getValueType(String name) {
+    switch (name) {
+    case KEY_TABLE : return OmKeyInfo.class;
+    case VOLUME_TABLE : return OmVolumeArgs.class;
+    case BUCKET_TABLE : return OmBucketInfo.class;
+    default: return null;
+    }
+  }
+
+  /**
+   * Get List of events. (Temporary API to unit test the class).
+   * @return List of events.
+   */
+  public List<OMDBUpdateEvent> getEvents() {
+    return omdbUpdateEvents;
+  }
+
+}

+ 69 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Wrapper class to hold multiple OM DB update events.
+ */
+public class OMUpdateEventBatch {
+
+  private List<OMDBUpdateEvent> events;
+
+  OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
+    events = new ArrayList<>(e);
+  }
+
+  /**
+   * Get Sequence Number and timestamp of last event in this batch.
+   * @return Event Info instance.
+   */
+  OMDBUpdateEvent.EventInfo getLastEventInfo() {
+    if (events.isEmpty()) {
+      return new OMDBUpdateEvent.EventInfo(-1, -1);
+    } else {
+      return events.get(events.size() - 1).getEventInfo();
+    }
+  }
+
+  /**
+   * Return iterator to Event batch.
+   * @return iterator
+   */
+  public Iterator<OMDBUpdateEvent> getIterator() {
+    return events.iterator();
+  }
+
+  /**
+   * Filter events based on Tables.
+   * @param tables set of tables to filter on.
+   * @return trimmed event batch.
+   */
+  public OMUpdateEventBatch filter(Collection<String> tables) {
+    return new OMUpdateEventBatch(events
+        .stream()
+        .filter(e -> tables.contains(e.getTable()))
+        .collect(Collectors.toList()));
+  }
+}

+ 66 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.Collection;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+
+/**
+ * Abstract class used to denote a Recon task that needs to act on OM DB events.
+ */
+public abstract class ReconDBUpdateTask {
+
+  private String taskName;
+
+  protected ReconDBUpdateTask(String taskName) {
+    this.taskName = taskName;
+  }
+
+  /**
+   * Return task name.
+   * @return task name
+   */
+  public String getTaskName() {
+    return taskName;
+  }
+
+  /**
+   * Return the list of tables that the task is listening on.
+   * Empty list means the task is NOT listening on any tables.
+   * @return Collection of Tables.
+   */
+  protected abstract Collection<String> getTaskTables();
+
+  /**
+   * Process a set of OM events on tables that the task is listening on.
+   * @param events Set of events to be processed by the task.
+   * @return Pair of task name -> task success.
+   */
+  abstract Pair<String, Boolean> process(OMUpdateEventBatch events);
+
+  /**
+   * Process a  on tables that the task is listening on.
+   * @param omMetadataManager OM Metadata manager instance.
+   * @return Pair of task name -> task success.
+   */
+  abstract Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
+
+}

+ 46 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.Map;
+
+/**
+ * Controller used by Recon to manage Tasks that are waiting on Recon events.
+ */
+public interface ReconTaskController {
+
+  /**
+   * Register API used by tasks to register themselves.
+   * @param task task instance
+   */
+  void registerTask(ReconDBUpdateTask task);
+
+  /**
+   * Pass on a set of OM DB update events to the registered tasks.
+   * @param events set of events
+   * @throws InterruptedException InterruptedException
+   */
+  void consumeOMEvents(OMUpdateEventBatch events) throws InterruptedException;
+
+  /**
+   * Get set of registered tasks.
+   * @return Map of Task name -> Task.
+   */
+  Map<String, ReconDBUpdateTask> getRegisteredTasks();
+}

+ 198 - 0
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.jooq.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+/**
+ * Implementation of ReconTaskController.
+ */
+public class ReconTaskControllerImpl implements ReconTaskController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReconTaskControllerImpl.class);
+
+  private Map<String, ReconDBUpdateTask> reconDBUpdateTasks;
+  private ExecutorService executorService;
+  private int threadCount = 1;
+  private final Semaphore taskSemaphore = new Semaphore(1);
+  private final ReconOMMetadataManager omMetadataManager;
+  private Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();
+  private static final int TASK_FAILURE_THRESHOLD = 2;
+  private ReconTaskStatusDao reconTaskStatusDao;
+
+  @Inject
+  public ReconTaskControllerImpl(OzoneConfiguration configuration,
+                                 ReconOMMetadataManager omMetadataManager,
+                                 Configuration sqlConfiguration) {
+    this.omMetadataManager = omMetadataManager;
+    reconDBUpdateTasks = new HashMap<>();
+    threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
+        OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
+    executorService = Executors.newFixedThreadPool(threadCount);
+    reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration);
+  }
+
+  @Override
+  public void registerTask(ReconDBUpdateTask task) {
+    String taskName = task.getTaskName();
+    LOG.info("Registered task " + taskName + " with controller.");
+
+    // Store task in Task Map.
+    reconDBUpdateTasks.put(taskName, task);
+    // Store Task in Task failure tracker.
+    taskFailureCounter.put(taskName, new AtomicInteger(0));
+    // Create DB record for the task.
+    ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
+        0L, 0L);
+    reconTaskStatusDao.insert(reconTaskStatusRecord);
+  }
+
+  /**
+   * For every registered task, we try process step twice and then reprocess
+   * once (if process failed twice) to absorb the events. If a task has failed
+   * reprocess call more than 2 times across events, it is unregistered
+   * (blacklisted).
+   * @param events set of events
+   * @throws InterruptedException
+   */
+  @Override
+  public void consumeOMEvents(OMUpdateEventBatch events)
+      throws InterruptedException {
+    taskSemaphore.acquire();
+
+    try {
+      Collection<Callable<Pair>> tasks = new ArrayList<>();
+      for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
+          reconDBUpdateTasks.entrySet()) {
+        ReconDBUpdateTask task = taskEntry.getValue();
+        tasks.add(() -> task.process(events));
+      }
+
+      List<Future<Pair>> results = executorService.invokeAll(tasks);
+      List<String> failedTasks = processTaskResults(results, events);
+
+      //Retry
+      List<String> retryFailedTasks = new ArrayList<>();
+      if (!failedTasks.isEmpty()) {
+        tasks.clear();
+        for (String taskName : failedTasks) {
+          ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
+          tasks.add(() -> task.process(events));
+        }
+        results = executorService.invokeAll(tasks);
+        retryFailedTasks = processTaskResults(results, events);
+      }
+
+      //Reprocess
+      //TODO Move to a separate task queue since reprocess may be a heavy
+      //operation for large OM DB instances
+      if (!retryFailedTasks.isEmpty()) {
+        tasks.clear();
+        for (String taskName : failedTasks) {
+          ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
+          tasks.add(() -> task.reprocess(omMetadataManager));
+        }
+        results = executorService.invokeAll(tasks);
+        List<String> reprocessFailedTasks = processTaskResults(results, events);
+        for (String taskName : reprocessFailedTasks) {
+          LOG.info("Reprocess step failed for task : " + taskName);
+          if (taskFailureCounter.get(taskName).incrementAndGet() >
+              TASK_FAILURE_THRESHOLD) {
+            LOG.info("Blacklisting Task since it failed retry and " +
+                "reprocess more than " + TASK_FAILURE_THRESHOLD + " times.");
+            reconDBUpdateTasks.remove(taskName);
+          }
+        }
+      }
+    } catch (ExecutionException e) {
+      LOG.error("Unexpected error : ", e);
+    } finally {
+      taskSemaphore.release();
+    }
+  }
+
+  /**
+   * Store the last completed event sequence number and timestamp to the DB
+   * for that task.
+   * @param taskName taskname to be updated.
+   * @param eventInfo contains the new sequence number and timestamp.
+   */
+  private void storeLastCompletedTransaction(
+      String taskName, OMDBUpdateEvent.EventInfo eventInfo) {
+    ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
+        eventInfo.getEventTimestampMillis(), eventInfo.getSequenceNumber());
+    reconTaskStatusDao.update(reconTaskStatusRecord);
+  }
+
+  @Override
+  public Map<String, ReconDBUpdateTask> getRegisteredTasks() {
+    return reconDBUpdateTasks;
+  }
+
+  /**
+   * Wait on results of all tasks.
+   * @param results Set of Futures.
+   * @param events Events.
+   * @return List of failed task names
+   * @throws ExecutionException execution Exception
+   * @throws InterruptedException Interrupted Exception
+   */
+  private List<String> processTaskResults(List<Future<Pair>> results,
+                                          OMUpdateEventBatch events)
+      throws ExecutionException, InterruptedException {
+    List<String> failedTasks = new ArrayList<>();
+    for (Future<Pair> f : results) {
+      String taskName = f.get().getLeft().toString();
+      if (!(Boolean)f.get().getRight()) {
+        LOG.info("Failed task : " + taskName);
+        failedTasks.add(f.get().getLeft().toString());
+      } else {
+        taskFailureCounter.get(taskName).set(0);
+        storeLastCompletedTransaction(taskName, events.getLastEventInfo());
+      }
+    }
+    return failedTasks;
+  }
+}

+ 5 - 2
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java

@@ -190,8 +190,11 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
 
     //Generate Recon container DB data.
     ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
-        ozoneManagerServiceProvider, containerDbServiceProvider);
-    containerKeyMapperTask.run();
+        containerDbServiceProvider,
+        ozoneManagerServiceProvider.getOMMetadataManagerInstance());
+    ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
+    containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
+        .getOMMetadataManagerInstance());
   }
 
   @Test

+ 4 - 1
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractSqlDatabaseTest.java

@@ -80,7 +80,10 @@ public abstract class AbstractSqlDatabaseTest {
     return dslContext;
   }
 
-  static class DataSourceConfigurationProvider implements
+  /**
+   * Local Sqlite datasource provider.
+   */
+  public static class DataSourceConfigurationProvider implements
       Provider<DataSourceConfiguration> {
 
     @Override

+ 143 - 0
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestReconInternalSchemaDefinition.java

@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.persistence;
+
+import static org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.jooq.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Class used to test ReconInternalSchemaDefinition.
+ */
+public class TestReconInternalSchemaDefinition extends AbstractSqlDatabaseTest {
+
+  @Test
+  public void testSchemaCreated() throws Exception {
+    ReconInternalSchemaDefinition schemaDefinition = getInjector().getInstance(
+        ReconInternalSchemaDefinition.class);
+
+    schemaDefinition.initializeSchema();
+
+    Connection connection =
+        getInjector().getInstance(DataSource.class).getConnection();
+    // Verify table definition
+    DatabaseMetaData metaData = connection.getMetaData();
+    ResultSet resultSet = metaData.getColumns(null, null,
+        RECON_TASK_STATUS_TABLE_NAME, null);
+
+    List<Pair<String, Integer>> expectedPairs = new ArrayList<>();
+
+    expectedPairs.add(new ImmutablePair<>("task_name", Types.VARCHAR));
+    expectedPairs.add(new ImmutablePair<>("last_updated_timestamp",
+        Types.INTEGER));
+    expectedPairs.add(new ImmutablePair<>("last_updated_seq_number",
+        Types.INTEGER));
+
+    List<Pair<String, Integer>> actualPairs = new ArrayList<>();
+
+    while (resultSet.next()) {
+      actualPairs.add(new ImmutablePair<>(
+          resultSet.getString("COLUMN_NAME"),
+          resultSet.getInt("DATA_TYPE")));
+    }
+
+    Assert.assertEquals(3, actualPairs.size());
+    Assert.assertEquals(expectedPairs, actualPairs);
+  }
+
+  @Test
+  public void testReconTaskStatusCRUDOperations() throws Exception {
+    // Verify table exists
+    ReconInternalSchemaDefinition schemaDefinition = getInjector().getInstance(
+        ReconInternalSchemaDefinition.class);
+
+    schemaDefinition.initializeSchema();
+
+    DataSource ds = getInjector().getInstance(DataSource.class);
+    Connection connection = ds.getConnection();
+
+    DatabaseMetaData metaData = connection.getMetaData();
+    ResultSet resultSet = metaData.getTables(null, null,
+        RECON_TASK_STATUS_TABLE_NAME, null);
+
+    while (resultSet.next()) {
+      Assert.assertEquals(RECON_TASK_STATUS_TABLE_NAME,
+          resultSet.getString("TABLE_NAME"));
+    }
+
+    ReconTaskStatusDao dao = new ReconTaskStatusDao(getInjector().getInstance(
+        Configuration.class));
+
+    long now = System.currentTimeMillis();
+    ReconTaskStatus newRecord = new ReconTaskStatus();
+    newRecord.setTaskName("HelloWorldTask");
+    newRecord.setLastUpdatedTimestamp(now);
+    newRecord.setLastUpdatedSeqNumber(100L);
+
+    // Create
+    dao.insert(newRecord);
+
+    ReconTaskStatus newRecord2 = new ReconTaskStatus();
+    newRecord2.setTaskName("GoodbyeWorldTask");
+    newRecord2.setLastUpdatedTimestamp(now);
+    newRecord2.setLastUpdatedSeqNumber(200L);
+    // Create
+    dao.insert(newRecord2);
+
+    // Read
+    ReconTaskStatus dbRecord = dao.findById("HelloWorldTask");
+
+    Assert.assertEquals("HelloWorldTask", dbRecord.getTaskName());
+    Assert.assertEquals(Long.valueOf(now), dbRecord.getLastUpdatedTimestamp());
+    Assert.assertEquals(Long.valueOf(100), dbRecord.getLastUpdatedSeqNumber());
+
+    // Update
+    dbRecord.setLastUpdatedSeqNumber(150L);
+    dao.update(dbRecord);
+
+    // Read updated
+    dbRecord = dao.findById("HelloWorldTask");
+    Assert.assertEquals(Long.valueOf(150), dbRecord.getLastUpdatedSeqNumber());
+
+    // Delete
+    dao.deleteById("GoodbyeWorldTask");
+
+    // Verify
+    dbRecord = dao.findById("GoodbyeWorldTask");
+
+    Assert.assertNull(dbRecord);
+  }
+
+}

+ 0 - 17
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java

@@ -22,7 +22,6 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
 
 import java.io.File;
-import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -127,22 +126,6 @@ public class TestReconOmMetadataManagerImpl {
         .get("/sampleVol/bucketOne/key_one"));
     Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
         .get("/sampleVol/bucketOne/key_two"));
-
-    //Verify that we cannot write data to Recon OM DB (Read Only)
-    try {
-      reconOMMetadataManager.getKeyTable().put(
-          "/sampleVol/bucketOne/fail_key", new OmKeyInfo.Builder()
-              .setBucketName("bucketOne")
-              .setVolumeName("sampleVol")
-              .setKeyName("fail_key")
-              .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
-              .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
-              .build());
-      Assert.fail();
-    } catch (IOException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains("Not supported operation in read only mode"));
-    }
   }
 
 }

+ 25 - 0
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java

@@ -203,4 +203,29 @@ public class TestContainerDBServiceProviderImpl {
     assertTrue(keyPrefixMap.size() == 1);
     assertTrue(keyPrefixMap.get(containerKeyPrefix3) == 3);
   }
+
+  @Test
+  public void testDeleteContainerMapping() throws IOException {
+    long containerId = System.currentTimeMillis();
+
+    ContainerKeyPrefix containerKeyPrefix1 = new
+        ContainerKeyPrefix(containerId, "V3/B1/K1", 0);
+    containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix1,
+        1);
+
+    ContainerKeyPrefix containerKeyPrefix2 = new ContainerKeyPrefix(
+        containerId, "V3/B1/K2", 0);
+    containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix2,
+        2);
+
+    Map<ContainerKeyPrefix, Integer> keyPrefixMap =
+        containerDbServiceProvider.getKeyPrefixesForContainer(containerId);
+    assertTrue(keyPrefixMap.size() == 2);
+
+    containerDbServiceProvider.deleteContainerMapping(new ContainerKeyPrefix(
+        containerId, "V3/B1/K2", 0));
+    keyPrefixMap =
+        containerDbServiceProvider.getKeyPrefixesForContainer(containerId);
+    assertTrue(keyPrefixMap.size() == 1);
+  }
 }

+ 77 - 0
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+
+/**
+ * Dummy Recon task that has 3 modes of operations.
+ * ALWAYS_FAIL / FAIL_ONCE / ALWAYS_PASS
+ */
+public class DummyReconDBTask extends ReconDBUpdateTask {
+
+  private int numFailuresAllowed = Integer.MIN_VALUE;
+  private int callCtr = 0;
+
+  public DummyReconDBTask(String taskName, TaskType taskType) {
+    super(taskName);
+    if (taskType.equals(TaskType.FAIL_ONCE)) {
+      numFailuresAllowed = 1;
+    } else if (taskType.equals(TaskType.ALWAYS_FAIL)) {
+      numFailuresAllowed = Integer.MAX_VALUE;
+    }
+  }
+
+  @Override
+  protected Collection<String> getTaskTables() {
+    return Collections.singletonList("volumeTable");
+  }
+
+  @Override
+  Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    if (++callCtr <= numFailuresAllowed) {
+      return new ImmutablePair<>(getTaskName(), false);
+    } else {
+      return new ImmutablePair<>(getTaskName(), true);
+    }
+  }
+
+  @Override
+  Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    if (++callCtr <= numFailuresAllowed) {
+      return new ImmutablePair<>(getTaskName(), false);
+    } else {
+      return new ImmutablePair<>(getTaskName(), true);
+    }
+  }
+
+  /**
+   * Type of the task.
+   */
+  public enum TaskType {
+    ALWAYS_PASS,
+    FAIL_ONCE,
+    ALWAYS_FAIL
+  }
+}

+ 133 - 22
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java

@@ -23,10 +23,7 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -34,9 +31,10 @@ import java.util.Map;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
@@ -48,14 +46,11 @@ import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
-import org.apache.hadoop.utils.db.DBCheckpoint;
 import org.apache.hadoop.utils.db.DBStore;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -110,7 +105,7 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
   }
 
   @Test
-  public void testRun() throws Exception{
+  public void testReprocessOMDB() throws Exception{
 
     Map<ContainerKeyPrefix, Integer> keyPrefixesForContainer =
         containerDbServiceProvider.getKeyPrefixesForContainer(1);
@@ -137,25 +132,17 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
     OmKeyLocationInfoGroup omKeyLocationInfoGroup = new
         OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
 
-    writeDataToOm(omMetadataManager,
+    writeDataToOm(reconOMMetadataManager,
         "key_one",
         "bucketOne",
         "sampleVol",
         Collections.singletonList(omKeyLocationInfoGroup));
 
-    //Take snapshot of OM DB and copy over to Recon OM DB.
-    DBCheckpoint checkpoint = omMetadataManager.getStore()
-        .getCheckpoint(true);
-    File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
-    InputStream inputStream = new FileInputStream(tarFile);
-    PowerMockito.stub(PowerMockito.method(ReconUtils.class,
-        "makeHttpCall",
-        CloseableHttpClient.class, String.class))
-        .toReturn(inputStream);
-
-    ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
-        ozoneManagerServiceProvider, containerDbServiceProvider);
-    containerKeyMapperTask.run();
+    ContainerKeyMapperTask containerKeyMapperTask =
+        new ContainerKeyMapperTask(containerDbServiceProvider,
+        ozoneManagerServiceProvider.getOMMetadataManagerInstance());
+    containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
+        .getOMMetadataManagerInstance());
 
     keyPrefixesForContainer =
         containerDbServiceProvider.getKeyPrefixesForContainer(1);
@@ -176,6 +163,130 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
         keyPrefixesForContainer.get(containerKeyPrefix).intValue());
   }
 
+  @Test
+  public void testProcessOMEvents() throws IOException {
+    Map<ContainerKeyPrefix, Integer> keyPrefixesForContainer =
+        containerDbServiceProvider.getKeyPrefixesForContainer(1);
+    assertTrue(keyPrefixesForContainer.isEmpty());
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(2);
+    assertTrue(keyPrefixesForContainer.isEmpty());
+
+    Pipeline pipeline = getRandomPipeline();
+
+    List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
+    BlockID blockID1 = new BlockID(1, 1);
+    OmKeyLocationInfo omKeyLocationInfo1 = getOmKeyLocationInfo(blockID1,
+        pipeline);
+
+    BlockID blockID2 = new BlockID(2, 1);
+    OmKeyLocationInfo omKeyLocationInfo2
+        = getOmKeyLocationInfo(blockID2, pipeline);
+
+    omKeyLocationInfoList.add(omKeyLocationInfo1);
+    omKeyLocationInfoList.add(omKeyLocationInfo2);
+
+    OmKeyLocationInfoGroup omKeyLocationInfoGroup = new
+        OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
+
+    String bucket = "bucketOne";
+    String volume = "sampleVol";
+    String key = "key_one";
+    String omKey = omMetadataManager.getOzoneKey(volume, bucket, key);
+    OmKeyInfo omKeyInfo = buildOmKeyInfo(volume, bucket, key,
+        omKeyLocationInfoGroup);
+
+    OMDBUpdateEvent keyEvent1 = new OMDBUpdateEvent.
+        OMUpdateEventBuilder<String, OmKeyInfo>()
+        .setKey(omKey)
+        .setValue(omKeyInfo)
+        .setTable(omMetadataManager.getKeyTable().getName())
+        .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT)
+        .build();
+
+    BlockID blockID3 = new BlockID(1, 2);
+    OmKeyLocationInfo omKeyLocationInfo3 =
+        getOmKeyLocationInfo(blockID3, pipeline);
+
+    BlockID blockID4 = new BlockID(3, 1);
+    OmKeyLocationInfo omKeyLocationInfo4
+        = getOmKeyLocationInfo(blockID4, pipeline);
+
+    omKeyLocationInfoList = new ArrayList<>();
+    omKeyLocationInfoList.add(omKeyLocationInfo3);
+    omKeyLocationInfoList.add(omKeyLocationInfo4);
+    omKeyLocationInfoGroup = new OmKeyLocationInfoGroup(0,
+        omKeyLocationInfoList);
+
+    String key2 = "key_two";
+    writeDataToOm(reconOMMetadataManager, key2, bucket, volume, Collections
+        .singletonList(omKeyLocationInfoGroup));
+
+    omKey = omMetadataManager.getOzoneKey(volume, bucket, key2);
+    OMDBUpdateEvent keyEvent2 = new OMDBUpdateEvent.
+        OMUpdateEventBuilder<String, OmKeyInfo>()
+        .setKey(omKey)
+        .setAction(OMDBUpdateEvent.OMDBUpdateAction.DELETE)
+        .setTable(omMetadataManager.getKeyTable().getName())
+        .build();
+
+    OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(new
+        ArrayList<OMDBUpdateEvent>() {{
+          add(keyEvent1);
+          add(keyEvent2);
+        }});
+
+    ContainerKeyMapperTask containerKeyMapperTask =
+        new ContainerKeyMapperTask(containerDbServiceProvider,
+            ozoneManagerServiceProvider.getOMMetadataManagerInstance());
+    containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
+        .getOMMetadataManagerInstance());
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(1);
+    assertTrue(keyPrefixesForContainer.size() == 1);
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(2);
+    assertTrue(keyPrefixesForContainer.isEmpty());
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(3);
+    assertTrue(keyPrefixesForContainer.size() == 1);
+
+    // Process PUT & DELETE event.
+    containerKeyMapperTask.process(omUpdateEventBatch);
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(1);
+    assertTrue(keyPrefixesForContainer.size() == 1);
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(2);
+    assertTrue(keyPrefixesForContainer.size() == 1);
+
+    keyPrefixesForContainer = containerDbServiceProvider
+        .getKeyPrefixesForContainer(3);
+    assertTrue(keyPrefixesForContainer.isEmpty());
+
+  }
+
+  private OmKeyInfo buildOmKeyInfo(String volume,
+                                   String bucket,
+                                   String key,
+                                   OmKeyLocationInfoGroup
+                                       omKeyLocationInfoGroup) {
+    return new OmKeyInfo.Builder()
+        .setBucketName(bucket)
+        .setVolumeName(volume)
+        .setKeyName(key)
+        .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+        .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setOmKeyLocationInfos(Collections.singletonList(
+            omKeyLocationInfoGroup))
+        .build();
+  }
   /**
    * Get Test OzoneConfiguration instance.
    * @return OzoneConfiguration

+ 207 - 0
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java

@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.utils.db.RDBStore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.RocksDB;
+import org.rocksdb.TransactionLogIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * Class used to test OMDBUpdatesHandler.
+ */
+public class TestOMDBUpdatesHandler {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneConfiguration createNewTestPath() throws IOException {
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      assertTrue(newFolder.mkdirs());
+    }
+    ServerUtils.setOzoneMetaDirPath(configuration, newFolder.toString());
+    return configuration;
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    OzoneConfiguration configuration = createNewTestPath();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration);
+
+    String volumeKey = metaMgr.getVolumeKey("sampleVol");
+    OmVolumeArgs args =
+        OmVolumeArgs.newBuilder()
+            .setVolume("sampleVol")
+            .setAdminName("bilbo")
+            .setOwnerName("bilbo")
+            .build();
+    metaMgr.getVolumeTable().put(volumeKey, args);
+
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setBucketName("bucketOne")
+        .setVolumeName("sampleVol")
+        .setKeyName("key_one")
+        .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+        .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+        .build();
+
+    metaMgr.getKeyTable().put("/sampleVol/bucketOne/key_one", omKeyInfo);
+    RDBStore rdbStore = (RDBStore) metaMgr.getStore();
+
+    RocksDB rocksDB = rdbStore.getDb();
+    TransactionLogIterator transactionLogIterator =
+        rocksDB.getUpdatesSince(0);
+    List<byte[]> writeBatches = new ArrayList<>();
+
+    while(transactionLogIterator.isValid()) {
+      TransactionLogIterator.BatchResult result =
+          transactionLogIterator.getBatch();
+      result.writeBatch().markWalTerminationPoint();
+      WriteBatch writeBatch = result.writeBatch();
+      writeBatches.add(writeBatch.data());
+      transactionLogIterator.next();
+    }
+
+    OzoneConfiguration conf2 = createNewTestPath();
+    OmMetadataManagerImpl reconOmmetaMgr = new OmMetadataManagerImpl(conf2);
+    List<OMDBUpdateEvent> events = new ArrayList<>();
+    for (byte[] data : writeBatches) {
+      WriteBatch writeBatch = new WriteBatch(data);
+      OMDBUpdatesHandler omdbUpdatesHandler =
+          new OMDBUpdatesHandler(reconOmmetaMgr);
+      writeBatch.iterate(omdbUpdatesHandler);
+      events.addAll(omdbUpdatesHandler.getEvents());
+    }
+    assertNotNull(events);
+    assertTrue(events.size() == 2);
+
+    OMDBUpdateEvent volEvent = events.get(0);
+    assertEquals(OMDBUpdateEvent.OMDBUpdateAction.PUT, volEvent.getAction());
+    assertEquals(volumeKey, volEvent.getKey());
+    assertEquals(args.getVolume(), ((OmVolumeArgs)volEvent.getValue())
+        .getVolume());
+
+    OMDBUpdateEvent keyEvent = events.get(1);
+    assertEquals(OMDBUpdateEvent.OMDBUpdateAction.PUT, keyEvent.getAction());
+    assertEquals("/sampleVol/bucketOne/key_one", keyEvent.getKey());
+    assertEquals(omKeyInfo.getBucketName(),
+        ((OmKeyInfo)keyEvent.getValue()).getBucketName());
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    OzoneConfiguration configuration = createNewTestPath();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration);
+
+    String volumeKey = metaMgr.getVolumeKey("sampleVol");
+    OmVolumeArgs args =
+        OmVolumeArgs.newBuilder()
+            .setVolume("sampleVol")
+            .setAdminName("bilbo")
+            .setOwnerName("bilbo")
+            .build();
+    metaMgr.getVolumeTable().put(volumeKey, args);
+
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setBucketName("bucketOne")
+        .setVolumeName("sampleVol")
+        .setKeyName("key_one")
+        .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+        .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+        .build();
+
+    metaMgr.getKeyTable().put("/sampleVol/bucketOne/key_one", omKeyInfo);
+
+    metaMgr.getKeyTable().delete("/sampleVol/bucketOne/key_one");
+    metaMgr.getVolumeTable().delete(volumeKey);
+
+    RDBStore rdbStore = (RDBStore) metaMgr.getStore();
+
+    RocksDB rocksDB = rdbStore.getDb();
+    TransactionLogIterator transactionLogIterator =
+        rocksDB.getUpdatesSince(0);
+    List<byte[]> writeBatches = new ArrayList<>();
+
+    while(transactionLogIterator.isValid()) {
+      TransactionLogIterator.BatchResult result =
+          transactionLogIterator.getBatch();
+      result.writeBatch().markWalTerminationPoint();
+      WriteBatch writeBatch = result.writeBatch();
+      writeBatches.add(writeBatch.data());
+      transactionLogIterator.next();
+    }
+
+    OzoneConfiguration conf2 = createNewTestPath();
+    OmMetadataManagerImpl reconOmmetaMgr = new OmMetadataManagerImpl(conf2);
+    List<OMDBUpdateEvent> events = new ArrayList<>();
+    for (byte[] data : writeBatches) {
+      WriteBatch writeBatch = new WriteBatch(data);
+      OMDBUpdatesHandler omdbUpdatesHandler =
+          new OMDBUpdatesHandler(reconOmmetaMgr);
+      writeBatch.iterate(omdbUpdatesHandler);
+      events.addAll(omdbUpdatesHandler.getEvents());
+    }
+    assertNotNull(events);
+    assertTrue(events.size() == 4);
+
+    OMDBUpdateEvent keyEvent = events.get(2);
+    assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, keyEvent.getAction());
+    assertEquals("/sampleVol/bucketOne/key_one", keyEvent.getKey());
+
+    OMDBUpdateEvent volEvent = events.get(3);
+    assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, volEvent.getAction());
+    assertEquals(volumeKey, volEvent.getKey());
+  }
+
+  @Test
+  public void testGetValueType() throws IOException {
+    OzoneConfiguration configuration = createNewTestPath();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration);
+    OMDBUpdatesHandler omdbUpdatesHandler =
+        new OMDBUpdatesHandler(metaMgr);
+
+    assertEquals(OmKeyInfo.class, omdbUpdatesHandler.getValueType(
+        metaMgr.getKeyTable().getName()));
+    assertEquals(OmVolumeArgs.class, omdbUpdatesHandler.getValueType(
+        metaMgr.getVolumeTable().getName()));
+    assertEquals(OmBucketInfo.class, omdbUpdatesHandler.getValueType(
+        metaMgr.getBucketTable().getName()));
+  }
+}

+ 171 - 0
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java

@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.persistence.AbstractSqlDatabaseTest;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
+import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.jooq.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Class used to test ReconTaskControllerImpl.
+ */
+public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
+
+  private ReconTaskController reconTaskController;
+
+  private Configuration sqlConfiguration;
+  @Before
+  public void setUp() throws Exception {
+
+    File omDbDir = temporaryFolder.newFolder();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath());
+    ReconOMMetadataManager omMetadataManager = new ReconOmMetadataManagerImpl(
+        ozoneConfiguration);
+
+    sqlConfiguration = getInjector()
+        .getInstance(Configuration.class);
+
+    ReconInternalSchemaDefinition schemaDefinition = getInjector().
+        getInstance(ReconInternalSchemaDefinition.class);
+    schemaDefinition.initializeSchema();
+
+    reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration,
+        omMetadataManager, sqlConfiguration);
+  }
+
+  @Test
+  public void testRegisterTask() throws Exception {
+    String taskName = "Dummy_" + System.currentTimeMillis();
+    DummyReconDBTask dummyReconDBTask =
+        new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_PASS);
+    reconTaskController.registerTask(dummyReconDBTask);
+    assertTrue(reconTaskController.getRegisteredTasks().size() == 1);
+    assertTrue(reconTaskController.getRegisteredTasks()
+        .get(dummyReconDBTask.getTaskName()) == dummyReconDBTask);
+  }
+
+  @Test
+  public void testConsumeOMEvents() throws Exception {
+
+    ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
+    when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections
+        .EMPTY_LIST);
+    when(reconDBUpdateTaskMock.getTaskName()).thenReturn("MockTask");
+    when(reconDBUpdateTaskMock.process(any(OMUpdateEventBatch.class)))
+        .thenReturn(new ImmutablePair<>("MockTask", true));
+    reconTaskController.registerTask(reconDBUpdateTaskMock);
+    reconTaskController.consumeOMEvents(
+        new OMUpdateEventBatch(Collections.emptyList()));
+
+    verify(reconDBUpdateTaskMock, times(1))
+        .process(any());
+  }
+
+  @Test
+  public void testFailedTaskRetryLogic() throws Exception {
+    String taskName = "Dummy_" + System.currentTimeMillis();
+    DummyReconDBTask dummyReconDBTask =
+        new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE);
+    reconTaskController.registerTask(dummyReconDBTask);
+
+
+    long currentTime = System.nanoTime();
+    OMDBUpdateEvent.EventInfo eventInfoMock = mock(
+        OMDBUpdateEvent.EventInfo.class);
+    when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
+    when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
+
+    OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
+    when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
+
+    reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
+    assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
+    assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
+        .get(dummyReconDBTask.getTaskName()));
+
+    ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration);
+    ReconTaskStatus dbRecord = dao.findById(taskName);
+
+    Assert.assertEquals(taskName, dbRecord.getTaskName());
+    Assert.assertEquals(Long.valueOf(currentTime),
+        dbRecord.getLastUpdatedTimestamp());
+    Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber());
+  }
+
+  @Test
+  public void testBadBehavedTaskBlacklisting() throws Exception {
+    String taskName = "Dummy_" + System.currentTimeMillis();
+    DummyReconDBTask dummyReconDBTask =
+        new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL);
+    reconTaskController.registerTask(dummyReconDBTask);
+
+
+    long currentTime = System.nanoTime();
+    OMDBUpdateEvent.EventInfo eventInfoMock =
+        mock(OMDBUpdateEvent.EventInfo.class);
+    when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
+    when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
+
+    OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
+    when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
+
+    for (int i = 0; i < 2; i++) {
+      reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
+
+      assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
+      assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
+          .get(dummyReconDBTask.getTaskName()));
+    }
+
+    //Should be blacklisted now.
+    reconTaskController.consumeOMEvents(
+        new OMUpdateEventBatch(Collections.emptyList()));
+    assertTrue(reconTaskController.getRegisteredTasks().isEmpty());
+
+    ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration);
+    ReconTaskStatus dbRecord = dao.findById(taskName);
+
+    Assert.assertEquals(taskName, dbRecord.getTaskName());
+    Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp());
+    Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber());
+  }
+}