瀏覽代碼

HDDS-1391 : Add ability in OM to serve delta updates through an API. (#1033)

avijayanhwx 5 年之前
父節點
當前提交
60325c9611

+ 9 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java

@@ -184,4 +184,13 @@ public interface DBStore extends AutoCloseable {
    * @return codec registry.
    */
   CodecRegistry getCodecRegistry();
+
+  /**
+   * Get data written to DB since a specific sequence number.
+   * @param sequenceNumber
+   * @return
+   * @throws SequenceNumberNotFoundException
+   */
+  DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
+      throws SequenceNumberNotFoundException;
 }

+ 48 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper class to hold DB data read from the RocksDB log file.
+ */
+public class DBUpdatesWrapper {
+
+  private List<byte[]> dataList = new ArrayList<>();
+  private long currentSequenceNumber = -1;
+
+  public void addWriteBatch(byte[] data, long sequenceNumber) {
+    dataList.add(data);
+    if (currentSequenceNumber < sequenceNumber) {
+      currentSequenceNumber = sequenceNumber;
+    }
+  }
+
+  public List<byte[]> getData() {
+    return dataList;
+  }
+
+  public long getCurrentSequenceNumber() {
+    return currentSequenceNumber;
+  }
+}
+

+ 46 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java

@@ -46,6 +46,7 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.TransactionLogIterator;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -327,6 +328,51 @@ public class RDBStore implements DBStore {
     return codecRegistry;
   }
 
+  @Override
+  public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
+      throws SequenceNumberNotFoundException {
+
+    DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+    try {
+      TransactionLogIterator transactionLogIterator =
+          db.getUpdatesSince(sequenceNumber);
+
+      // Only the first record needs to be checked if its seq number <
+      // ( 1 + passed_in_sequence_number). For example, if seqNumber passed
+      // in is 100, then we can read from the WAL ONLY if the first sequence
+      // number is <= 101. If it is 102, then 101 may already be flushed to
+      // SST. If it 99, we can skip 99 and 100, and then read from 101.
+
+      boolean checkValidStartingSeqNumber = true;
+
+      while (transactionLogIterator.isValid()) {
+        TransactionLogIterator.BatchResult result =
+            transactionLogIterator.getBatch();
+        long currSequenceNumber = result.sequenceNumber();
+        if (checkValidStartingSeqNumber &&
+            currSequenceNumber > 1 + sequenceNumber) {
+          throw new SequenceNumberNotFoundException("Unable to read data from" +
+              " RocksDB wal to get delta updates. It may have already been" +
+              "flushed to SSTs.");
+        }
+        // If the above condition was not satisfied, then it is OK to reset
+        // the flag.
+        checkValidStartingSeqNumber = false;
+        if (currSequenceNumber <= sequenceNumber) {
+          transactionLogIterator.next();
+          continue;
+        }
+        dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
+            result.sequenceNumber());
+        transactionLogIterator.next();
+      }
+    } catch (RocksDBException e) {
+      LOG.error("Unable to get delta updates since sequenceNumber {} ",
+          sequenceNumber, e);
+    }
+    return dbUpdatesWrapper;
+  }
+
   @VisibleForTesting
   public RocksDB getDb() {
     return db;

+ 37 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/SequenceNumberNotFoundException.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db;
+
+import java.io.IOException;
+
+/**
+ * Thrown if RocksDB is unable to find requested data from WAL file.
+ */
+public class SequenceNumberNotFoundException extends IOException {
+
+  public SequenceNumberNotFoundException() {
+    super();
+  }
+
+  public SequenceNumberNotFoundException(String message) {
+    super(message);
+  }
+
+}

+ 21 - 0
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java

@@ -325,4 +325,25 @@ public class TestRDBStore {
     }
   }
 
+  @Test
+  public void testGetDBUpdatesSince() throws Exception {
+
+    try (RDBStore newStore =
+             new RDBStore(folder.newFolder(), options, configSet)) {
+
+      try (Table firstTable = newStore.getTable(families.get(1))) {
+        firstTable.put(StringUtils.getBytesUtf16("Key1"), StringUtils
+            .getBytesUtf16("Value1"));
+        firstTable.put(StringUtils.getBytesUtf16("Key2"), StringUtils
+            .getBytesUtf16("Value2"));
+      }
+      Assert.assertTrue(
+          newStore.getDb().getLatestSequenceNumber() == 2);
+
+      DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0);
+      Assert.assertEquals(2, dbUpdatesSince.getData().size());
+    }
+  }
+
+
 }

+ 1 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java

