
WIP Various fixes and performance improvements.

Haohui Mai 10 years ago
parent
commit
13f7b64731
17 changed files with 355 additions and 28 deletions
  1. + 11 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  2. + 28 - 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
  3. + 6 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
  4. + 6 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
  5. + 5 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
  6. + 83 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
  7. + 22 - 1
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
  8. + 6 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
  9. + 6 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
  10. + 34 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
  11. + 46 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
  12. + 4 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
  13. + 8 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
  14. + 7 - 1
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
  15. + 9 - 4
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
  16. + 11 - 0
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
  17. + 63 - 3
      hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -348,7 +348,17 @@ public class FSDirectory implements Closeable {
     this.enableLevelDb = conf.getBoolean("dfs.partialns", false);
     if (enableLevelDb) {
       String dbPath = conf.get("dfs.partialns.path");
-      Options options = new Options().createIfMissing(true);
+      int writeBufferSize = conf.getInt("dfs.partialns.writebuffer",
+                                        4096 * 1024);
+      long blockCacheSize = conf.getLong(
+          "dfs.partialns.blockcache", 0);
+      Options options = new Options().createIfMissing(true)
+          .writeBufferSize(writeBufferSize);
+
+      if (blockCacheSize != 0) {
+        options.blockCacheSize(blockCacheSize);
+      }
+
       this.levelDb = org.apache.hadoop.hdfs.hdfsdb.DB.open(options, dbPath);
       try (RWTransaction tx = newRWTransaction().begin()) {
         tx.putINode(ROOT_INODE_ID, createRootForFlatNS(ns));
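
The two configuration keys introduced in this hunk tune the LevelDB-backed partial namespace. A minimal sketch of setting them (the keys come from the hunk above; the values and path are illustrative, and a block cache size of 0 keeps LevelDB's default):

      Configuration conf = new HdfsConfiguration();
      conf.setBoolean("dfs.partialns", true);
      conf.set("dfs.partialns.path", "/tmp/partialnsdb");         // illustrative DB path
      conf.setInt("dfs.partialns.writebuffer", 8 * 1024 * 1024);  // write buffer, in bytes
      conf.setLong("dfs.partialns.blockcache", 1L << 30);         // block cache, in bytes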

+ 28 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java

@@ -29,37 +29,43 @@ import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
 
 class LevelDBROTransaction extends ROTransaction {
   private final org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb;
-  private static final ReadOptions OPTIONS = new ReadOptions();
+
+  private Snapshot snapshot;
+  private final ReadOptions options = new ReadOptions();
+  public static final ReadOptions OPTIONS = new ReadOptions();
+
   LevelDBROTransaction(FSDirectory fsd, org.apache.hadoop.hdfs.hdfsdb.DB db) {
     super(fsd);
     this.hdfsdb = db;
   }
 
   LevelDBROTransaction begin() {
-    fsd.readLock();
+    snapshot = hdfsdb.snapshot();
+    options.snapshot(snapshot);
     return this;
   }
 
   @Override
   FlatINode getINode(long id) {
-    return getFlatINode(id, hdfsdb);
+    return getFlatINode(id, hdfsdb, options);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return getChild(parentId, localName, hdfsdb);
+    return getChild(parentId, localName, hdfsdb, options);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return getChildrenView(parent, hdfsdb);
+    return getChildrenView(parent, hdfsdb, options);
   }
 
   static FlatINode getFlatINode(
-      long id, org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb) {
+      long id, DB hdfsdb, ReadOptions options) {
     byte[] key = inodeKey(id);
     try {
-      byte[] bytes = hdfsdb.get(OPTIONS, key);
+      byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key) : hdfsdb
+          .snapshotGet(options, key);
       if (bytes == null) {
         return null;
       }
@@ -83,11 +89,13 @@ class LevelDBROTransaction extends ROTransaction {
       };
   }
 
-  static long getChild(long parentId, ByteBuffer localName, DB hdfsdb) {
+  static long getChild(
+      long parentId, ByteBuffer localName, DB hdfsdb, ReadOptions options) {
     Preconditions.checkArgument(localName.hasRemaining());
     byte[] key = inodeChildKey(parentId, localName);
     try {
-      byte[] bytes = hdfsdb.get(OPTIONS, key);
+      byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key) : hdfsdb
+          .snapshotGet(options, key);
       if (bytes == null) {
         return INVALID_INODE_ID;
       }
@@ -109,7 +117,8 @@ class LevelDBROTransaction extends ROTransaction {
     return key;
   }
 
-  static DBChildrenView getChildrenView(long parent, DB hdfsdb) {
+  static DBChildrenView getChildrenView(
+      long parent, DB hdfsdb, ReadOptions options) {
     byte[] key = new byte[]{'I',
         (byte) ((parent >> 56) & 0xff),
         (byte) ((parent >> 48) & 0xff),
@@ -121,9 +130,17 @@ class LevelDBROTransaction extends ROTransaction {
         (byte) (parent & 0xff),
         1
     };
-    Iterator it = hdfsdb.iterator(OPTIONS);
+    Iterator it = hdfsdb.iterator(options);
     it.seek(key);
     return new LevelDBChildrenView(parent, it);
   }
 
+  @Override
+  public void close() throws IOException {
+    try {
+      snapshot.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }
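
The rewritten read-only transaction pins a LevelDB snapshot in begin() and releases it in close(), instead of taking the FSDirectory read lock. A hedged sketch of the intended calling pattern (the class is package-private, so this assumes caller code in the same namenode package; fsd and levelDb stand for the FSDirectory and its open DB handle):

      try (LevelDBROTransaction tx = new LevelDBROTransaction(fsd, levelDb).begin()) {
        FlatINode inode = tx.getINode(INodeId.ROOT_INODE_ID);         // read against the snapshot
        long child = tx.getChild(INodeId.ROOT_INODE_ID,
            ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); // same snapshot
      } // close() releases the LevelDB snapshot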

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java

@@ -37,17 +37,20 @@ class LevelDBRWTransaction extends RWTransaction {
 
   @Override
   FlatINode getINode(long id) {
-    return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+    return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+                                             LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+                                         LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+    return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+                                                LevelDBROTransaction.OPTIONS);
   }
 
   @Override

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java

@@ -38,17 +38,20 @@ public class LevelDBReplayTransaction extends ReplayTransaction {
 
   @Override
   FlatINode getINode(long id) {
-    return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+    return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+                                             LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+                                         LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+    return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+                                                LevelDBROTransaction.OPTIONS);
   }
 
   @Override

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java

@@ -31,6 +31,10 @@ class MemDBChildrenView extends DBChildrenView {
 
   @Override
   public Iterator<Map.Entry<ByteBuffer, Long>> iterator() {
-    return childrenMap.tailMap(start).entrySet().iterator();
+    if (start == null) {
+      return childrenMap.entrySet().iterator();
+    } else {
+      return childrenMap.tailMap(start).entrySet().iterator();
+    }
   }
 }

+ 83 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java

@@ -0,0 +1,83 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.hdfsdb.*;
+import org.apache.log4j.Level;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Created by hmai on 6/3/15.
+ */
+public class LevelDBProfile {
+  private static final String DB_PATH = "/Users/hmai/work/test/partialnsdb";
+  private static final int TIMES = 300000;
+  public static void main(String[] args) throws Exception {
+    MiniDFSCluster cluster = null;
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean("dfs.partialns", true);
+    conf.set("dfs.partialns.path", DB_PATH);
+    conf.setInt("dfs.partialns.writebuffer", 8388608 * 16);
+    conf.setLong("dfs.partialns.blockcache", 4294967296L);
+    ExecutorService executor = Executors.newFixedThreadPool(8, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "Executor");
+      }
+    });
+    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+
+    try {
+      FileUtils.deleteDirectory(new File(DB_PATH));
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final org.apache.hadoop.hdfs.hdfsdb.DB db = cluster.getNamesystem().getFSDirectory().getLevelDb();
+      final Path PATH = new Path("/foo");
+      final byte[] p = new byte[20];
+      try (OutputStream os = fs.create(PATH)) {
+      }
+      cluster.shutdownDataNodes();
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final Runnable getFileStatus = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            fsn.getFileInfo("/foo", true);
+            //fs.getFileStatus(PATH);
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+
+      long start = monotonicNow();
+      for (int i = 0; i < TIMES; ++i) {
+        executor.submit(getFileStatus);
+      }
+      executor.shutdown();
+      executor.awaitTermination(1, TimeUnit.HOURS);
+      long end = monotonicNow();
+      System.err.println("Time: " + (end - start) + " ms");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}

+ 22 - 1
hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java

@@ -21,7 +21,8 @@ import java.io.IOException;
 
 public class DB extends NativeObject {
   public static DB open(Options options, String path) throws IOException {
-    return new DB(open(options.nativeHandle(), path));
+    long handle = open(options.nativeHandle(), path);
+    return new DB(handle);
   }
 
   @Override
@@ -36,6 +37,11 @@ public class DB extends NativeObject {
     return get(nativeHandle, options.nativeHandle(), key);
   }
 
+  public byte[] snapshotGet(ReadOptions options, byte[] key) throws
+                                                             IOException {
+    return snapshotGet(nativeHandle, options.nativeHandle(), key);
+  }
+
   public void write(WriteOptions options, WriteBatch batch) throws IOException {
     write(nativeHandle, options.nativeHandle(), batch.nativeHandle());
   }
@@ -52,6 +58,14 @@ public class DB extends NativeObject {
     return new Iterator(newIterator(nativeHandle, options.nativeHandle()));
   }
 
+  public Snapshot snapshot() {
+    return new Snapshot(nativeHandle, newSnapshot(nativeHandle));
+  }
+
+  public byte[] dbGetTest(byte[] key) throws IOException {
+    return getTest(nativeHandle, key);
+  }
+
   private DB(long handle) {
     super(handle);
   }
@@ -60,6 +74,8 @@ public class DB extends NativeObject {
   private static native void close(long handle);
   private static native byte[] get(long handle, long options,
                                    byte[] key) throws IOException;
+  private static native byte[] snapshotGet(long handle, long options,
+      byte[] key) throws IOException;
   private static native void write(long handle, long options,
                                    long batch) throws IOException;
   private static native void put(long handle, long options,
@@ -67,4 +83,9 @@ public class DB extends NativeObject {
   private static native void delete(long handle, long options,
                                     byte[] key);
   private static native long newIterator(long handle, long readOptions);
+  private static native long newSnapshot(long handle);
+  static native void releaseSnapshot(long handle, long snapshotHandle);
+
+  private static native byte[] getTest(long handle, byte[] key) throws IOException;
+
 }
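
The snapshot()/snapshotGet() additions expose point-in-time reads through the JNI wrapper. A hypothetical usage sketch (key contents and error handling are illustrative; per the diff, snapshotGet returns null for a missing key and Snapshot.close() releases the native handle via DB.releaseSnapshot):

      Snapshot snapshot = db.snapshot();
      ReadOptions opts = new ReadOptions().snapshot(snapshot);
      try {
        byte[] value = db.snapshotGet(opts, key);  // null when the key is absent
      } finally {
        opts.close();
        snapshot.close();  // declared to throw Exception, per Snapshot.close()
      }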

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java

@@ -56,6 +56,11 @@ public class Options extends NativeObject {
     return this;
   }
 
+  public Options blockCacheSize(long capacity) {
+    blockCacheSize(nativeHandle, capacity);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -70,4 +75,5 @@ public class Options extends NativeObject {
   private static native void compressionType(long handle, int value);
   private static native void writeBufferSize(long handle, int value);
   private static native void blockSize(long handle, int value);
+  private static native void blockCacheSize(long handle, long capacity);
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java

@@ -22,6 +22,11 @@ public class ReadOptions extends NativeObject {
     super(construct());
   }
 
+  public ReadOptions snapshot(Snapshot snapshot) {
+    snapshot(nativeHandle, snapshot.nativeHandle);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -32,4 +37,5 @@ public class ReadOptions extends NativeObject {
 
   private static native long construct();
   private static native void destruct(long handle);
+  private static native void snapshot(long handle, long snapshot);
 }

+ 34 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.hdfsdb;
+
+public class Snapshot extends NativeObject {
+  private final long dbHandle;
+  Snapshot(long dbHandle, long nativeHandle) {
+    super(nativeHandle);
+    this.dbHandle = dbHandle;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (nativeHandle != 0) {
+      DB.releaseSnapshot(dbHandle, nativeHandle);
+      nativeHandle = 0;
+    }
+  }
+}

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc

@@ -1121,6 +1121,52 @@ Status DBImpl::Get(const ReadOptions& options,
   return s;
 }
 
+Status DBImpl::SnapshotGet(const ReadOptions& options,
+                           const Slice& key,
+                           const std::function<void(const Slice&)> &get_value) {
+  Status s;
+  assert(options.snapshot);
+  SequenceNumber snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
+  LookupKey lkey(key, snapshot);
+
+  //mutex_.Lock();
+  MemTable* mem = mem_;
+  MemTable* imm = imm_;
+  //mutex_.Unlock();
+  mem->Ref();
+  if (imm != NULL) imm->Ref();
+
+  // First look in the memtable, then in the immutable memtable (if any).
+  if (mem->Get(lkey, get_value, &s)) {
+    // Done
+  } else if (imm != NULL && imm->Get(lkey, get_value, &s)) {
+    // Done
+  } else {
+    assert (false);
+    mutex_.Lock();
+    Version* current = versions_->current();
+    current->Ref();
+    // Unlock while reading from files and memtables
+    mutex_.Unlock();
+    Version::GetStats stats;
+    std::string value;
+    s = current->Get(options, lkey, &value, &stats);
+    if (value.size()) {
+      get_value(Slice(value));
+    }
+    mutex_.Lock();
+    if (current->UpdateStats(stats)) {
+      MaybeScheduleCompaction();
+    }
+    current->Unref();
+    mutex_.Unlock();
+  }
+
+  mem->Unref();
+  if (imm != NULL) imm->Unref();
+  return s;
+}
+
 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   SequenceNumber latest_snapshot;
   uint32_t seed;

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h

@@ -35,6 +35,10 @@ class DBImpl : public DB {
   virtual Status Get(const ReadOptions& options,
                      const Slice& key,
                      std::string* value);
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key,
+                             const std::function<void(const Slice&)>
+                             &get_value);
   virtual Iterator* NewIterator(const ReadOptions&);
   virtual const Snapshot* GetSnapshot();
   virtual void ReleaseSnapshot(const Snapshot* snapshot);

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc

@@ -1848,6 +1848,14 @@ class ModelDB: public DB {
     assert(false);      // Not implemented
     return Status::NotFound(key);
   }
+
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key, const
+                             std::function<void(const Slice&)> &) {
+    assert(false);      // Not implemented
+    return Status::NotFound(key);
+  }
+
   virtual Iterator* NewIterator(const ReadOptions& options) {
     if (options.snapshot == NULL) {
       KVMap* saved = new KVMap;

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc

@@ -106,6 +106,12 @@ void MemTable::Add(SequenceNumber s, ValueType type,
 }
 
 bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
+  return Get(key, [value](const Slice &v) {
+          value->assign(v.data(), v.size()); }, s);
+}
+
+bool MemTable::Get(const LookupKey& key,
+ const std::function<void(const Slice&)> &get_value, Status* s) {
   Slice memkey = key.memtable_key();
   Table::Iterator iter(&table_);
   iter.Seek(memkey.data());
@@ -130,7 +136,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
       switch (static_cast<ValueType>(tag & 0xff)) {
         case kTypeValue: {
           Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
-          value->assign(v.data(), v.size());
+          get_value(v);
           return true;
         }
         case kTypeDeletion:

+ 9 - 4
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h

@@ -6,6 +6,9 @@
 #define STORAGE_LEVELDB_DB_MEMTABLE_H_
 
 #include <string>
+#include <functional>
+#include <atomic>
+
 #include "leveldb/db.h"
 #include "db/dbformat.h"
 #include "db/skiplist.h"
@@ -28,9 +31,9 @@ class MemTable {
 
   // Drop reference count.  Delete if no more references exist.
   void Unref() {
-    --refs_;
-    assert(refs_ >= 0);
-    if (refs_ <= 0) {
+    int v = std::atomic_fetch_sub(&refs_, 1);
+    assert(v >= 0);
+    if (v <= 0) {
       delete this;
     }
   }
@@ -62,6 +65,8 @@ class MemTable {
   // in *status and return true.
   // Else, return false.
   bool Get(const LookupKey& key, std::string* value, Status* s);
+  bool Get(const LookupKey& key, const std::function<void(const Slice&)>
+  &get_value, Status* s);
 
  private:
   ~MemTable();  // Private since only Unref() should be used to delete it
@@ -77,7 +82,7 @@ class MemTable {
   typedef SkipList<const char*, KeyComparator> Table;
 
   KeyComparator comparator_;
-  int refs_;
+  std::atomic_int refs_;
   Arena arena_;
   Table table_;
 

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h

@@ -7,6 +7,8 @@
 
 #include <stdint.h>
 #include <stdio.h>
+#include <functional>
+
 #include "leveldb/iterator.h"
 #include "leveldb/options.h"
 
@@ -83,6 +85,15 @@ class DB {
   virtual Status Get(const ReadOptions& options,
                      const Slice& key, std::string* value) = 0;
 
+  // Get the value from a particular snapshot. The call only blocks if
+  // the value resides in the block cache or on the disk.
+  //
+  // May return some other Status on an error.
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key,
+                             const std::function<void(const Slice&)>
+                             &get_value) = 0;
+
   // Return a heap-allocated iterator over the contents of the database.
   // The result of NewIterator() is initially invalid (caller must
   // call one of the Seek methods on the iterator before using it).

+ 63 - 3
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 #include <jni.h>
-
+#include <mutex>
 #undef JNIEXPORT
 #if _WIN32
 #define JNIEXPORT __declspec(dllexport)
@@ -33,11 +33,12 @@
 #include "org_apache_hadoop_hdfs_hdfsdb_WriteOptions.h"
 
 #include <leveldb/db.h>
+#include <leveldb/cache.h>
 #include <leveldb/options.h>
 #include <leveldb/write_batch.h>
 #include <leveldb/cache.h>
 
-static inline uintptr_t uintptr(void *ptr) {
+static inline uintptr_t uintptr(const void *ptr) {
   return reinterpret_cast<uintptr_t>(ptr);
 }
 
@@ -130,6 +131,29 @@ jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_get(JNIEnv *env, jclass
   return ToJByteArray(env, leveldb::Slice(result));
 }
 
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_snapshotGet(JNIEnv *env, jclass, jlong handle, jlong jread_options, jbyteArray jkey) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
+  jbyteArray res = NULL;
+  leveldb::Status status;
+  {
+    JNIByteArrayHolder<GetByteArrayCritical> key(env, jkey);
+    status = db->SnapshotGet(*options, key.slice(),
+     [env,&res](const leveldb::Slice &v) {
+      res = ToJByteArray(env, v);
+     });
+  }
+
+  if (status.IsNotFound()) {
+    return NULL;
+  } else if (!status.ok()) {
+    env->ThrowNew(env->FindClass("java/io/IOException"), status.ToString().c_str());
+    return NULL;
+  }
+
+  return res;
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_write(JNIEnv *env, jclass, jlong handle, jlong jwrite_options, jlong jbatch) {
   leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
   leveldb::WriteOptions *options = reinterpret_cast<leveldb::WriteOptions*>(jwrite_options);
@@ -150,10 +174,33 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_delete(JNIEnv *env, jclass, j
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newIterator(JNIEnv *, jclass, jlong handle, jlong jread_options) {
   leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
   leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
-  auto res = uintptr(db->NewIterator(*options));
+  uintptr_t res = uintptr(db->NewIterator(*options));
   return res;
 }
 
+jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newSnapshot(JNIEnv *, jclass, jlong handle) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  uintptr_t res = uintptr(db->GetSnapshot());
+  return res;
+}
+
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_releaseSnapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  leveldb::Snapshot *s = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+  db->ReleaseSnapshot(s);
+}
+
+static std::mutex mutex;
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_getTest(JNIEnv *env,
+jclass, jlong handle, jbyteArray jkey) {
+  mutex.lock();
+  JNIByteArrayHolder<GetByteArrayElements> key(env, jkey);
+  std::string result;
+  result.resize(100);
+  mutex.unlock();
+  return ToJByteArray(env, leveldb::Slice(result));
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Iterator_destruct(JNIEnv *, jclass, jlong handle) {
   delete reinterpret_cast<leveldb::Iterator*>(handle);
 }
@@ -212,6 +259,14 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Options_blockSize(JNIEnv *, jcla
   options->block_size = value;
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Options_blockCacheSize(JNIEnv *, jclass, jlong handle, jlong value) {
+  leveldb::Options *options = reinterpret_cast<leveldb::Options*>(handle);
+  if (options->block_cache) {
+    delete options->block_cache;
+  }
+  options->block_cache = leveldb::NewLRUCache(value);
+}
+
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_construct(JNIEnv *, jclass) {
   return uintptr(new leveldb::ReadOptions());
 }
@@ -220,6 +275,11 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_destruct(JNIEnv *, j
   delete reinterpret_cast<leveldb::ReadOptions*>(handle);
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_snapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+  leveldb::ReadOptions *o = reinterpret_cast<leveldb::ReadOptions*>(handle);
+  o->snapshot = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+}
+
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_construct(JNIEnv *, jclass) {
   return uintptr(new leveldb::WriteOptions());
 }