|
@@ -21,7 +21,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
|
|
-import static org.hamcrest.CoreMatchers.equalTo;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNull;
|
|
|
+import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
+import static org.junit.jupiter.api.Assertions.fail;
|
|
|
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
|
|
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.File;
|
|
@@ -83,9 +89,8 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.test.LambdaTestUtils;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
-import org.junit.Assert;
|
|
|
-import org.junit.Assume;
|
|
|
-import org.junit.Test;
|
|
|
+import org.junit.jupiter.api.Test;
|
|
|
+import org.junit.jupiter.api.Timeout;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
|
import org.mockito.stubbing.Answer;
|
|
@@ -166,14 +171,16 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testCreateAndDestroy() throws Exception {
|
|
|
ShortCircuitCache cache =
|
|
|
new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0);
|
|
|
cache.close();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=5000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 5)
|
|
|
public void testInvalidConfiguration() throws Exception {
|
|
|
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
|
|
"maxTotalSize must be greater than zero.",
|
|
@@ -189,7 +196,8 @@ public class TestShortCircuitCache {
|
|
|
() -> new ShortCircuitCache(10, 1, 10, -1, 1, 10000, 0));
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testAddAndRetrieve() throws Exception {
|
|
|
final ShortCircuitCache cache =
|
|
|
new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0);
|
|
@@ -200,16 +208,16 @@ public class TestShortCircuitCache {
|
|
|
Preconditions.checkNotNull(replicaInfo1.getReplica());
|
|
|
Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
|
|
|
pair.compareWith(replicaInfo1.getReplica().getDataStream(),
|
|
|
- replicaInfo1.getReplica().getMetaStream());
|
|
|
+ replicaInfo1.getReplica().getMetaStream());
|
|
|
ShortCircuitReplicaInfo replicaInfo2 =
|
|
|
- cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
|
|
|
- new ShortCircuitReplicaCreator() {
|
|
|
- @Override
|
|
|
- public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
- Assert.fail("expected to use existing entry.");
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
+ cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
|
|
|
+ new ShortCircuitReplicaCreator() {
|
|
|
+ @Override
|
|
|
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
+ fail("expected to use existing entry.");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
Preconditions.checkNotNull(replicaInfo2.getReplica());
|
|
|
Preconditions.checkState(replicaInfo2.getInvalidTokenException() == null);
|
|
|
Preconditions.checkState(replicaInfo1 == replicaInfo2);
|
|
@@ -222,14 +230,14 @@ public class TestShortCircuitCache {
|
|
|
// around for a while (we have configured the expiry period to be really,
|
|
|
// really long here)
|
|
|
ShortCircuitReplicaInfo replicaInfo3 =
|
|
|
- cache.fetchOrCreate(
|
|
|
- new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
|
|
|
- @Override
|
|
|
- public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
- Assert.fail("expected to use existing entry.");
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
+ cache.fetchOrCreate(
|
|
|
+ new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
|
|
|
+ @Override
|
|
|
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
+ fail("expected to use existing entry.");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
Preconditions.checkNotNull(replicaInfo3.getReplica());
|
|
|
Preconditions.checkState(replicaInfo3.getInvalidTokenException() == null);
|
|
|
replicaInfo3.getReplica().unref();
|
|
@@ -238,7 +246,8 @@ public class TestShortCircuitCache {
|
|
|
cache.close();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=100000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 100)
|
|
|
public void testExpiry() throws Exception {
|
|
|
final ShortCircuitCache cache =
|
|
|
new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000000, 0);
|
|
@@ -272,7 +281,8 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testEviction() throws Exception {
|
|
|
final ShortCircuitCache cache =
|
|
|
new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
|
|
@@ -305,13 +315,13 @@ public class TestShortCircuitCache {
|
|
|
final Integer iVal = i;
|
|
|
replicaInfos[i] = cache.fetchOrCreate(
|
|
|
new ExtendedBlockId(i, "test_bp1"),
|
|
|
- new ShortCircuitReplicaCreator() {
|
|
|
- @Override
|
|
|
- public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
- Assert.fail("expected to use existing entry for " + iVal);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
+ new ShortCircuitReplicaCreator() {
|
|
|
+ @Override
|
|
|
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
+ fail("expected to use existing entry for " + iVal);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
Preconditions.checkNotNull(replicaInfos[i].getReplica());
|
|
|
Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
|
|
|
pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
|
|
@@ -329,7 +339,7 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
});
|
|
|
Preconditions.checkState(replicaInfos[0].getReplica() == null);
|
|
|
- Assert.assertTrue(calledCreate.isTrue());
|
|
|
+ assertTrue(calledCreate.isTrue());
|
|
|
// Clean up
|
|
|
for (int i = 1; i < pairs.length; i++) {
|
|
|
replicaInfos[i].getReplica().unref();
|
|
@@ -340,7 +350,8 @@ public class TestShortCircuitCache {
|
|
|
cache.close();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testTimeBasedStaleness() throws Exception {
|
|
|
// Set up the cache with a short staleness time.
|
|
|
final ShortCircuitCache cache =
|
|
@@ -400,13 +411,13 @@ public class TestShortCircuitCache {
|
|
|
// Make sure that second replica did not go stale.
|
|
|
ShortCircuitReplicaInfo info = cache.fetchOrCreate(
|
|
|
new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() {
|
|
|
- @Override
|
|
|
- public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
- Assert.fail("second replica went stale, despite 1 " +
|
|
|
- "hour staleness time.");
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
+ @Override
|
|
|
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|
|
+ fail("second replica went stale, despite 1 " +
|
|
|
+ "hour staleness time.");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
info.getReplica().unref();
|
|
|
|
|
|
// Clean up
|
|
@@ -429,7 +440,7 @@ public class TestShortCircuitCache {
|
|
|
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
|
|
DFSInputStream.tcpReadsDisabledForTesting = true;
|
|
|
DomainSocket.disableBindPathValidation();
|
|
|
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
|
|
+ assumeTrue(DomainSocket.getLoadingFailureReason() == null);
|
|
|
return conf;
|
|
|
}
|
|
|
|
|
@@ -440,7 +451,8 @@ public class TestShortCircuitCache {
|
|
|
return new DomainPeer(sock);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testAllocShm() throws Exception {
|
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
@@ -456,7 +468,7 @@ public class TestShortCircuitCache {
|
|
|
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
|
|
throws IOException {
|
|
|
// The ClientShmManager starts off empty
|
|
|
- Assert.assertEquals(0, info.size());
|
|
|
+ assertEquals(0, info.size());
|
|
|
}
|
|
|
});
|
|
|
DomainPeer peer = getDomainPeerToDn(conf);
|
|
@@ -468,18 +480,18 @@ public class TestShortCircuitCache {
|
|
|
// Allocating the first shm slot requires using up a peer.
|
|
|
Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
|
|
|
blockId, "testAllocShm_client");
|
|
|
- Assert.assertNotNull(slot);
|
|
|
- Assert.assertTrue(usedPeer.booleanValue());
|
|
|
+ assertNotNull(slot);
|
|
|
+ assertTrue(usedPeer.booleanValue());
|
|
|
cache.getDfsClientShmManager().visit(new Visitor() {
|
|
|
@Override
|
|
|
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
|
|
throws IOException {
|
|
|
// The ClientShmManager starts off empty
|
|
|
- Assert.assertEquals(1, info.size());
|
|
|
+ assertEquals(1, info.size());
|
|
|
PerDatanodeVisitorInfo vinfo = info.get(datanode);
|
|
|
- Assert.assertFalse(vinfo.disabled);
|
|
|
- Assert.assertEquals(0, vinfo.full.size());
|
|
|
- Assert.assertEquals(1, vinfo.notFull.size());
|
|
|
+ assertFalse(vinfo.disabled);
|
|
|
+ assertEquals(0, vinfo.full.size());
|
|
|
+ assertEquals(1, vinfo.notFull.size());
|
|
|
}
|
|
|
});
|
|
|
cache.scheduleSlotReleaser(slot);
|
|
@@ -510,7 +522,8 @@ public class TestShortCircuitCache {
|
|
|
sockDir.close();
|
|
|
}
|
|
|
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testShmBasedStaleness() throws Exception {
|
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
@@ -530,7 +543,7 @@ public class TestShortCircuitCache {
|
|
|
int first = fis.read();
|
|
|
final ExtendedBlock block =
|
|
|
DFSTestUtil.getFirstBlock(fs, new Path(TEST_FILE));
|
|
|
- Assert.assertTrue(first != -1);
|
|
|
+ assertTrue(first != -1);
|
|
|
cache.accept(new CacheVisitor() {
|
|
|
@Override
|
|
|
public void visit(int numOutstandingMmaps,
|
|
@@ -540,8 +553,8 @@ public class TestShortCircuitCache {
|
|
|
LinkedMap evictableMmapped) {
|
|
|
ShortCircuitReplica replica = replicas.get(
|
|
|
ExtendedBlockId.fromExtendedBlock(block));
|
|
|
- Assert.assertNotNull(replica);
|
|
|
- Assert.assertTrue(replica.getSlot().isValid());
|
|
|
+ assertNotNull(replica);
|
|
|
+ assertTrue(replica.getSlot().isValid());
|
|
|
}
|
|
|
});
|
|
|
// Stop the Namenode. This will close the socket keeping the client's
|
|
@@ -556,8 +569,8 @@ public class TestShortCircuitCache {
|
|
|
LinkedMap evictableMmapped) {
|
|
|
ShortCircuitReplica replica = replicas.get(
|
|
|
ExtendedBlockId.fromExtendedBlock(block));
|
|
|
- Assert.assertNotNull(replica);
|
|
|
- Assert.assertFalse(replica.getSlot().isValid());
|
|
|
+ assertNotNull(replica);
|
|
|
+ assertFalse(replica.getSlot().isValid());
|
|
|
}
|
|
|
});
|
|
|
cluster.shutdown();
|
|
@@ -569,7 +582,8 @@ public class TestShortCircuitCache {
|
|
|
* The DataNode will notify the DFSClient that the replica is stale via the
|
|
|
* ShortCircuitShm.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
|
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
@@ -590,7 +604,7 @@ public class TestShortCircuitCache {
|
|
|
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
|
|
throws IOException {
|
|
|
// The ClientShmManager starts off empty.
|
|
|
- Assert.assertEquals(0, info.size());
|
|
|
+ assertEquals(0, info.size());
|
|
|
}
|
|
|
});
|
|
|
final Path TEST_PATH = new Path("/test_file");
|
|
@@ -601,7 +615,7 @@ public class TestShortCircuitCache {
|
|
|
byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH);
|
|
|
byte expected[] = DFSTestUtil.
|
|
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
|
|
- Assert.assertTrue(Arrays.equals(contents, expected));
|
|
|
+ assertTrue(Arrays.equals(contents, expected));
|
|
|
// Loading this file brought the ShortCircuitReplica into our local
|
|
|
// replica cache.
|
|
|
final DatanodeInfo datanode = new DatanodeInfoBuilder()
|
|
@@ -611,12 +625,12 @@ public class TestShortCircuitCache {
|
|
|
@Override
|
|
|
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
|
|
throws IOException {
|
|
|
- Assert.assertTrue(info.get(datanode).full.isEmpty());
|
|
|
- Assert.assertFalse(info.get(datanode).disabled);
|
|
|
- Assert.assertEquals(1, info.get(datanode).notFull.values().size());
|
|
|
+ assertTrue(info.get(datanode).full.isEmpty());
|
|
|
+ assertFalse(info.get(datanode).disabled);
|
|
|
+ assertEquals(1, info.get(datanode).notFull.values().size());
|
|
|
DfsClientShm shm =
|
|
|
info.get(datanode).notFull.values().iterator().next();
|
|
|
- Assert.assertFalse(shm.isDisconnected());
|
|
|
+ assertFalse(shm.isDisconnected());
|
|
|
}
|
|
|
});
|
|
|
// Remove the file whose blocks we just read.
|
|
@@ -633,9 +647,9 @@ public class TestShortCircuitCache {
|
|
|
@Override
|
|
|
public void visit(HashMap<DatanodeInfo,
|
|
|
PerDatanodeVisitorInfo> info) throws IOException {
|
|
|
- Assert.assertTrue(info.get(datanode).full.isEmpty());
|
|
|
- Assert.assertFalse(info.get(datanode).disabled);
|
|
|
- Assert.assertEquals(1,
|
|
|
+ assertTrue(info.get(datanode).full.isEmpty());
|
|
|
+ assertFalse(info.get(datanode).disabled);
|
|
|
+ assertEquals(1,
|
|
|
info.get(datanode).notFull.values().size());
|
|
|
DfsClientShm shm = info.get(datanode).notFull.values().
|
|
|
iterator().next();
|
|
@@ -687,7 +701,8 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
|
|
|
// Regression test for HDFS-7915
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
|
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
@@ -729,7 +744,8 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
|
|
|
// Regression test for HADOOP-11802
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testDataXceiverHandlesRequestShortCircuitShmFailure()
|
|
|
throws Exception {
|
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
@@ -762,7 +778,7 @@ public class TestShortCircuitCache {
|
|
|
// The shared memory segment allocation will fail because of the failure
|
|
|
// injector.
|
|
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
|
|
- Assert.fail("expected readFileBuffer to fail, but it succeeded.");
|
|
|
+ fail("expected readFileBuffer to fail, but it succeeded.");
|
|
|
} catch (Throwable t) {
|
|
|
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
|
|
|
"testing, but we failed to do a non-TCP read.", t);
|
|
@@ -796,7 +812,8 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
|
|
|
// Regression test for HDFS-8070
|
|
|
- @Test(timeout=60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
|
|
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
@@ -915,7 +932,7 @@ public class TestShortCircuitCache {
|
|
|
DatanodeInfo[] nodes = blk.getLocations();
|
|
|
|
|
|
try {
|
|
|
- Assert.assertNull(new BlockReaderFactory(new DfsClientConf(conf))
|
|
|
+ assertNull(new BlockReaderFactory(new DfsClientConf(conf))
|
|
|
.setInetSocketAddress(NetUtils.createSocketAddr(nodes[0]
|
|
|
.getXferAddr()))
|
|
|
.setClientCacheContext(clientContext)
|
|
@@ -924,13 +941,14 @@ public class TestShortCircuitCache {
|
|
|
.setBlockToken(new Token())
|
|
|
.createShortCircuitReplicaInfo());
|
|
|
} catch (NullPointerException ex) {
|
|
|
- Assert.fail("Should not throw NPE when the native library is unable " +
|
|
|
+ fail("Should not throw NPE when the native library is unable " +
|
|
|
"to create new files!");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout = 60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testDomainSocketClosedByDN() throws Exception {
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
|
Configuration conf =
|
|
@@ -966,16 +984,17 @@ public class TestShortCircuitCache {
|
|
|
Thread.sleep(2000);
|
|
|
cache.scheduleSlotReleaser(slot2);
|
|
|
Thread.sleep(2000);
|
|
|
- Assert.assertEquals(0,
|
|
|
+ assertEquals(0,
|
|
|
cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
|
|
|
- Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Regression test for HDFS-16535
|
|
|
- @Test(timeout = 60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testDomainSocketClosedByMultipleDNs() throws Exception {
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
|
String testName = "testDomainSocketClosedByMultipleDNs";
|
|
@@ -1025,33 +1044,34 @@ public class TestShortCircuitCache {
|
|
|
dn1.getShortCircuitRegistry()
|
|
|
.registerSlot(blockId1, slot3.getSlotId(), false);
|
|
|
|
|
|
- Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
|
|
|
- Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
|
|
|
- Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
|
|
|
+ assertEquals(2, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
|
|
|
+ assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
|
|
|
|
|
|
// Release the slot of DataNode-1 first.
|
|
|
cache.scheduleSlotReleaser(slot3);
|
|
|
Thread.sleep(2000);
|
|
|
- Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ assertEquals(1, cache.getDfsClientShmManager().getShmNum());
|
|
|
|
|
|
// Release the slots of DataNode-0.
|
|
|
cache.scheduleSlotReleaser(slot1);
|
|
|
Thread.sleep(2000);
|
|
|
- Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
|
|
|
- " due to slot release failures.",
|
|
|
- 1, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ assertEquals(1, cache.getDfsClientShmManager().getShmNum(),
|
|
|
+ "0 ShmNum means the shm of DataNode-0 is shutdown"
|
|
|
+ + " due to slot release failures.");
|
|
|
cache.scheduleSlotReleaser(slot2);
|
|
|
Thread.sleep(2000);
|
|
|
|
|
|
- Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
|
|
|
- Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
|
|
|
- Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
|
|
|
+ assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
|
|
|
+ assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test(timeout = 60000)
|
|
|
+ @Test
|
|
|
+ @Timeout(value = 60)
|
|
|
public void testDNRestart() throws Exception {
|
|
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
|
Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
|
|
@@ -1089,9 +1109,9 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
cache.scheduleSlotReleaser(slot2);
|
|
|
Thread.sleep(2000);
|
|
|
- Assert.assertEquals(0,
|
|
|
+ assertEquals(0,
|
|
|
cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
|
|
|
- Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|