@@ -199,6 +199,7 @@ public final class OmUtils {
     case LookupFile:
     case ListStatus:
     case GetAcl:
+    case DBUpdates:
       return true;
     case CreateVolume:
     case SetVolumeProperty:

+ 12 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -73,6 +73,7 @@ enum Type {
   ListMultiPartUploadParts = 50;
 
   ServiceList = 51;
+  DBUpdates = 53;
 
   GetDelegationToken = 61;
   RenewDelegationToken = 62;
@@ -136,6 +137,7 @@ message OMRequest {
   optional MultipartUploadListPartsRequest  listMultipartUploadPartsRequest = 50;
 
   optional ServiceListRequest               serviceListRequest             = 51;
+  optional DBUpdatesRequest                  dbUpdatesRequest              = 53;
 
   optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
   optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@@ -202,6 +204,7 @@ message OMResponse {
   optional MultipartUploadListPartsResponse listMultipartUploadPartsResponse = 50;
 
   optional ServiceListResponse               ServiceListResponse           = 51;
+  optional DBUpdatesResponse                 dbUpdatesResponse             = 52;
 
   optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
   optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
@@ -836,11 +839,20 @@ message AllocateBlockResponse {
 message ServiceListRequest {
 }
 
+message DBUpdatesRequest {
+    required uint64 sequenceNumber = 1;
+}
+
 message ServiceListResponse {
 
     repeated ServiceInfo serviceInfo = 2;
 }
 
+message DBUpdatesResponse {
+    required uint64 sequenceNumber = 1;
+    repeated bytes data = 2;
+}
+
 message ServicePort {
     enum Type {
         RPC = 1;

+ 37 - 12
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
@@ -81,6 +82,7 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
 import org.apache.hadoop.utils.db.RDBStore;
 import org.apache.hadoop.utils.db.Table;
 import org.apache.hadoop.utils.db.Table.KeyValue;
@@ -1395,8 +1397,41 @@ public class TestOzoneManager {
     RDBStore rdbStore = (RDBStore) cluster.getOzoneManager()
         .getMetadataManager().getStore();
     RocksDB db = rdbStore.getDb();
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
+    OmKeyInfo keyInfo = getNewOmKeyInfo();
+    OmKeyInfoCodec omKeyInfoCodec = new OmKeyInfoCodec();
+
+    db.put(StringUtils.getBytesUtf16("OMKey1"),
+        omKeyInfoCodec.toPersistedFormat(keyInfo));
+
+    StringBuilder sb = new StringBuilder();
+    Assert.assertTrue(db.keyMayExist(StringUtils.getBytesUtf16("OMKey1"),
+        sb));
+    Assert.assertTrue(sb.length() > 0);
+  }
+
+
+  @Test
+  public void testGetOMDBUpdates() throws IOException {
+
+    DBUpdatesRequest dbUpdatesRequest =
+        DBUpdatesRequest.newBuilder().setSequenceNumber(0).build();
+
+    DBUpdatesWrapper dbUpdates =
+        cluster.getOzoneManager().getDBUpdates(dbUpdatesRequest);
+    Assert.assertTrue(dbUpdates.getData().isEmpty());
+
+    //Write data to OM.
+    OmKeyInfo keyInfo = getNewOmKeyInfo();
+    Assert.assertNotNull(keyInfo);
+    dbUpdates =
+        cluster.getOzoneManager().getDBUpdates(dbUpdatesRequest);
+    Assert.assertFalse(dbUpdates.getData().isEmpty());
+
+  }
+
+  private OmKeyInfo getNewOmKeyInfo() throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
         .setVolume("vol1")
         .setAdminName("bilbo")
@@ -1423,16 +1458,6 @@ public class TestOzoneManager {
         .build();
     OpenKeySession keySession = cluster.getOzoneManager().getKeyManager()
         .openKey(keyArgs);
-    OmKeyInfo keyInfo = keySession.getKeyInfo();
-    OmKeyInfoCodec omKeyInfoCodec = new OmKeyInfoCodec();
-
-    db.put(StringUtils.getBytesUtf16("OMKey1"),
-        omKeyInfoCodec.toPersistedFormat(keyInfo));
-
-    StringBuilder sb = new StringBuilder();
-    Assert.assertTrue(db.keyMayExist(StringUtils.getBytesUtf16("OMKey1"),
-        sb));
-    Assert.assertTrue(sb.length() > 0);
+    return  keySession.getKeyInfo();
   }
-
 }

+ 17 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -81,6 +81,7 @@ import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
@@ -144,6 +145,8 @@ import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.utils.RetriableTask;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
 import org.apache.hadoop.utils.db.DBCheckpoint;
 import org.apache.hadoop.utils.db.DBStore;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -3346,4 +3349,18 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public boolean isRatisEnabled() {
     return isRatisEnabled;
   }
+
+  /**
+   * Get DB updates since a specific sequence number.
+   * @param dbUpdatesRequest request that encapsulates a sequence number.
+   * @return Wrapper containing the updates.
+   * @throws SequenceNumberNotFoundException if db is unable to read the data.
+   */
+  public DBUpdatesWrapper getDBUpdates(
+      DBUpdatesRequest dbUpdatesRequest)
+      throws SequenceNumberNotFoundException {
+    return metadataManager.getStore()
+        .getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
+
+  }
 }

+ 23 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

@@ -122,6 +122,9 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
+
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -298,6 +301,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
             request.getServiceListRequest());
         responseBuilder.setServiceListResponse(serviceListResponse);
         break;
+      case DBUpdates:
+        DBUpdatesResponse dbUpdatesResponse = getOMDBUpdates(
+            request.getDbUpdatesRequest());
+        responseBuilder.setDbUpdatesResponse(dbUpdatesResponse);
+        break;
       case GetDelegationToken:
         GetDelegationTokenResponseProto getDtResp = getDelegationToken(
             request.getGetDelegationTokenRequest());
@@ -377,6 +385,21 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return responseBuilder.build();
   }
 
+  private DBUpdatesResponse getOMDBUpdates(
+      DBUpdatesRequest dbUpdatesRequest)
+      throws SequenceNumberNotFoundException {
+
+    DBUpdatesResponse.Builder builder = DBUpdatesResponse
+        .newBuilder();
+    DBUpdatesWrapper dbUpdatesWrapper =
+        impl.getDBUpdates(dbUpdatesRequest);
+    for (int i = 0; i < dbUpdatesWrapper.getData().size(); i++) {
+      builder.setData(i,
+          OMPBHelper.getByteString(dbUpdatesWrapper.getData().get(i)));
+    }
+    return builder.build();
+  }
+
   private GetAclResponse getAcl(GetAclRequest req) throws IOException {
     List<OzoneAclInfo> acls = new ArrayList<>();