|
@@ -71,6 +71,8 @@ import org.apache.hadoop.test.Whitebox;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
+import org.mockito.ArgumentCaptor;
|
|
|
+import org.mockito.ArgumentMatchers;
|
|
|
|
|
|
import static org.hamcrest.core.Is.is;
|
|
|
import static org.junit.Assert.*;
|
|
@@ -1126,4 +1128,70 @@ public class TestDatanodeManager {
|
|
|
super();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testComputeReconstructedTaskNum() throws IOException {
|
|
|
+ verifyComputeReconstructedTaskNum(100, 100, 150, 250, 100);
|
|
|
+ verifyComputeReconstructedTaskNum(200, 100000, 200000, 300000, 400000);
|
|
|
+ verifyComputeReconstructedTaskNum(1000000, 100, 150, 250, 100);
|
|
|
+ verifyComputeReconstructedTaskNum(14000000, 200, 200, 400, 200);
|
|
|
+
|
|
|
+ }
|
|
|
+ public void verifyComputeReconstructedTaskNum(int xmitsInProgress, int numReplicationBlocks,
|
|
|
+ int maxTransfers, int numECTasksToBeReplicated, int numBlocksToBeErasureCoded)
|
|
|
+ throws IOException {
|
|
|
+ FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
|
|
+ Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, maxTransfers);
|
|
|
+ DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
|
|
|
+
|
|
|
+ DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
|
|
|
+ Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
|
|
|
+ Mockito.when(nodeInfo.getStorageInfos()).thenReturn(new DatanodeStorageInfo[0]);
|
|
|
+
|
|
|
+ Mockito.when(nodeInfo.getNumberOfReplicateBlocks()).thenReturn(numReplicationBlocks);
|
|
|
+ Mockito.when(nodeInfo.getNumberOfECBlocksToBeReplicated()).thenReturn(numECTasksToBeReplicated);
|
|
|
+ Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
|
|
|
+ .thenReturn(numBlocksToBeErasureCoded);
|
|
|
+
|
|
|
+ // Create an ArgumentCaptor to capture the counts for numReplicationTasks,
|
|
|
+ // numEcReplicatedTasks,numECReconstructedTasks.
|
|
|
+ ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
|
|
|
+ Mockito.when(nodeInfo.getErasureCodeCommand(ArgumentMatchers.anyInt()))
|
|
|
+ .thenReturn(Collections.nCopies(0, null));
|
|
|
+ Mockito.when(nodeInfo.getReplicationCommand(ArgumentMatchers.anyInt()))
|
|
|
+ .thenReturn(Collections.nCopies(0, null));
|
|
|
+ Mockito.when(nodeInfo.getECReplicatedCommand(ArgumentMatchers.anyInt()))
|
|
|
+ .thenReturn(Collections.nCopies(0, null));
|
|
|
+
|
|
|
+ DatanodeRegistration nodeReg = Mockito.mock(DatanodeRegistration.class);
|
|
|
+ Mockito.when(dm.getDatanode(nodeReg)).thenReturn(nodeInfo);
|
|
|
+
|
|
|
+
|
|
|
+ dm.handleHeartbeat(nodeReg, new StorageReport[1], "bp-123", 0, 0,
|
|
|
+ 10, xmitsInProgress, 0, null, SlowPeerReports.EMPTY_REPORT,
|
|
|
+ SlowDiskReports.EMPTY_REPORT);
|
|
|
+
|
|
|
+ Mockito.verify(nodeInfo).getReplicationCommand(captor.capture());
|
|
|
+ int numReplicationTasks = captor.getValue();
|
|
|
+
|
|
|
+ Mockito.verify(nodeInfo).getECReplicatedCommand(captor.capture());
|
|
|
+ int numEcReplicatedTasks = captor.getValue();
|
|
|
+
|
|
|
+ Mockito.verify(nodeInfo).getErasureCodeCommand(captor.capture());
|
|
|
+ int numECReconstructedTasks = captor.getValue();
|
|
|
+
|
|
|
+ // Verify that when DN xmitsInProgress exceeds maxTransfers,
|
|
|
+ // the number of tasks should be <= 0.
|
|
|
+ if (xmitsInProgress >= maxTransfers) {
|
|
|
+ assertTrue(numReplicationTasks <= 0);
|
|
|
+ assertTrue(numEcReplicatedTasks <= 0);
|
|
|
+ assertTrue(numECReconstructedTasks <= 0);
|
|
|
+ } else {
|
|
|
+ assertTrue(numReplicationTasks >= 0);
|
|
|
+ assertTrue(numEcReplicatedTasks >= 0);
|
|
|
+ assertTrue(numECReconstructedTasks >= 0);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|