
HDDS-1802. Add Eviction policy for table cache. (#1100)

Bharat Viswanadham 5 years ago
parent commit 73e6ffce69

+ 13 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java

@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 
 /**
  * The DBStore interface provides the ability to create Tables, which store
@@ -47,7 +48,9 @@ public interface DBStore extends AutoCloseable {
 
 
   /**
-   * Gets an existing TableStore with implicit key/value conversion.
+   * Gets an existing TableStore with implicit key/value conversion and
+   * the default cache cleanup policy. The default cache cleanup policy
+   * is MANUAL.
    *
    * @param name - Name of the TableStore to get
    * @param keyType
@@ -58,6 +61,15 @@ public interface DBStore extends AutoCloseable {
   <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
       Class<KEY> keyType, Class<VALUE> valueType) throws IOException;
 
+  /**
+   * Gets an existing TableStore with implicit key/value conversion and
+   * the specified cache cleanup policy.
+   * @throws IOException
+   */
+  <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
+      Class<KEY> keyType, Class<VALUE> valueType,
+      TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException;
+
   /**
    * Lists the Known list of Tables in a DB.
    *

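The new overload lets each table opt into a cache cleanup policy. A minimal usage sketch (the class, method, and table names below are illustrative, not part of this patch), assuming an already-opened DBStore:

    import java.io.IOException;

    import org.apache.hadoop.utils.db.DBStore;
    import org.apache.hadoop.utils.db.Table;
    import org.apache.hadoop.utils.db.cache.TableCacheImpl;

    public final class GetTableSketch {
      // Opens one table with the default (MANUAL) cleanup policy, and one
      // with NEVER, which gives the table a full in-memory cache.
      public static void openTables(DBStore store) throws IOException {
        Table<String, String> partialCacheTable =
            store.getTable("partialTable", String.class, String.class);
        Table<String, String> fullCacheTable =
            store.getTable("fullTable", String.class, String.class,
                TableCacheImpl.CacheCleanupPolicy.NEVER);
      }
    }
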
+ 9 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.utils.RocksDBStoreMBean;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -260,6 +261,14 @@ public class RDBStore implements DBStore {
         valueType);
   }
 
+  @Override
+  public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
+      Class<KEY> keyType, Class<VALUE> valueType,
+      TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
+    return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
+        valueType, cleanupPolicy);
+  }
+
   @Override
   public ArrayList<Table> listTables() throws IOException {
     ArrayList<Table> returnList = new ArrayList<>();

+ 73 - 13
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java

@@ -23,11 +23,16 @@ import java.util.Iterator;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheResult;
 import org.apache.hadoop.utils.db.cache.CacheValue;
-import org.apache.hadoop.utils.db.cache.PartialTableCache;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 import org.apache.hadoop.utils.db.cache.TableCache;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl.CacheCleanupPolicy;
 
+import static org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus.EXISTS;
+import static org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus.NOT_EXIST;
 /**
  * Strongly typed table implementation.
  * <p>
@@ -49,16 +54,61 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
 
+  private static final long EPOCH_DEFAULT = -1L;
 
+  /**
+   * Create a TypedTable from the raw table.
+   * The default cleanup policy used for the table is
+   * {@link CacheCleanupPolicy#MANUAL}.
+   * @param rawTable
+   * @param codecRegistry
+   * @param keyType
+   * @param valueType
+   */
+  public TypedTable(
+      Table<byte[], byte[]> rawTable,
+      CodecRegistry codecRegistry, Class<KEY> keyType,
+      Class<VALUE> valueType) throws IOException {
+    this(rawTable, codecRegistry, keyType, valueType,
+        CacheCleanupPolicy.MANUAL);
+  }
+
+  /**
+   * Create a TypedTable from the raw table with the specified cleanup
+   * policy for the table cache.
+   * @param rawTable
+   * @param codecRegistry
+   * @param keyType
+   * @param valueType
+   * @param cleanupPolicy
+   */
   public TypedTable(
       Table<byte[], byte[]> rawTable,
       CodecRegistry codecRegistry, Class<KEY> keyType,
-      Class<VALUE> valueType) {
+      Class<VALUE> valueType,
+      TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
     this.rawTable = rawTable;
     this.codecRegistry = codecRegistry;
     this.keyType = keyType;
     this.valueType = valueType;
-    cache = new PartialTableCache<>();
+    cache = new TableCacheImpl<>(cleanupPolicy);
+
+    if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+      // Fill the cache.
+      try (TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> tableIterator =
+              iterator()) {
+
+        while (tableIterator.hasNext()) {
+          KeyValue<KEY, VALUE> kv = tableIterator.next();
+
+          // We need to rebuild the cache after an OM restart when the
+          // cleanup policy is NEVER. The epoch value is set to -1 so that
+          // an entry marked for delete is still considered for cleanup.
+          cache.put(new CacheKey<>(kv.getKey()),
+              new CacheValue<>(Optional.of(kv.getValue()), EPOCH_DEFAULT));
+        }
+      }
+    }
   }
 
   @Override
@@ -83,9 +133,17 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   @Override
   public boolean isExist(KEY key) throws IOException {
-    CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
-    return (cacheValue != null && cacheValue.getCacheValue() != null) ||
-        rawTable.isExist(codecRegistry.asRawData(key));
+
+    CacheResult<CacheValue<VALUE>> cacheResult =
+        cache.lookup(new CacheKey<>(key));
+
+    if (cacheResult.getCacheStatus() == EXISTS) {
+      return true;
+    } else if (cacheResult.getCacheStatus() == NOT_EXIST) {
+      return false;
+    } else {
+      return rawTable.isExist(codecRegistry.asRawData(key));
+    }
   }
 
   /**
@@ -104,14 +162,16 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   public VALUE get(KEY key) throws IOException {
     // Here the metadata lock will guarantee that cache is not updated for same
     // key during get key.
-    CacheValue< VALUE > cacheValue = cache.get(new CacheKey<>(key));
-    if (cacheValue == null) {
-      // If no cache for the table or if it does not exist in cache get from
-      // RocksDB table.
-      return getFromTable(key);
+
+    CacheResult<CacheValue<VALUE>> cacheResult =
+        cache.lookup(new CacheKey<>(key));
+
+    if (cacheResult.getCacheStatus() == EXISTS) {
+      return cacheResult.getValue().getCacheValue();
+    } else if (cacheResult.getCacheStatus() == NOT_EXIST) {
+      return null;
     } else {
-      // We have a value in cache, return the value.
-      return cacheValue.getCacheValue();
+      return getFromTable(key);
     }
   }
 

+ 58 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheResult.java

@@ -0,0 +1,58 @@
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Objects;
+
+/**
+ * CacheResult is returned for a cache lookup and tells whether the key
+ * exists in the cache.
+ * @param <CACHEVALUE>
+ */
+public class CacheResult<CACHEVALUE extends CacheValue> {
+
+  private CacheStatus cacheStatus;
+  private CACHEVALUE cachevalue;
+
+  public CacheResult(CacheStatus cacheStatus, CACHEVALUE cachevalue) {
+    this.cacheStatus = cacheStatus;
+    this.cachevalue = cachevalue;
+  }
+
+  public CacheStatus getCacheStatus() {
+    return cacheStatus;
+  }
+
+  public CACHEVALUE getValue() {
+    return cachevalue;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CacheResult<?> that = (CacheResult<?>) o;
+    return cacheStatus == that.cacheStatus &&
+        Objects.equals(cachevalue, that.cachevalue);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(cacheStatus, cachevalue);
+  }
+
+  /**
+   * Status which tells whether key exists in cache or not.
+   */
+  public enum CacheStatus {
+    EXISTS, // The key exists in the cache.
+
+    NOT_EXIST, // The key is guaranteed not to exist. This is returned
+    // when the key is absent from the cache and the cache cleanup policy
+    // is NEVER.
+    MAY_EXIST  // This is returned when the key is absent from the cache
+    // and the cache cleanup policy is MANUAL, so the caller needs to check
+    // whether the key exists in its RocksDB table.
+  }
+}

+ 28 - 3
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java

@@ -21,7 +21,8 @@ package org.apache.hadoop.utils.db.cache;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-
+import org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl.CacheCleanupPolicy;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -52,8 +53,11 @@ public interface TableCache<CACHEKEY extends CacheKey,
 
   /**
   * Removes all the entries from the cache whose epoch value is less
-   * than or equal to specified epoch value. For FullTable Cache this is a
-   * do-nothing operation.
+   * than or equal to the specified epoch value.
+   *
+   * If the cleanup policy is NEVER, this is a do-nothing operation.
+   * If the cleanup policy is MANUAL, it is the caller's responsibility to
+   * flush the entries to the DB before calling cleanup.
    * @param epoch
    */
   void cleanup(long epoch);
@@ -69,4 +73,25 @@ public interface TableCache<CACHEKEY extends CacheKey,
    * @return iterator of the underlying cache for the table.
    */
   Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator();
+
+  /**
+   * Check whether a key exists in the cache.
+   *
+   * If it exists, return a CacheResult with the value and status
+   * {@link CacheStatus#EXISTS}.
+   *
+   * If it does not exist:
+   *  If the cache cleanup policy is
+   *  {@link TableCacheImpl.CacheCleanupPolicy#NEVER}, the table cache is a
+   *  full cache, and a {@link CacheResult} with a null value and status
+   *  {@link CacheStatus#NOT_EXIST} is returned.
+   *
+   *  If the cache cleanup policy is {@link CacheCleanupPolicy#MANUAL}, the
+   *  table cache is a partial cache, and a {@link CacheResult} with a null
+   *  value and status {@link CacheStatus#MAY_EXIST} is returned.
+   *
+   * @param cachekey
+   */
+  CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey);
+
 }

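A hedged sketch of how a caller consumes the three-way lookup result; the helper class and method names are illustrative, not part of this patch:

    import org.apache.hadoop.utils.db.cache.CacheKey;
    import org.apache.hadoop.utils.db.cache.CacheResult;
    import org.apache.hadoop.utils.db.cache.CacheValue;
    import org.apache.hadoop.utils.db.cache.TableCache;

    public final class LookupSketch {
      // Returns true only when the cache cannot answer definitively and the
      // caller has to fall back to reading the RocksDB table.
      public static boolean needsDbRead(
          TableCache<CacheKey<String>, CacheValue<String>> cache, String key) {
        CacheResult<CacheValue<String>> result =
            cache.lookup(new CacheKey<>(key));
        switch (result.getCacheStatus()) {
        case EXISTS:
          // Definitive hit; the value is available via result.getValue().
          return false;
        case NOT_EXIST:
          // Definitive miss: full cache, or the entry is a delete marker.
          return false;
        case MAY_EXIST:
        default:
          // A partial cache cannot rule the key out.
          return true;
        }
      }
    }
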
+ 69 - 10
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java → hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java

@@ -32,21 +32,28 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 /**
- * Cache implementation for the table, this cache is partial cache, this will
- * be cleaned up, after entries are flushed to DB.
+ * Cache implementation for the table. Depending on the cache cleanup
+ * policy, this cache is either a full cache or a partial cache.
+ *
+ * If the cache cleanup policy is set to {@link CacheCleanupPolicy#MANUAL},
+ * this is a partial cache.
+ *
+ * If the cache cleanup policy is set to {@link CacheCleanupPolicy#NEVER},
+ * this is a full cache.
  */
 @Private
 @Evolving
-public class PartialTableCache<CACHEKEY extends CacheKey,
+public class TableCacheImpl<CACHEKEY extends CacheKey,
     CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
 
   private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
   private final TreeSet<EpochEntry<CACHEKEY>> epochEntries;
   private ExecutorService executorService;
+  private CacheCleanupPolicy cleanupPolicy;
 
 
 
-  public PartialTableCache() {
+  public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
     cache = new ConcurrentHashMap<>();
     epochEntries = new TreeSet<>();
     // Created a singleThreadExecutor, so one cleanup will be running at a
@@ -54,7 +61,7 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
     ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("PartialTableCache Cleanup Thread - %d").build();
     executorService = Executors.newSingleThreadExecutor(build);
-
+    this.cleanupPolicy = cleanupPolicy;
   }
 
   @Override
@@ -70,7 +77,7 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
 
   @Override
   public void cleanup(long epoch) {
-    executorService.submit(() -> evictCache(epoch));
+    executorService.submit(() -> evictCache(epoch, cleanupPolicy));
   }
 
   @Override
@@ -83,16 +90,24 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
     return cache.entrySet().iterator();
   }
 
-  private void evictCache(long epoch) {
+  private void evictCache(long epoch, CacheCleanupPolicy cacheCleanupPolicy) {
     EpochEntry<CACHEKEY> currentEntry = null;
     for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
          iterator.hasNext();) {
       currentEntry = iterator.next();
       CACHEKEY cachekey = currentEntry.getCachekey();
       CacheValue cacheValue = cache.computeIfPresent(cachekey, ((k, v) -> {
-        if (v.getEpoch() <= epoch) {
-          iterator.remove();
-          return null;
+        if (cleanupPolicy == CacheCleanupPolicy.MANUAL) {
+          if (v.getEpoch() <= epoch) {
+            iterator.remove();
+            return null;
+          }
+        } else if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+          // Remove only entries which are marked for delete.
+          if (v.getEpoch() <= epoch && v.getCacheValue() == null) {
+            iterator.remove();
+            return null;
+          }
         }
         return v;
       }));
@@ -103,4 +118,48 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
       }
     }
   }
+
+  @Override
+  public CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey) {
+
+    // TODO: Remove this check once HA and Non-HA code is merged and all
+    //  requests are converted to use cache and double buffer.
+    // This is done as a temporary measure instead of passing a
+    // ratis-enabled flag, which would require more code changes. We also
+    // cannot use a ratis-enabled flag because some of the requests in OM HA
+    // are not yet modified to use the double buffer and cache.
+
+    if (cache.size() == 0) {
+      return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST,
+          null);
+    }
+
+    CACHEVALUE cachevalue = cache.get(cachekey);
+    if (cachevalue == null) {
+      if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+        return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
+      } else {
+        return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST,
+            null);
+      }
+    } else {
+      if (cachevalue.getCacheValue() != null) {
+        return new CacheResult<>(CacheResult.CacheStatus.EXISTS, cachevalue);
+      } else {
+        // When an entry is marked for delete, its cacheValue is set to
+        // null. In that case we can return NOT_EXIST irrespective of the
+        // cache cleanup policy.
+        return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
+      }
+    }
+  }
+
+  /**
+   * Cleanup policies for table cache.
+   */
+  public enum CacheCleanupPolicy {
+    NEVER, // The cache is never cleaned up. This means the table maintains
+    // a full cache.
+    MANUAL // The cache is cleaned up after flushing to the DB. It is the
+    // caller's responsibility to flush to the DB before calling cleanup.
+  }
 }

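To make the eviction difference concrete, a small sketch under the assumption that CacheValue#getCacheValue() returns null for an absent Optional (which is what the null check in evictCache relies on); the class and key names are illustrative:

    import com.google.common.base.Optional;

    import org.apache.hadoop.utils.db.cache.CacheKey;
    import org.apache.hadoop.utils.db.cache.CacheValue;
    import org.apache.hadoop.utils.db.cache.TableCache;
    import org.apache.hadoop.utils.db.cache.TableCacheImpl;

    public final class EvictionSketch {
      public static void main(String[] args) {
        TableCache<CacheKey<String>, CacheValue<String>> cache =
            new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.NEVER);

        // Epoch 1: a live entry. Under NEVER it survives any cleanup call;
        // under MANUAL, cleanup(1) or later would evict it.
        cache.put(new CacheKey<>("vol1"),
            new CacheValue<>(Optional.of("volumeArgs"), 1));

        // Epoch 2: a delete marker (absent value). Cleanup may remove it
        // under either policy.
        cache.put(new CacheKey<>("vol2"),
            new CacheValue<>(Optional.<String>absent(), 2));

        // Asynchronously evicts only the delete marker here; "vol1" stays.
        cache.cleanup(2);
      }
    }
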
+ 51 - 20
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java → hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java

@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.utils.db.cache;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.base.Optional;
@@ -26,18 +28,40 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.fail;
 
 /**
 * Class tests the table cache with both cleanup policies.
  */
-public class TestPartialTableCache {
+@RunWith(value = Parameterized.class)
+public class TestTableCacheImpl {
   private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
 
+  private final TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy;
+
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> policy() {
+    Object[][] params = new Object[][] {
+        {TableCacheImpl.CacheCleanupPolicy.NEVER},
+        {TableCacheImpl.CacheCleanupPolicy.MANUAL}
+    };
+    return Arrays.asList(params);
+  }
+
+  public TestTableCacheImpl(
+      TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy) {
+    this.cacheCleanupPolicy = cacheCleanupPolicy;
+  }
+
+
   @Before
   public void create() {
-    tableCache = new PartialTableCache<>();
+    tableCache =
+        new TableCacheImpl<>(cacheCleanupPolicy);
   }
   @Test
   public void testPartialTableCache() {
@@ -98,33 +122,40 @@ public class TestPartialTableCache {
           tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
     }
 
-    int deleted = 5;
-    // cleanup first 5 entires
-    tableCache.cleanup(deleted);
 
     value = future.get();
     Assert.assertEquals(10, value);
 
     totalCount += value;
 
-    // We should totalCount - deleted entries in cache.
-    final int tc = totalCount;
-    GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
-        5000);
-
-    // Check if we have remaining entries.
-    for (int i=6; i <= totalCount; i++) {
-      Assert.assertEquals(Integer.toString(i),
-          tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
+    if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.MANUAL) {
+      int deleted = 5;
+
+      // Clean up the first 5 entries.
+      tableCache.cleanup(deleted);
+
+      // We should have totalCount - deleted entries in the cache.
+      final int tc = totalCount;
+      GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
+          5000);
+      // Check if we have remaining entries.
+      for (int i = 6; i <= totalCount; i++) {
+        Assert.assertEquals(Integer.toString(i), tableCache.get(
+            new CacheKey<>(Integer.toString(i))).getCacheValue());
+      }
+      tableCache.cleanup(10);
+
+      tableCache.cleanup(totalCount);
+
+      // Cleaned up all entries, so cache size should be zero.
+      GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
+          5000);
+    } else {
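+      // cleanup runs asynchronously, but under the NEVER policy it evicts
+      // only delete-marked (null valued) entries; every live entry written
+      // above remains, so the size stays at totalCount.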
+      tableCache.cleanup(totalCount);
+      Assert.assertEquals(totalCount, tableCache.size());
     }
 
-    tableCache.cleanup(10);
-
-    tableCache.cleanup(totalCount);
 
-    // Cleaned up all entries, so cache size should be zero.
-    GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
-        5000);
   }
 
   private int writeToCache(int count, int startVal, long sleep)

+ 5 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java

@@ -69,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import org.apache.hadoop.utils.db.TypedTable;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,11 +270,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         this.store.getTable(USER_TABLE, String.class, VolumeList.class);
     checkTableStatus(userTable, USER_TABLE);
     volumeTable =
-        this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
+        this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class,
+            TableCacheImpl.CacheCleanupPolicy.NEVER);
     checkTableStatus(volumeTable, VOLUME_TABLE);
 
     bucketTable =
-        this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
+        this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class,
+            TableCacheImpl.CacheCleanupPolicy.NEVER);
 
     checkTableStatus(bucketTable, BUCKET_TABLE);