|
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
|
|
|
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
|
|
|
.DeleteBlockTransactionResult;
|
|
|
import org.apache.hadoop.hdds.scm.container.Mapping;
|
|
|
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
|
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
|
@@ -45,13 +46,14 @@ import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.UUID;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import static java.lang.Math.min;
|
|
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
|
|
.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
|
|
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
|
|
@@ -239,21 +241,26 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|
|
// set of dns which have successfully committed transaction txId.
|
|
|
dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
|
|
|
Long containerId = transactionResult.getContainerID();
|
|
|
- if (dnsWithCommittedTxn == null || containerId == null) {
|
|
|
- LOG.warn("Transaction txId={} commit by dnId={} failed."
|
|
|
- + " Corresponding entry not found.", txID, dnID);
|
|
|
+ if (dnsWithCommittedTxn == null) {
|
|
|
+ LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
|
|
|
+ + "failed. Corresponding entry not found.", txID, dnID,
|
|
|
+ containerId);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
dnsWithCommittedTxn.add(dnID);
|
|
|
- Collection<DatanodeDetails> containerDnsDetails =
|
|
|
+ Pipeline pipeline =
|
|
|
containerManager.getContainerWithPipeline(containerId)
|
|
|
- .getPipeline().getDatanodes().values();
|
|
|
+ .getPipeline();
|
|
|
+ Collection<DatanodeDetails> containerDnsDetails =
|
|
|
+ pipeline.getDatanodes().values();
|
|
|
// The delete entry can be safely removed from the log if all the
|
|
|
- // corresponding nodes commit the txn.
|
|
|
- if (dnsWithCommittedTxn.size() >= containerDnsDetails.size()) {
|
|
|
+ // corresponding nodes commit the txn. It is required to check that
|
|
|
+ // the nodes returned in the pipeline match the replication factor.
|
|
|
+ if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
|
|
|
+ >= pipeline.getFactor().getNumber()) {
|
|
|
List<UUID> containerDns = containerDnsDetails.stream()
|
|
|
- .map(dnDetails -> dnDetails.getUuid())
|
|
|
+ .map(DatanodeDetails::getUuid)
|
|
|
.collect(Collectors.toList());
|
|
|
if (dnsWithCommittedTxn.containsAll(containerDns)) {
|
|
|
transactionToDNsCommitMap.remove(txID);
|
|
@@ -338,15 +345,13 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|
|
* {@inheritDoc}
|
|
|
*
|
|
|
* @param containerBlocksMap a map of containerBlocks.
|
|
|
- * @return Mapping from containerId to latest transactionId for the container.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
@Override
|
|
|
- public Map<Long, Long> addTransactions(
|
|
|
+ public void addTransactions(
|
|
|
Map<Long, List<Long>> containerBlocksMap)
|
|
|
throws IOException {
|
|
|
BatchOperation batch = new BatchOperation();
|
|
|
- Map<Long, Long> deleteTransactionsMap = new HashMap<>();
|
|
|
lock.lock();
|
|
|
try {
|
|
|
long currentLatestID = lastTxID;
|
|
@@ -356,13 +361,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|
|
byte[] key = Longs.toByteArray(currentLatestID);
|
|
|
DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
|
|
|
entry.getKey(), entry.getValue());
|
|
|
- deleteTransactionsMap.put(entry.getKey(), currentLatestID);
|
|
|
batch.put(key, tx.toByteArray());
|
|
|
}
|
|
|
lastTxID = currentLatestID;
|
|
|
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
|
|
|
deletedStore.writeBatch(batch);
|
|
|
- return deleteTransactionsMap;
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
@@ -376,10 +379,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void getTransactions(DatanodeDeletedBlockTransactions transactions)
|
|
|
- throws IOException {
|
|
|
+ public Map<Long, Long> getTransactions(
|
|
|
+ DatanodeDeletedBlockTransactions transactions) throws IOException {
|
|
|
lock.lock();
|
|
|
try {
|
|
|
+ Map<Long, Long> deleteTransactionMap = new HashMap<>();
|
|
|
deletedStore.iterate(null, (key, value) -> {
|
|
|
if (!Arrays.equals(LATEST_TXID, key)) {
|
|
|
DeletedBlocksTransaction block = DeletedBlocksTransaction
|
|
@@ -388,6 +392,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|
|
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
|
|
|
if (transactions.addTransaction(block,
|
|
|
transactionToDNsCommitMap.get(block.getTxID()))) {
|
|
|
+ deleteTransactionMap.put(block.getContainerID(), block.getTxID());
|
|
|
transactionToDNsCommitMap
|
|
|
.putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
|
|
|
}
|
|
@@ -396,6 +401,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|
|
}
|
|
|
return true;
|
|
|
});
|
|
|
+ return deleteTransactionMap;
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|