Преглед изворни кода

[partial-ns] Add skip_wal option to HDFSDB.

Haohui Mai пре 10 година
родитељ
комит
c2cf9bc97f

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/WriteOptions.java

@@ -27,6 +27,11 @@ public class WriteOptions extends NativeObject {
     return this;
   }
 
+  public WriteOptions skipWal(boolean value) {
+    skipWal(nativeHandle, value);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -38,4 +43,5 @@ public class WriteOptions extends NativeObject {
   private static native long construct();
   private static native void destruct(long handle);
   private static native void sync(long handle, boolean value);
+  private static native void skipWal(long handle, boolean value);
 }

+ 26 - 22
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc

@@ -1183,29 +1183,33 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
     last_sequence += WriteBatchInternal::Count(updates);
 
-    // Add to log and apply to memtable.  We can release the lock
-    // during this phase since &w is currently responsible for logging
-    // and protects against concurrent loggers and concurrent writes
-    // into mem_.
-    {
-      mutex_.Unlock();
-      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
-      bool sync_error = false;
-      if (status.ok() && options.sync) {
-        status = logfile_->Sync();
-        if (!status.ok()) {
-          sync_error = true;
+    if (options.skip_wal) {
+      status = WriteBatchInternal::InsertInto(updates, mem_);
+    } else {
+      // Add to log and apply to memtable.  We can release the lock
+      // during this phase since &w is currently responsible for logging
+      // and protects against concurrent loggers and concurrent writes
+      // into mem_.
+      {
+        mutex_.Unlock();
+        status = log_->AddRecord(WriteBatchInternal::Contents(updates));
+        bool sync_error = false;
+        if (status.ok() && options.sync) {
+          status = logfile_->Sync();
+          if (!status.ok()) {
+            sync_error = true;
+          }
+        }
+        if (status.ok()) {
+          status = WriteBatchInternal::InsertInto(updates, mem_);
+        }
+        mutex_.Lock();
+        if (sync_error) {
+          // The state of the log file is indeterminate: the log record we
+          // just added may or may not show up when the DB is re-opened.
+          // So we force the DB into a mode where all future writes fail.
+          RecordBackgroundError(status);
         }
-      }
-      if (status.ok()) {
-        status = WriteBatchInternal::InsertInto(updates, mem_);
-      }
-      mutex_.Lock();
-      if (sync_error) {
-        // The state of the log file is indeterminate: the log record we
-        // just added may or may not show up when the DB is re-opened.
-        // So we force the DB into a mode where all future writes fail.
-        RecordBackgroundError(status);
       }
     }
     if (updates == tmp_batch_) tmp_batch_->Clear();

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/options.h

@@ -184,9 +184,12 @@ struct WriteOptions {
   //
   // Default: false
   bool sync;
+  // If true, the write will not write the write-ahead log.
+  bool skip_wal;
 
   WriteOptions()
-      : sync(false) {
+      : sync(false)
+      , skip_wal(false) {
   }
 };
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc

@@ -229,6 +229,11 @@ void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_sync(JNIEnv *, jcla
   options->sync = value;
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_skipWal(JNIEnv *, jclass, jlong handle, jboolean value) {
+  leveldb::WriteOptions *options = reinterpret_cast<leveldb::WriteOptions*>(handle);
+  options->skip_wal = value;
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_destruct(JNIEnv *, jclass, jlong handle) {
   delete reinterpret_cast<leveldb::WriteOptions*>(handle);
 }