@@ -16,14 +16,16 @@
  */
 package org.apache.hadoop.ozone.scm.block;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.BackgroundTask;
@@ -32,13 +34,12 @@ import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * A background service running in SCM to delete blocks. This service scans
@@ -49,7 +50,7 @@ import java.util.stream.Collectors;
  */
 public class SCMBlockDeletingService extends BackgroundService {
 
-  private static final Logger LOG =
+  static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockDeletingService.class);
 
   // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
@@ -58,28 +59,36 @@ public class SCMBlockDeletingService extends BackgroundService {
   private final Mapping mappingService;
   private final NodeManager nodeManager;
 
-  // Default container size is 5G and block size is 256MB, a full container
-  // at most contains 20 blocks. At most each TX contains 20 blocks.
-  // When SCM sends block deletion TXs to datanode, each command we allow
-  // at most 50 containers so that will limit number of to be deleted blocks
-  // less than 1000.
-  // TODO - a better throttle algorithm
-  // Note, this is not an accurate limit of blocks. When we scan
-  // the log, worst case we may get 50 TX for 50 different datanodes,
-  // that will cause the deletion message sent by SCM extremely small.
-  // As a result, the deletion will be slow. An improvement is to scan
-  // log multiple times until we get enough TXs for each datanode, or
-  // the entire log is scanned.
-  private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
+  // The block delete limit is dynamically calculated from the container
+  // delete limit (ozone.block.deleting.container.limit.per.interval)
+  // configured for the datanode. To ensure datanodes do not sit idle
+  // waiting for delete commands, that value is multiplied by a factor
+  // of 2 to obtain the final TX limit for each node.
+  // The current throttle algorithm limits the delete blocks sent to
+  // each datanode: every node is capped by the calculated size. First,
+  // the current node list is fetched from the node manager, then the
+  // entire delLog is scanned from beginning to end. If a node reaches
+  // the maximum, its records are skipped; otherwise scanning continues
+  // until the maximum is reached. Once all nodes are full, the scan stops.
+  private int blockDeleteLimitSize;
 
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
       Mapping mapper, NodeManager nodeManager,
-      int interval, long serviceTimeout) {
+      int interval, long serviceTimeout, Configuration conf) {
     super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
     this.mappingService = mapper;
     this.nodeManager = nodeManager;
+
+    int containerLimit = conf.getInt(
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(containerLimit > 0,
+        "Container limit size should be positive.");
+    // Multiply the container limit by a factor of 2 so that datanodes
+    // do not wait for delete orders.
+    this.blockDeleteLimitSize = containerLimit * 2;
   }
 
   @Override
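The throttle arithmetic introduced above is driven entirely by configuration. Below is a minimal, hypothetical sketch of the derivation; the class name BlockDeleteLimitExample and the literal key string are illustrative only (the patch itself uses the OzoneConfigKeys constants):

import org.apache.hadoop.conf.Configuration;

/** Illustrative only: mirrors the limit derivation in the constructor. */
public class BlockDeleteLimitExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Suppose the operator caps container delete TXs per interval at 10.
    conf.setInt("ozone.block.deleting.container.limit.per.interval", 10);
    int containerLimit = conf.getInt(
        "ozone.block.deleting.container.limit.per.interval", 10);
    // The factor of 2 keeps datanodes supplied with delete commands.
    int blockDeleteLimitSize = containerLimit * 2;
    System.out.println("Per-node TX limit: " + blockDeleteLimitSize); // 20
  }
}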
@@ -104,126 +113,60 @@ public class SCMBlockDeletingService extends BackgroundService {
       // Scan SCM DB in HB interval and collect a throttled list of
       // to delete blocks.
       LOG.debug("Running DeletedBlockTransactionScanner");
-      DatanodeDeletedBlockTransactions transactions =
-          getToDeleteContainerBlocks();
+      DatanodeDeletedBlockTransactions transactions = null;
+      List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+      if (datanodes != null) {
+        transactions = new DatanodeDeletedBlockTransactions(mappingService,
+            blockDeleteLimitSize, datanodes.size());
+        try {
+          deletedBlockLog.getTransactions(transactions);
+        } catch (IOException e) {
+          // We may tolerate a number of failures for some time, but if it
+          // continues to fail, at some point we need to raise an exception
+          // and probably fail the SCM. At present, it simply continues to
+          // retry the scanning.
+          LOG.error("Failed to get block deletion transactions from delTX log",
+              e);
+        }
+        LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
+            transactions.getTXNum());
+      }
+
       if (transactions != null && !transactions.isEmpty()) {
         for (DatanodeID datanodeID : transactions.getDatanodes()) {
           List<DeletedBlocksTransaction> dnTXs = transactions
               .getDatanodeTransactions(datanodeID);
-          dnTxCount += dnTXs.size();
-          // TODO commandQueue needs a cap.
-          // We should stop caching new commands if num of un-processed
-          // command is bigger than a limit, e.g 50. In case datanode goes
-          // offline for sometime, the cached commands be flooded.
-          nodeManager.addDatanodeCommand(datanodeID,
-              new DeleteBlocksCommand(dnTXs));
-          LOG.debug(
-              "Added delete block command for datanode {} in the queue,"
-                  + " number of delete block transactions: {}, TxID list: {}",
-              datanodeID, dnTXs.size(),
-              String.join(",", transactions.getTransactionIDList(datanodeID)));
+          if (dnTXs != null && !dnTXs.isEmpty()) {
+            dnTxCount += dnTXs.size();
+            // TODO commandQueue needs a cap.
+            // We should stop caching new commands if the number of
+            // un-processed commands grows beyond a limit, e.g. 50; if a
+            // datanode goes offline for some time, the cache could flood.
+            nodeManager.addDatanodeCommand(datanodeID,
+                new DeleteBlocksCommand(dnTXs));
+            LOG.debug(
+                "Added delete block command for datanode {} in the queue,"
+                    + " number of delete block transactions: {}, TxID list: {}",
+                datanodeID, dnTXs.size(), String.join(",",
+                    transactions.getTransactionIDList(datanodeID)));
+          }
         }
       }
 
       if (dnTxCount > 0) {
-        LOG.info("Totally added {} delete blocks command for"
-            + " {} datanodes, task elapsed time: {}ms",
+        LOG.info(
+            "Totally added {} delete blocks command for"
+                + " {} datanodes, task elapsed time: {}ms",
             dnTxCount, transactions.getDatanodes().size(),
             Time.monotonicNow() - startTime);
       }
 
       return EmptyTaskResult.newResult();
     }
-
-    // Scan deleteBlocks.db to get a number of to-delete blocks.
-    // this is going to be properly throttled.
-    private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
-      DatanodeDeletedBlockTransactions dnTXs =
-          new DatanodeDeletedBlockTransactions();
-      List<DeletedBlocksTransaction> txs = null;
-      try {
-        // Get a limited number of TXs to send via HB at a time.
-        txs = deletedBlockLog
-            .getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
-        LOG.debug("Scanned deleted blocks log and got {} delTX to process",
-            txs.size());
-      } catch (IOException e) {
-        // We may tolerant a number of failures for sometime
-        // but if it continues to fail, at some point we need to raise
-        // an exception and probably fail the SCM ? At present, it simply
-        // continues to retry the scanning.
-        LOG.error("Failed to get block deletion transactions from delTX log",
-            e);
-      }
-
-      if (txs != null) {
-        for (DeletedBlocksTransaction tx : txs) {
-          try {
-            ContainerInfo info = mappingService
-                .getContainer(tx.getContainerName());
-            // Find out the datanode where this TX is supposed to send to.
-            info.getPipeline().getMachines()
-                .forEach(entry -> dnTXs.addTransaction(entry, tx));
-          } catch (IOException e) {
-            LOG.warn("Container {} not found, continue to process next",
-                tx.getContainerName(), e);
-          }
-        }
-      }
-      return dnTXs;
-    }
   }
 
-  /**
-   * A wrapper class to hold info about datanode and all deleted block
-   * transactions that will be sent to this datanode.
-   */
-  private static class DatanodeDeletedBlockTransactions {
-
-    // A list of TXs mapped to a certain datanode ID.
-    private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
-
-    DatanodeDeletedBlockTransactions() {
-      this.transactions = Maps.newHashMap();
-    }
-
-    void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
-      if (transactions.containsKey(dnID)) {
-        transactions.get(dnID).add(tx);
-      } else {
-        List<DeletedBlocksTransaction> first = Lists.newArrayList();
-        first.add(tx);
-        transactions.put(dnID, first);
-      }
-      LOG.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
-    }
-
-    Set<DatanodeID> getDatanodes() {
-      return transactions.keySet();
-    }
-
-    boolean isEmpty() {
-      return transactions.isEmpty();
-    }
-
-    boolean hasTransactions(DatanodeID dnID) {
-      return transactions.containsKey(dnID) &&
-          !transactions.get(dnID).isEmpty();
-    }
-
-    List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
-      return transactions.get(dnID);
-    }
-
-    List<String> getTransactionIDList(DatanodeID dnID) {
-      if (hasTransactions(dnID)) {
-        return transactions.get(dnID).stream()
-            .map(DeletedBlocksTransaction::getTxID)
-            .map(String::valueOf)
-            .collect(Collectors.toList());
-      } else {
-        return Collections.emptyList();
-      }
-    }
+
+  @VisibleForTesting
+  public void setBlockDeleteTXNum(int numTXs) {
+    blockDeleteLimitSize = numTXs;
   }
 }
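The reworked DatanodeDeletedBlockTransactions that replaces the removed inner class is constructed in this patch with a mapping service, the per-node limit, and the node count, but its body is not part of these hunks. A rough sketch of the capped per-node accumulator it implies follows; every name and signature here is an assumption for illustration, not the actual class (the real one also consults the container mapping to resolve which datanodes host each transaction's container):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Rough sketch only: a throttled per-node transaction accumulator. */
class ThrottledTransactions<N, T> {
  private final Map<N, List<T>> transactions = new HashMap<>();
  private final int perNodeLimit;

  ThrottledTransactions(int perNodeLimit) {
    this.perNodeLimit = perNodeLimit;
  }

  /** Returns false when the node already reached its limit. */
  boolean addTransaction(N node, T tx) {
    List<T> txs = transactions.computeIfAbsent(node, k -> new ArrayList<>());
    if (txs.size() >= perNodeLimit) {
      return false; // skip: this node is full for the current scan
    }
    txs.add(tx);
    return true;
  }

  /** The delLog scan may stop early once every known node is full. */
  boolean isFull(int nodeCount) {
    if (transactions.size() < nodeCount) {
      return false;
    }
    return transactions.values().stream()
        .allMatch(list -> list.size() >= perNodeLimit);
  }

  int getTXNum() {
    return transactions.values().stream().mapToInt(List::size).sum();
  }
}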