|
@@ -17,27 +17,34 @@
|
|
|
package org.apache.hadoop.ozone.om;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
+import com.google.protobuf.ServiceException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
|
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
|
|
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
|
|
|
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.utils.BackgroundService;
|
|
|
import org.apache.hadoop.utils.BackgroundTask;
|
|
|
import org.apache.hadoop.utils.BackgroundTaskQueue;
|
|
|
import org.apache.hadoop.utils.BackgroundTaskResult;
|
|
|
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
|
|
|
-import org.apache.hadoop.utils.db.BatchOperation;
|
|
|
-import org.apache.hadoop.utils.db.DBStore;
|
|
|
-import org.apache.hadoop.utils.db.Table;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
|
|
|
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
|
|
|
+
|
|
|
+import org.apache.hadoop.utils.db.BatchOperation;
|
|
|
+import org.apache.hadoop.utils.db.DBStore;
|
|
|
+import org.apache.hadoop.utils.db.Table;
|
|
|
+import org.apache.ratis.protocol.ClientId;
|
|
|
import org.rocksdb.RocksDBException;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -55,17 +62,21 @@ public class KeyDeletingService extends BackgroundService {
|
|
|
// The thread pool size for key deleting service.
|
|
|
private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
|
|
|
|
|
|
+ private final OzoneManager ozoneManager;
|
|
|
private final ScmBlockLocationProtocol scmClient;
|
|
|
private final KeyManager manager;
|
|
|
+ private ClientId clientId = ClientId.randomId();
|
|
|
private final int keyLimitPerTask;
|
|
|
private final AtomicLong deletedKeyCount;
|
|
|
private final AtomicLong runCount;
|
|
|
|
|
|
- public KeyDeletingService(ScmBlockLocationProtocol scmClient,
|
|
|
+ KeyDeletingService(OzoneManager ozoneManager,
|
|
|
+ ScmBlockLocationProtocol scmClient,
|
|
|
KeyManager manager, long serviceInterval,
|
|
|
long serviceTimeout, Configuration conf) {
|
|
|
super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
|
|
|
KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
|
|
|
+ this.ozoneManager = ozoneManager;
|
|
|
this.scmClient = scmClient;
|
|
|
this.manager = manager;
|
|
|
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
|
|
@@ -101,6 +112,21 @@ public class KeyDeletingService extends BackgroundService {
|
|
|
return queue;
|
|
|
}
|
|
|
|
|
|
+ private boolean shouldRun() {
|
|
|
+ if (ozoneManager == null) {
|
|
|
+ // OzoneManager can be null for testing
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return ozoneManager.isLeader();
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isRatisEnabled() {
|
|
|
+ if (ozoneManager == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return ozoneManager.isRatisEnabled();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* A key deleting task scans OM DB and looking for a certain number of
|
|
|
* pending-deletion keys, sends these keys along with their associated blocks
|
|
@@ -118,26 +144,38 @@ public class KeyDeletingService extends BackgroundService {
|
|
|
|
|
|
@Override
|
|
|
public BackgroundTaskResult call() throws Exception {
|
|
|
- runCount.incrementAndGet();
|
|
|
- try {
|
|
|
- long startTime = Time.monotonicNow();
|
|
|
- List<BlockGroup> keyBlocksList = manager
|
|
|
- .getPendingDeletionKeys(keyLimitPerTask);
|
|
|
- if (keyBlocksList != null && keyBlocksList.size() > 0) {
|
|
|
- List<DeleteBlockGroupResult> results =
|
|
|
- scmClient.deleteKeyBlocks(keyBlocksList);
|
|
|
- if (results != null) {
|
|
|
- int delCount = deleteAllKeys(results);
|
|
|
- LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
|
|
|
- delCount, Time.monotonicNow() - startTime);
|
|
|
- deletedKeyCount.addAndGet(delCount);
|
|
|
+ // Check if this is the Leader OM. If not leader, no need to execute this
|
|
|
+ // task.
|
|
|
+ if (shouldRun()) {
|
|
|
+ runCount.incrementAndGet();
|
|
|
+ try {
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
+ List<BlockGroup> keyBlocksList = manager
|
|
|
+ .getPendingDeletionKeys(keyLimitPerTask);
|
|
|
+ if (keyBlocksList != null && keyBlocksList.size() > 0) {
|
|
|
+ List<DeleteBlockGroupResult> results =
|
|
|
+ scmClient.deleteKeyBlocks(keyBlocksList);
|
|
|
+ if (results != null) {
|
|
|
+ int delCount;
|
|
|
+ if (isRatisEnabled()) {
|
|
|
+ delCount = submitPurgeKeysRequest(results);
|
|
|
+ } else {
|
|
|
+ // TODO: Once HA and non-HA paths are merged, we should have
|
|
|
+ // only one code path here. Purge keys should go through an
|
|
|
+ // OMRequest model.
|
|
|
+ delCount = deleteAllKeys(results);
|
|
|
+ }
|
|
|
+ LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
|
|
|
+ delCount, Time.monotonicNow() - startTime);
|
|
|
+ deletedKeyCount.addAndGet(delCount);
|
|
|
+ }
|
|
|
}
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Error while running delete keys background task. Will " +
|
|
|
+ "retry at next run.", e);
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Error while running delete keys background task. Will " +
|
|
|
- "retry at next run.", e);
|
|
|
}
|
|
|
- // By desing, no one cares about the results of this call back.
|
|
|
+ // By design, no one cares about the results of this call back.
|
|
|
return EmptyTaskResult.newResult();
|
|
|
}
|
|
|
|
|
@@ -171,5 +209,48 @@ public class KeyDeletingService extends BackgroundService {
|
|
|
}
|
|
|
return deletedCount;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Submits PurgeKeys request for the keys whose blocks have been deleted
|
|
|
+ * by SCM.
|
|
|
+ *
|
|
|
+ * @param results DeleteBlockGroups returned by SCM.
|
|
|
+ * @throws IOException on Error
|
|
|
+ */
|
|
|
+ public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results) {
|
|
|
+ List<String> purgeKeysList = new ArrayList<>();
|
|
|
+
|
|
|
+ // Put all keys to be purged in a list
|
|
|
+ int deletedCount = 0;
|
|
|
+ for (DeleteBlockGroupResult result : results) {
|
|
|
+ if (result.isSuccess()) {
|
|
|
+ // Add key to PurgeKeys list.
|
|
|
+ String deletedKey = result.getObjectKey();
|
|
|
+ purgeKeysList.add(deletedKey);
|
|
|
+ LOG.debug("Key {} set to be purged from OM DB", deletedKey);
|
|
|
+ deletedCount++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
|
|
|
+ .addAllKeys(purgeKeysList)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ OMRequest omRequest = OMRequest.newBuilder()
|
|
|
+ .setCmdType(Type.PurgeKeys)
|
|
|
+ .setPurgeKeysRequest(purgeKeysRequest)
|
|
|
+ .setClientId(clientId.toString())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ // Submit PurgeKeys request to OM
|
|
|
+ try {
|
|
|
+ ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
|
|
|
+ } catch (ServiceException e) {
|
|
|
+ LOG.error("PurgeKey request failed. Will retry at next run.");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ return deletedCount;
|
|
|
+ }
|
|
|
}
|
|
|
}
|