
HDFS-12149. Ozone: RocksDB implementation of ozone metadata store. Contributed by Weiwei Yang.

Anu Engineer committed 7 years ago
commit bbbfc885e7
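
The patch adds a RocksDB-backed MetadataStore, selectable through the ozone.metastore.impl key, and flips the default from LevelDB to RocksDB. A minimal sketch of building a store against the new backend, mirroring the parameterized test below; the example class name, the db path, and the OzoneConfiguration import location are assumptions for illustration, not part of this patch:

import java.io.File;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.OzoneConfiguration; // assumed package location
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;

public class MetadataStoreExample {
  public static void main(String[] args) throws Exception {
    // "RocksDB" selects the new backend; "LevelDB" keeps the old one.
    Configuration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB);

    MetadataStore store = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setCreateIfMissing(true)
        .setDbFile(new File("/tmp/ozone-meta-example")) // hypothetical path
        .build();

    try {
      store.put("key1".getBytes(StandardCharsets.UTF_8),
          "value1".getBytes(StandardCharsets.UTF_8));
      byte[] value = store.get("key1".getBytes(StandardCharsets.UTF_8));
      System.out.println(new String(value, StandardCharsets.UTF_8));
    } finally {
      store.close();
    }
  }
}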

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -192,6 +192,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <groupId>org.fusesource.leveldbjni</groupId>
      <artifactId>leveldbjni-all</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+      <version>5.5.5</version>
+    </dependency>
    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
    <dependency>
      <groupId>org.bouncycastle</groupId>

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java

@@ -126,7 +126,9 @@ public class LevelDBStore implements MetadataStore {
   */
  @Override
  public void close() throws IOException {
-    db.close();
+    if (db != null){
+      db.close();
+    }
  }

  /**
@@ -163,6 +165,7 @@ public class LevelDBStore implements MetadataStore {

  @Override
  public void destroy() throws IOException {
+    close();
    JniDBFactory.factory.destroy(dbFile, dbOptions);
  }
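
The LevelDBStore change makes close() safe on an unopened handle and has destroy() release the live handle before the backing files are deleted. A standalone sketch of that close-then-destroy ordering against leveldbjni; the class name and path are hypothetical:

import java.io.File;
import java.io.IOException;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public class LevelDbDestroySketch {
  public static void main(String[] args) throws IOException {
    Options options = new Options().createIfMissing(true);
    File dbFile = new File("/tmp/leveldb-destroy-sketch"); // hypothetical path

    DB db = JniDBFactory.factory.open(dbFile, options);
    db.put("k".getBytes(), "v".getBytes());

    // Close the open handle first, then remove the on-disk files;
    // this is the ordering the patched destroy() now enforces.
    db.close();
    JniDBFactory.factory.destroy(dbFile, options);
  }
}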

+ 10 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.utils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.iq80.leveldb.Options;
+import org.rocksdb.BlockBasedTableConfig;

 import java.io.File;
 import java.io.IOException;
@@ -82,8 +83,15 @@ public class MetadataStoreBuilder {
      }
      store = new LevelDBStore(dbFile, options);
    } else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) {
-      // TODO replace with rocksDB impl
-      store = new LevelDBStore(dbFile, new Options());
+      org.rocksdb.Options opts = new org.rocksdb.Options();
+      opts.setCreateIfMissing(createIfMissing);
+
+      if (cacheSize > 0) {
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(cacheSize);
+        opts.setTableFormatConfig(tableConfig);
+      }
+      store = new RocksDBStore(dbFile, opts);
    } else {
      throw new IllegalArgumentException("Invalid argument for "
          + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL
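
The builder now constructs genuine RocksDB options instead of falling back to LevelDB: createIfMissing is propagated directly, and a positive cacheSize becomes a block cache on the table format config. A sketch of the same configuration against raw rocksdbjni; the 64 MB figure, class name, and path are illustrative only:

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDbOptionsSketch {
  public static void main(String[] args) throws Exception {
    RocksDB.loadLibrary();

    long cacheSize = 64L * 1024 * 1024; // illustrative 64 MB block cache

    Options opts = new Options();
    opts.setCreateIfMissing(true);

    // RocksDB exposes the block cache through the table format config,
    // which is why the builder wraps cacheSize in a BlockBasedTableConfig.
    BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
    tableConfig.setBlockCacheSize(cacheSize);
    opts.setTableFormatConfig(tableConfig);

    try (RocksDB db = RocksDB.open(opts, "/tmp/rocksdb-options-sketch")) {
      db.put("k".getBytes(), "v".getBytes());
    }
  }
}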

+ 318 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java

@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.DbPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.AbstractMap;
+
+/**
+ * RocksDB implementation of ozone metadata store.
+ */
+public class RocksDBStore implements MetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBStore.class);
+
+  private RocksDB db = null;
+  private File dbLocation;
+  private WriteOptions writeOptions;
+  private Options dbOptions;
+
+  public RocksDBStore(File dbFile, Options options) throws IOException {
+    Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
+    RocksDB.loadLibrary();
+    dbOptions = options;
+    dbLocation = dbFile;
+    writeOptions = new WriteOptions();
+    writeOptions.setSync(true);
+    writeOptions.setNoSlowdown(true);
+    try {
+      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath());
+    } catch (RocksDBException e) {
+      throw new IOException("Failed init RocksDB, db path : "
+          + dbFile.getAbsolutePath(), e);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RocksDB successfully opened.");
+      LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
+      LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
+      LOG.debug("[Option] compactionPriority= {}", options.compactionStyle());
+      LOG.debug("[Option] compressionType= {}", options.compressionType());
+      LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
+      LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize());
+    }
+  }
+
+  private IOException toIOException(String msg, RocksDBException e) {
+    String statusCode = e.getStatus() == null ? "N/A" :
+        e.getStatus().getCodeString();
+    String errMessage = e.getMessage() == null ? "Unknown error" :
+        e.getMessage();
+    String output = msg + "; status : " + statusCode
+        + "; message : " + errMessage;
+    return new IOException(output, e);
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    try {
+      db.put(writeOptions, key, value);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to put key-value to metadata store", e);
+    }
+  }
+
+  @Override
+  public boolean isEmpty() throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      it.seekToFirst();
+      return !it.isValid();
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    try {
+      return db.get(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to get the value for the given key", e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) throws IOException {
+    try {
+      db.delete(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to delete the given key", e);
+    }
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
+    long start = System.currentTimeMillis();
+    if (count < 0) {
+      throw new IllegalArgumentException(
+          "Invalid count given " + count + ", count cannot be negative");
+    }
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (startKey == null) {
+        it.seekToFirst();
+      } else {
+        if(get(startKey) == null) {
+          throw new IOException("Invalid start key, not found in current db");
+        }
+        it.seek(startKey);
+      }
+      while(it.isValid() && result.size() < count) {
+        byte[] currentKey = it.key();
+        byte[] currentValue = it.value();
+
+        it.prev();
+        final byte[] prevKey = it.isValid() ? it.key() : null;
+
+        it.seek(currentKey);
+        it.next();
+        final byte[] nextKey = it.isValid() ? it.key() : null;
+
+        if (filters == null || Arrays.asList(filters).stream()
+            .allMatch(entry -> entry.filterKey(prevKey,
+                currentKey, nextKey))) {
+          result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+              currentValue));
+        }
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+      long end = System.currentTimeMillis();
+      long timeConsumed = end - start;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+            + " result length is {}.", timeConsumed, result.size());
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public void writeBatch(BatchOperation operation)
+      throws IOException {
+    List<BatchOperation.SingleOperation> operations =
+        operation.getOperations();
+    if (!operations.isEmpty()) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        for (BatchOperation.SingleOperation opt : operations) {
+          switch (opt.getOpt()) {
+          case DELETE:
+            writeBatch.remove(opt.getKey());
+            break;
+          case PUT:
+            writeBatch.put(opt.getKey(), opt.getValue());
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid operation "
+                + opt.getOpt());
+          }
+        }
+        db.write(writeOptions, writeBatch);
+      } catch (RocksDBException e) {
+        throw toIOException("Batch write operation failed", e);
+      }
+    }
+  }
+
+  @Override
+  public void compactDB() throws IOException {
+    if (db != null) {
+      try {
+        db.compactRange();
+      } catch (RocksDBException e) {
+        throw toIOException("Failed to compact db", e);
+      }
+    }
+  }
+
+  private void deleteQuietly(File fileOrDir) {
+    if (fileOrDir != null && fileOrDir.exists()) {
+      try {
+        FileUtils.forceDelete(fileOrDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    // Make sure db is closed.
+    close();
+
+    // There is no destroy-DB API in the Java binding, so the
+    // equivalent is to delete all of the db directories.
+    deleteQuietly(dbLocation);
+    deleteQuietly(new File(dbOptions.dbLogDir()));
+    deleteQuietly(new File(dbOptions.walDir()));
+    List<DbPath> dbPaths = dbOptions.dbPaths();
+    if (dbPaths != null) {
+      dbPaths.forEach(dbPath -> {
+        deleteQuietly(new File(dbPath.toString()));
+      });
+    }
+  }
+
+  @Override
+  public ImmutablePair<byte[], byte[]> peekAround(int offset,
+      byte[] from) throws IOException, IllegalArgumentException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from == null) {
+        it.seekToFirst();
+      } else {
+        it.seek(from);
+      }
+      if (!it.isValid()) {
+        throw new IOException("Key not found");
+      }
+
+      switch (offset) {
+      case 0:
+        break;
+      case 1:
+        it.next();
+        break;
+      case -1:
+        it.prev();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Position can only be -1, 0 " + "or 1, but found " + offset);
+      }
+      return it.isValid() ? new ImmutablePair<>(it.key(), it.value()) : null;
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from != null) {
+        it.seek(from);
+      } else {
+        it.seekToFirst();
+      }
+      while (it.isValid()) {
+        if (!consumer.consume(it.key(), it.value())) {
+          break;
+        }
+        it.next();
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (db != null) {
+      db.close();
+    }
+  }
+}
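
Taken together, RocksDBStore is a drop-in MetadataStore backed by RocksDB. A short sketch of driving it directly, using only the methods defined above; the db path and class name are made up for the example:

import java.io.File;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.utils.RocksDBStore;
import org.rocksdb.Options;

public class RocksDbStoreUsage {
  public static void main(String[] args) throws Exception {
    Options opts = new Options();
    opts.setCreateIfMissing(true);

    RocksDBStore store =
        new RocksDBStore(new File("/tmp/rocksdb-store-sketch"), opts);
    try {
      for (int i = 0; i < 5; i++) {
        store.put(("key-" + i).getBytes(), ("value-" + i).getBytes());
      }

      // Range scan from the first key (null start key), no filters.
      List<Map.Entry<byte[], byte[]>> range = store.getRangeKVs(null, 3);
      System.out.println("range size: " + range.size()); // prints 3

      // Visit entries in order until the consumer returns false.
      store.iterate(null, (key, value) -> {
        System.out.println(new String(key) + " = " + new String(value));
        return true;
      });
    } finally {
      store.destroy(); // closes the db, then removes its directories
    }
  }
}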

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml

@@ -598,7 +598,7 @@

  <property>
    <name>ozone.metastore.impl</name>
-    <value>LevelDB</value>
+    <value>RocksDB</value>
    <description>
      Ozone metadata store implementation. Ozone metadata are well distributed
      to multiple services such as ksm, scm. They are stored in some local

+ 95 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java

@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
@@ -33,6 +34,8 @@ import org.junit.After;
 import org.junit.Test;
 import org.junit.Assert;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

 import java.io.File;
 import java.io.IOException;
@@ -40,15 +43,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.UUID;
+import java.util.Collection;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.runners.Parameterized.*;

 /**
  * Test class for ozone metadata store.
  */
+@RunWith(Parameterized.class)
 public class TestMetadataStore {

+  private final String storeImpl;
+
+  public TestMetadataStore(String metadataImpl) {
+    this.storeImpl = metadataImpl;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
+    });
+  }
+
   private MetadataStore store;
   private File testDir;
-
   private final static int MAX_GETRANGE_LENGTH = 100;

   @Rule
@@ -56,11 +79,11 @@ public class TestMetadataStore {

   @Before
   public void init() throws IOException {
-    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
+    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase());

     Configuration conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
-        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);

     store = MetadataStoreBuilder.newBuilder()
         .setConf(conf)
@@ -293,4 +316,72 @@ public class TestMetadataStore {
     expectedException.expectMessage("Invalid start key");
     store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
   }
+
+  @Test
+  public void testDestroyDB() throws IOException {
+    // create a new DB to test db destroy
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-toDestroy");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    dbStore.put(getBytes("key1"), getBytes("value1"));
+    dbStore.put(getBytes("key2"), getBytes("value2"));
+
+    Assert.assertFalse(dbStore.isEmpty());
+    Assert.assertTrue(dbDir.exists());
+    Assert.assertTrue(dbDir.listFiles().length > 0);
+
+    dbStore.destroy();
+
+    Assert.assertFalse(dbDir.exists());
+  }
+
+  @Test
+  public void testBatchWrite() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-batchWrite");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    List<String> expectedResult = Lists.newArrayList();
+    for (int i = 0; i<10; i++) {
+      dbStore.put(getBytes("batch-" + i), getBytes("batch-value-" + i));
+      expectedResult.add("batch-" + i);
+    }
+
+    BatchOperation batch = new BatchOperation();
+    batch.delete(getBytes("batch-2"));
+    batch.delete(getBytes("batch-3"));
+    batch.delete(getBytes("batch-4"));
+    batch.put(getBytes("batch-new-2"), getBytes("batch-new-value-2"));
+
+    expectedResult.remove("batch-2");
+    expectedResult.remove("batch-3");
+    expectedResult.remove("batch-4");
+    expectedResult.add("batch-new-2");
+
+    dbStore.writeBatch(batch);
+
+    Iterator<String> it = expectedResult.iterator();
+    AtomicInteger count = new AtomicInteger(0);
+    dbStore.iterate(null, (key, value) -> {
+      count.incrementAndGet();
+      return it.hasNext() && it.next().equals(getString(key));
+    });
+
+    Assert.assertEquals(8, count.get());
+  }
 }