|
@@ -33,49 +33,42 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
|
|
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
|
|
+import org.apache.hadoop.ipc.FairCallQueue;
|
|
|
+import org.apache.hadoop.metrics2.MetricsException;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.junit.After;
|
|
|
-import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
public class TestRefreshCallQueue {
|
|
|
private MiniDFSCluster cluster;
|
|
|
private Configuration config;
|
|
|
- private FileSystem fs;
|
|
|
static int mockQueueConstructions;
|
|
|
static int mockQueuePuts;
|
|
|
- private String callQueueConfigKey = "";
|
|
|
- private final Random rand = new Random();
|
|
|
+ private int nnPort = 0;
|
|
|
|
|
|
- @Before
|
|
|
- public void setUp() throws Exception {
|
|
|
- // We want to count additional events, so we reset here
|
|
|
- mockQueueConstructions = 0;
|
|
|
- mockQueuePuts = 0;
|
|
|
+ private void setUp(Class<?> queueClass) throws IOException {
|
|
|
int portRetries = 5;
|
|
|
- int nnPort;
|
|
|
-
|
|
|
+ Random rand = new Random();
|
|
|
for (; portRetries > 0; --portRetries) {
|
|
|
// Pick a random port in the range [30000,60000).
|
|
|
- nnPort = 30000 + rand.nextInt(30000);
|
|
|
+ nnPort = 30000 + rand.nextInt(30000);
|
|
|
config = new Configuration();
|
|
|
- callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl";
|
|
|
- config.setClass(callQueueConfigKey,
|
|
|
- MockCallQueue.class, BlockingQueue.class);
|
|
|
+ String callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl";
|
|
|
+ config.setClass(callQueueConfigKey, queueClass, BlockingQueue.class);
|
|
|
config.set("hadoop.security.authorization", "true");
|
|
|
|
|
|
FileSystem.setDefaultUri(config, "hdfs://localhost:" + nnPort);
|
|
|
- fs = FileSystem.get(config);
|
|
|
-
|
|
|
try {
|
|
|
- cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort).build();
|
|
|
+ cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort)
|
|
|
+ .build();
|
|
|
cluster.waitActive();
|
|
|
break;
|
|
|
} catch (BindException be) {
|
|
|
// Retry with a different port number.
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
if (portRetries == 0) {
|
|
|
// Bail if we get very unlucky with our choice of ports.
|
|
|
fail("Failed to pick an ephemeral port for the NameNode RPC server.");
|
|
@@ -83,8 +76,8 @@ public class TestRefreshCallQueue {
|
|
|
}
|
|
|
|
|
|
@After
|
|
|
- public void tearDown() throws Exception {
|
|
|
- if(cluster!=null) {
|
|
|
+ public void tearDown() throws IOException {
|
|
|
+ if (cluster != null) {
|
|
|
cluster.shutdown();
|
|
|
cluster = null;
|
|
|
}
|
|
@@ -105,29 +98,66 @@ public class TestRefreshCallQueue {
|
|
|
|
|
|
// Returns true if mock queue was used for put
|
|
|
public boolean canPutInMockQueue() throws IOException {
|
|
|
- int putsBefore = mockQueuePuts;
|
|
|
- fs.exists(new Path("/")); // Make an RPC call
|
|
|
- return mockQueuePuts > putsBefore;
|
|
|
+ FileSystem fs = FileSystem.get(config);
|
|
|
+ int putsBefore = mockQueuePuts;
|
|
|
+ fs.exists(new Path("/")); // Make an RPC call
|
|
|
+ fs.close();
|
|
|
+ return mockQueuePuts > putsBefore;
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testRefresh() throws Exception {
|
|
|
- assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0);
|
|
|
+ // We want to count additional events, so we reset here
|
|
|
+ mockQueueConstructions = 0;
|
|
|
+ mockQueuePuts = 0;
|
|
|
+ setUp(MockCallQueue.class);
|
|
|
+
|
|
|
+ assertTrue("Mock queue should have been constructed",
|
|
|
+ mockQueueConstructions > 0);
|
|
|
assertTrue("Puts are routed through MockQueue", canPutInMockQueue());
|
|
|
int lastMockQueueConstructions = mockQueueConstructions;
|
|
|
|
|
|
- // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue
|
|
|
+ // Replace queue with the queue specified in core-site.xml, which would be
|
|
|
+ // the LinkedBlockingQueue
|
|
|
DFSAdmin admin = new DFSAdmin(config);
|
|
|
String [] args = new String[]{"-refreshCallQueue"};
|
|
|
int exitCode = admin.run(args);
|
|
|
assertEquals("DFSAdmin should return 0", 0, exitCode);
|
|
|
|
|
|
- assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions);
|
|
|
+ assertEquals("Mock queue should have no additional constructions",
|
|
|
+ lastMockQueueConstructions, mockQueueConstructions);
|
|
|
try {
|
|
|
- assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue());
|
|
|
- } catch (IOException ioe){
|
|
|
+ assertFalse("Puts are routed through LBQ instead of MockQueue",
|
|
|
+ canPutInMockQueue());
|
|
|
+ } catch (IOException ioe) {
|
|
|
fail("Could not put into queue at all");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testRefreshCallQueueWithFairCallQueue() throws Exception {
|
|
|
+ setUp(FairCallQueue.class);
|
|
|
+ boolean oldValue = DefaultMetricsSystem.inMiniClusterMode();
|
|
|
+
|
|
|
+ // throw an error when we double-initialize JvmMetrics
|
|
|
+ DefaultMetricsSystem.setMiniClusterMode(false);
|
|
|
+
|
|
|
+ NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
|
|
|
+ try {
|
|
|
+ rpcServer.getClientRpcServer().refreshCallQueue(config);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Throwable cause = e.getCause();
|
|
|
+ if ((cause instanceof MetricsException)
|
|
|
+ && cause.getMessage().contains(
|
|
|
+ "Metrics source DecayRpcSchedulerMetrics2.ipc." + nnPort
|
|
|
+ + " already exists!")) {
|
|
|
+ fail("DecayRpcScheduler metrics should be unregistered before"
|
|
|
+ + " reregister");
|
|
|
+ }
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ DefaultMetricsSystem.setMiniClusterMode(oldValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|