|
@@ -20,9 +20,15 @@ package org.apache.ambari.server.utils;
|
|
|
import static org.easymock.EasyMock.expect;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.powermock.api.easymock.PowerMock.mockStaticPartial;
|
|
|
+import static org.powermock.api.easymock.PowerMock.replayAll;
|
|
|
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.InputStreamReader;
|
|
|
import java.net.UnknownHostException;
|
|
|
+import java.nio.charset.Charset;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
@@ -42,8 +48,6 @@ import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
|
|
|
import org.apache.ambari.server.actionmanager.Stage;
|
|
|
import org.apache.ambari.server.agent.ExecutionCommand;
|
|
|
import org.apache.ambari.server.api.services.AmbariMetaInfo;
|
|
|
-import org.apache.ambari.server.configuration.Configuration;
|
|
|
-import org.apache.ambari.server.controller.HostsMap;
|
|
|
import org.apache.ambari.server.orm.GuiceJpaInitializer;
|
|
|
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
|
|
|
import org.apache.ambari.server.state.Cluster;
|
|
@@ -57,19 +61,17 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.codehaus.jackson.JsonGenerationException;
|
|
|
import org.codehaus.jackson.map.JsonMappingException;
|
|
|
import org.junit.Before;
|
|
|
-import org.junit.Test;
|
|
|
import org.junit.Ignore;
|
|
|
+import org.junit.Test;
|
|
|
import org.junit.runner.RunWith;
|
|
|
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
|
|
import org.powermock.core.classloader.annotations.PrepareForTest;
|
|
|
import org.powermock.modules.junit4.PowerMockRunner;
|
|
|
-import static org.powermock.api.easymock.PowerMock.replayAll;
|
|
|
-import java.net.InetAddress;
|
|
|
-import static org.powermock.api.easymock.PowerMock.*;
|
|
|
|
|
|
import com.google.common.collect.ContiguousSet;
|
|
|
import com.google.common.collect.DiscreteDomain;
|
|
|
import com.google.common.collect.Range;
|
|
|
+import com.google.gson.Gson;
|
|
|
import com.google.inject.Guice;
|
|
|
import com.google.inject.Injector;
|
|
|
|
|
@@ -105,12 +107,12 @@ public class TestStageUtils {
|
|
|
Injector injector) throws AmbariException {
|
|
|
cl.setDesiredStackVersion(new StackId(STACK_ID));
|
|
|
cl.addService(serviceName);
|
|
|
-
|
|
|
+
|
|
|
for (Entry<String, List<Integer>> component : topology.entrySet()) {
|
|
|
-
|
|
|
+
|
|
|
String componentName = component.getKey();
|
|
|
cl.getService(serviceName).addServiceComponent(componentName);
|
|
|
-
|
|
|
+
|
|
|
for (Integer hostIndex : component.getValue()) {
|
|
|
cl.getService(serviceName)
|
|
|
.getServiceComponent(componentName)
|
|
@@ -151,14 +153,20 @@ public class TestStageUtils {
|
|
|
public void testJasonToExecutionCommand() throws JsonGenerationException,
|
|
|
JsonMappingException, JAXBException, IOException {
|
|
|
Stage s = StageUtils.getATestStage(1, 2, "host1", "clusterHostInfo");
|
|
|
- ExecutionCommand cmd = s.getExecutionCommands("host1").get(0).getExecutionCommand();
|
|
|
+ ExecutionCommand cmd = s.getExecutionCommands("host1").get(0).getExecutionCommand();
|
|
|
HashMap<String, Map<String,String>> configTags = new HashMap<String, Map<String,String>>();
|
|
|
Map<String, String> globalTag = new HashMap<String, String>();
|
|
|
globalTag.put("tag", "version1");
|
|
|
configTags.put("global", globalTag );
|
|
|
cmd.setConfigurationTags(configTags);
|
|
|
String json = StageUtils.jaxbToString(cmd);
|
|
|
- ExecutionCommand cmdDes = StageUtils.stringToExecutionCommand(json);
|
|
|
+
|
|
|
+ InputStream is = new ByteArrayInputStream(
|
|
|
+ json.getBytes(Charset.forName("UTF8")));
|
|
|
+
|
|
|
+ ExecutionCommand cmdDes = new Gson().fromJson(new InputStreamReader(is),
|
|
|
+ ExecutionCommand.class);
|
|
|
+
|
|
|
assertEquals(cmd.toString(), cmdDes.toString());
|
|
|
assertEquals(cmd, cmdDes);
|
|
|
}
|
|
@@ -195,20 +203,20 @@ public class TestStageUtils {
|
|
|
8672,
|
|
|
null,
|
|
|
8673);
|
|
|
-
|
|
|
+
|
|
|
fsm.addCluster("c1");
|
|
|
fsm.getCluster("c1").setDesiredStackVersion(new StackId(STACK_ID));
|
|
|
-
|
|
|
+
|
|
|
int index = 0;
|
|
|
-
|
|
|
+
|
|
|
for (String host: hostList) {
|
|
|
fsm.addHost(host);
|
|
|
-
|
|
|
+
|
|
|
Map<String, String> hostAttributes = new HashMap<String, String>();
|
|
|
hostAttributes.put("os_family", "redhat");
|
|
|
hostAttributes.put("os_release_version", "5.9");
|
|
|
fsm.getHost(host).setHostAttributes(hostAttributes);
|
|
|
-
|
|
|
+
|
|
|
fsm.getHost(host).setCurrentPingPort(pingPorts.get(index));
|
|
|
fsm.getHost(host).persist();
|
|
|
fsm.mapHostToCluster(host, "c1");
|
|
@@ -222,24 +230,24 @@ public class TestStageUtils {
|
|
|
List<Integer> datanodeIndexes = Arrays.asList(0,1,2,3,5,7,8,9);
|
|
|
hdfsTopology.put("DATANODE", new ArrayList<Integer>(datanodeIndexes));
|
|
|
addService(fsm.getCluster("c1"), hostList, hdfsTopology , "HDFS", injector);
|
|
|
-
|
|
|
+
|
|
|
//Add HBASE service
|
|
|
- Map<String, List<Integer>> hbaseTopology = new HashMap<String, List<Integer>>();
|
|
|
+ Map<String, List<Integer>> hbaseTopology = new HashMap<String, List<Integer>>();
|
|
|
hbaseTopology.put("HBASE_MASTER", Collections.singletonList(5));
|
|
|
List<Integer> regionServiceIndexes = Arrays.asList(1,3,5,8,9);
|
|
|
hbaseTopology.put("HBASE_REGIONSERVER", regionServiceIndexes);
|
|
|
addService(fsm.getCluster("c1"), hostList, hbaseTopology , "HBASE", injector);
|
|
|
-
|
|
|
+
|
|
|
//Add MAPREDUCE service
|
|
|
- Map<String, List<Integer>> mrTopology = new HashMap<String, List<Integer>>();
|
|
|
+ Map<String, List<Integer>> mrTopology = new HashMap<String, List<Integer>>();
|
|
|
mrTopology.put("JOBTRACKER", Collections.singletonList(5));
|
|
|
List<Integer> taskTrackerIndexes = Arrays.asList(1,2,3,4,5,7,9);
|
|
|
mrTopology.put("TASKTRACKER", taskTrackerIndexes);
|
|
|
addService(fsm.getCluster("c1"), hostList, mrTopology , "MAPREDUCE", injector);
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
//Add NONAME service
|
|
|
- Map<String, List<Integer>> nonameTopology = new HashMap<String, List<Integer>>();
|
|
|
+ Map<String, List<Integer>> nonameTopology = new HashMap<String, List<Integer>>();
|
|
|
nonameTopology.put("NONAME_SERVER", Collections.singletonList(7));
|
|
|
addService(fsm.getCluster("c1"), hostList, nonameTopology , "NONAME", injector);
|
|
|
|
|
@@ -259,39 +267,40 @@ public class TestStageUtils {
|
|
|
for (Host host: fsm.getHosts()) {
|
|
|
assertTrue(allHosts.contains(host.getHostName()));
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
//Check HDFS topology compression
|
|
|
Map<String, String> hdfsMapping = new HashMap<String, String>();
|
|
|
hdfsMapping.put("DATANODE", "slave_hosts");
|
|
|
hdfsMapping.put("NAMENODE", "namenode_host");
|
|
|
hdfsMapping.put("SECONDARY_NAMENODE", "snamenode_host");
|
|
|
checkServiceCompression(info, hdfsMapping, hdfsTopology, hostList);
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
//Check HBASE topology compression
|
|
|
Map<String, String> hbaseMapping = new HashMap<String, String>();
|
|
|
hbaseMapping.put("HBASE_MASTER", "hbase_master_hosts");
|
|
|
hbaseMapping.put("HBASE_REGIONSERVER", "hbase_rs_hosts");
|
|
|
checkServiceCompression(info, hbaseMapping, hbaseTopology, hostList);
|
|
|
-
|
|
|
+
|
|
|
//Check MAPREDUCE topology compression
|
|
|
Map<String, String> mrMapping = new HashMap<String, String>();
|
|
|
mrMapping.put("JOBTRACKER", "jtnode_host");
|
|
|
mrMapping.put("TASKTRACKER", "mapred_tt_hosts");
|
|
|
checkServiceCompression(info, mrMapping, mrTopology, hostList);
|
|
|
-
|
|
|
+
|
|
|
Set<String> actualPingPorts = info.get("all_ping_ports");
|
|
|
-
|
|
|
- if (pingPorts.contains(null))
|
|
|
+
|
|
|
+ if (pingPorts.contains(null)) {
|
|
|
assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size() + 1);
|
|
|
- else
|
|
|
+ } else {
|
|
|
assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size());
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
List<Integer> pingPortsActual = getRangeMappedDecompressedSet(actualPingPorts);
|
|
|
|
|
|
List<Integer> reindexedPorts = getReindexedList(pingPortsActual, new ArrayList<String>(allHosts), hostList);
|
|
|
-
|
|
|
+
|
|
|
//Treat null values
|
|
|
while (pingPorts.contains(null)) {
|
|
|
int indexOfNull = pingPorts.indexOf(null);
|
|
@@ -299,7 +308,7 @@ public class TestStageUtils {
|
|
|
}
|
|
|
|
|
|
assertEquals(pingPorts, reindexedPorts);
|
|
|
-
|
|
|
+
|
|
|
// check for no-name in the list
|
|
|
assertTrue(info.containsKey("noname_server_hosts"));
|
|
|
assertTrue(info.containsKey("decom_tt_hosts"));
|
|
@@ -316,36 +325,38 @@ public class TestStageUtils {
|
|
|
private void checkServiceCompression(Map<String, Set<String>> info,
|
|
|
Map<String, String> serviceMapping, Map<String, List<Integer>> serviceTopology,
|
|
|
List<String> hostList) {
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
for (Entry<String, List<Integer>> component: serviceTopology.entrySet()) {
|
|
|
-
|
|
|
+
|
|
|
String componentName = component.getKey();
|
|
|
-
|
|
|
+
|
|
|
List<Integer> componentIndexesExpected = component.getValue();
|
|
|
-
|
|
|
+
|
|
|
String roleName = serviceMapping.get(componentName);
|
|
|
-
|
|
|
+
|
|
|
assertTrue("No mapping for " + componentName , roleName != null);
|
|
|
-
|
|
|
+
|
|
|
Set<Integer> componentIndexesActual = getDecompressedSet(info.get(roleName));
|
|
|
-
|
|
|
+
|
|
|
Set<String> expectedComponentHosts = new HashSet<String>();
|
|
|
-
|
|
|
- for (Integer i: componentIndexesExpected)
|
|
|
+
|
|
|
+ for (Integer i: componentIndexesExpected) {
|
|
|
expectedComponentHosts.add(hostList.get(i));
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
Set<String> actualSlavesHosts = new HashSet<String>();
|
|
|
-
|
|
|
- for (Integer i: componentIndexesActual)
|
|
|
+
|
|
|
+ for (Integer i: componentIndexesActual) {
|
|
|
actualSlavesHosts.add(new ArrayList<String>(info.get(HOSTS_LIST)).get(i));
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
assertEquals(expectedComponentHosts, actualSlavesHosts);
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private Set<Integer> getDecompressedSet(Set<String> set) {
|
|
@@ -379,7 +390,7 @@ public class TestStageUtils {
|
|
|
}
|
|
|
return resultSet;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private List<Integer> getRangeMappedDecompressedSet(Set<String> compressedSet) {
|
|
|
|
|
|
SortedMap<Integer, Integer> resultMap = new TreeMap<Integer, Integer>();
|
|
@@ -388,9 +399,10 @@ public class TestStageUtils {
|
|
|
|
|
|
String[] split = token.split(":");
|
|
|
|
|
|
- if (split.length != 2)
|
|
|
+ if (split.length != 2) {
|
|
|
throw new RuntimeException("Broken data, expected format - m:r, got - "
|
|
|
+ token);
|
|
|
+ }
|
|
|
|
|
|
Integer index = Integer.valueOf(split[0]);
|
|
|
|
|
@@ -401,8 +413,9 @@ public class TestStageUtils {
|
|
|
|
|
|
Set<Integer> decompressedSet = getDecompressedSet(rangeTokensSet);
|
|
|
|
|
|
- for (Integer i : decompressedSet)
|
|
|
+ for (Integer i : decompressedSet) {
|
|
|
resultMap.put(i, index);
|
|
|
+ }
|
|
|
|
|
|
}
|
|
|
|
|
@@ -411,7 +424,7 @@ public class TestStageUtils {
|
|
|
return resultList;
|
|
|
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private List<Integer> getReindexedList(List<Integer> list,
|
|
|
List<String> currentIndexes, List<String> desiredIndexes) {
|
|
|
|