|
@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
|
|
|
import org.apache.hadoop.ozone.scm.container.Mapping;
|
|
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.utils.BackgroundService;
|
|
|
import org.apache.hadoop.utils.BackgroundTask;
|
|
|
import org.apache.hadoop.utils.BackgroundTaskQueue;
|
|
@@ -98,28 +99,39 @@ public class SCMBlockDeletingService extends BackgroundService {
|
|
|
|
|
|
@Override
|
|
|
public EmptyTaskResult call() throws Exception {
|
|
|
+ int dnTxCount = 0;
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
// Scan SCM DB in HB interval and collect a throttled list of
|
|
|
// to delete blocks.
|
|
|
LOG.debug("Running DeletedBlockTransactionScanner");
|
|
|
DatanodeDeletedBlockTransactions transactions =
|
|
|
getToDeleteContainerBlocks();
|
|
|
if (transactions != null && !transactions.isEmpty()) {
|
|
|
- transactions.getDatanodes().forEach(datanodeID -> {
|
|
|
- List<DeletedBlocksTransaction> dnTXs =
|
|
|
- transactions.getDatanodeTransactions(datanodeID);
|
|
|
+ for (DatanodeID datanodeID : transactions.getDatanodes()) {
|
|
|
+ List<DeletedBlocksTransaction> dnTXs = transactions
|
|
|
+ .getDatanodeTransactions(datanodeID);
|
|
|
+ dnTxCount += dnTXs.size();
|
|
|
// TODO commandQueue needs a cap.
|
|
|
// We should stop caching new commands if num of un-processed
|
|
|
// command is bigger than a limit, e.g 50. In case datanode goes
|
|
|
// offline for sometime, the cached commands be flooded.
|
|
|
nodeManager.addDatanodeCommand(datanodeID,
|
|
|
new DeleteBlocksCommand(dnTXs));
|
|
|
- LOG.info("Added delete block command for datanode {} in the queue,"
|
|
|
+ LOG.debug(
|
|
|
+ "Added delete block command for datanode {} in the queue,"
|
|
|
+ " number of delete block transactions: {}, TxID list: {}",
|
|
|
datanodeID, dnTXs.size(),
|
|
|
String.join(",", transactions.getTransactionIDList(datanodeID)));
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- });
|
|
|
+ if (dnTxCount > 0) {
|
|
|
+ LOG.info("Totally added {} delete blocks command for"
|
|
|
+ + " {} datanodes, task elapsed time: {}ms",
|
|
|
+ dnTxCount, transactions.getDatanodes().size(),
|
|
|
+ Time.monotonicNow() - startTime);
|
|
|
}
|
|
|
+
|
|
|
return EmptyTaskResult.newResult();
|
|
|
}
|
|
|
|