|
@@ -331,86 +331,93 @@ public class TestDatanodeRegistration {
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
|
|
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
|
|
|
|
|
|
- final MiniDFSCluster cluster =
|
|
|
- new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
- cluster.waitActive();
|
|
|
- cluster.getHttpUri(0);
|
|
|
- FSNamesystem fsn = cluster.getNamesystem();
|
|
|
- String bpId = fsn.getBlockPoolId();
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ cluster.getHttpUri(0);
|
|
|
+ FSNamesystem fsn = cluster.getNamesystem();
|
|
|
+ String bpId = fsn.getBlockPoolId();
|
|
|
|
|
|
- DataNode dn = cluster.getDataNodes().get(0);
|
|
|
- DatanodeDescriptor dnd =
|
|
|
- NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
|
|
|
- DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
|
|
|
- DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
|
|
|
+ DataNode dn = cluster.getDataNodes().get(0);
|
|
|
+ DatanodeDescriptor dnd =
|
|
|
+ NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
|
|
|
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
|
|
|
+ DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
|
|
|
|
|
|
- // registration should not change after heartbeat.
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
|
|
|
- waitForHeartbeat(dn, dnd);
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ // registration should not change after heartbeat.
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
|
|
|
+ waitForHeartbeat(dn, dnd);
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
|
|
|
- // force a re-registration on next heartbeat.
|
|
|
- dnd.setForceRegistration(true);
|
|
|
- assertFalse(dnd.isRegistered());
|
|
|
- waitForHeartbeat(dn, dnd);
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
|
|
|
- assertNotSame(lastReg, newReg);
|
|
|
- lastReg = newReg;
|
|
|
+ // force a re-registration on next heartbeat.
|
|
|
+ dnd.setForceRegistration(true);
|
|
|
+ assertFalse(dnd.isRegistered());
|
|
|
+ waitForHeartbeat(dn, dnd);
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
|
|
|
+ assertNotSame(lastReg, newReg);
|
|
|
+ lastReg = newReg;
|
|
|
|
|
|
- // registration should not change on subsequent heartbeats.
|
|
|
- waitForHeartbeat(dn, dnd);
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
- assertTrue(waitForBlockReport(dn, dnd));
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ // registration should not change on subsequent heartbeats.
|
|
|
+ waitForHeartbeat(dn, dnd);
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ assertTrue(waitForBlockReport(dn, dnd));
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
|
|
|
- // check that block report is not processed and registration didn't change.
|
|
|
- dnd.setForceRegistration(true);
|
|
|
- assertFalse(waitForBlockReport(dn, dnd));
|
|
|
- assertFalse(dnd.isRegistered());
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ // check that block report is not processed and registration didn't
|
|
|
+ // change.
|
|
|
+ dnd.setForceRegistration(true);
|
|
|
+ assertFalse(waitForBlockReport(dn, dnd));
|
|
|
+ assertFalse(dnd.isRegistered());
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
|
|
|
- // heartbeat should trigger re-registration, and next block report should
|
|
|
- // not change registration.
|
|
|
- waitForHeartbeat(dn, dnd);
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- newReg = dn.getDNRegistrationForBP(bpId);
|
|
|
- assertNotSame(lastReg, newReg);
|
|
|
- lastReg = newReg;
|
|
|
- assertTrue(waitForBlockReport(dn, dnd));
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ // heartbeat should trigger re-registration, and next block report
|
|
|
+ // should not change registration.
|
|
|
+ waitForHeartbeat(dn, dnd);
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ newReg = dn.getDNRegistrationForBP(bpId);
|
|
|
+ assertNotSame(lastReg, newReg);
|
|
|
+ lastReg = newReg;
|
|
|
+ assertTrue(waitForBlockReport(dn, dnd));
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
|
|
|
- // registration doesn't change.
|
|
|
- ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
|
|
|
- dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
|
|
|
- DataNodeTestUtils.triggerDeletionReport(dn);
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ // registration doesn't change.
|
|
|
+ ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
|
|
|
+ dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
|
|
|
+ DataNodeTestUtils.triggerDeletionReport(dn);
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
|
|
|
- // a failed IBR will effectively unregister the node.
|
|
|
- boolean failed = false;
|
|
|
- try {
|
|
|
- // pass null to cause a failure since there aren't any easy failure
|
|
|
- // modes since it shouldn't happen.
|
|
|
- fsn.processIncrementalBlockReport(lastReg, null);
|
|
|
- } catch (NullPointerException npe) {
|
|
|
- failed = true;
|
|
|
- }
|
|
|
- assertTrue("didn't fail", failed);
|
|
|
- assertFalse(dnd.isRegistered());
|
|
|
+ // a failed IBR will effectively unregister the node.
|
|
|
+ boolean failed = false;
|
|
|
+ try {
|
|
|
+ // pass null to cause a failure since there aren't any easy failure
|
|
|
+ // modes since it shouldn't happen.
|
|
|
+ fsn.processIncrementalBlockReport(lastReg, null);
|
|
|
+ } catch (NullPointerException npe) {
|
|
|
+ failed = true;
|
|
|
+ }
|
|
|
+ assertTrue("didn't fail", failed);
|
|
|
+ assertFalse(dnd.isRegistered());
|
|
|
|
|
|
- // should remain unregistered until next heartbeat.
|
|
|
- dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
|
|
|
- DataNodeTestUtils.triggerDeletionReport(dn);
|
|
|
- assertFalse(dnd.isRegistered());
|
|
|
- assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
- waitForHeartbeat(dn, dnd);
|
|
|
- assertTrue(dnd.isRegistered());
|
|
|
- assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ // should remain unregistered until next heartbeat.
|
|
|
+ dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
|
|
|
+ DataNodeTestUtils.triggerDeletionReport(dn);
|
|
|
+ assertFalse(dnd.isRegistered());
|
|
|
+ assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ waitForHeartbeat(dn, dnd);
|
|
|
+ assertTrue(dnd.isRegistered());
|
|
|
+ assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
|