|
@@ -46,8 +46,6 @@ public class TestReplication extends TestCase {
|
|
|
private static final Log LOG = LogFactory.getLog(
|
|
|
"org.apache.hadoop.dfs.TestReplication");
|
|
|
|
|
|
-
|
|
|
-
|
|
|
private void writeFile(FileSystem fileSys, Path name, int repl)
|
|
|
throws IOException {
|
|
|
// create and write a file that contains three blocks of data
|
|
@@ -70,13 +68,31 @@ public class TestReplication extends TestCase {
|
|
|
ClientProtocol.versionID,
|
|
|
DataNode.createSocketAddr(conf.get("fs.default.name")),
|
|
|
conf);
|
|
|
-
|
|
|
- LocatedBlock[] locations = namenode.open(name.toString());
|
|
|
+
|
|
|
+ LocatedBlock[] locations;
|
|
|
+ boolean isReplicationDone;
|
|
|
+ do {
|
|
|
+ locations = namenode.open(name.toString());
|
|
|
+ isReplicationDone = true;
|
|
|
+ for (int idx = 0; idx < locations.length; idx++) {
|
|
|
+ DatanodeInfo[] datanodes = locations[idx].getLocations();
|
|
|
+ if(Math.min(numDatanodes, repl) != datanodes.length) {
|
|
|
+ isReplicationDone=false;
|
|
|
+ LOG.warn("File has "+datanodes.length+" replicas, expecting "
|
|
|
+ +Math.min(numDatanodes, repl));
|
|
|
+ try {
|
|
|
+ Thread.sleep(15000L);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // nothing
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } while(!isReplicationDone);
|
|
|
+
|
|
|
boolean isOnSameRack = true, isNotOnSameRack = true;
|
|
|
for (int idx = 0; idx < locations.length; idx++) {
|
|
|
DatanodeInfo[] datanodes = locations[idx].getLocations();
|
|
|
- assertEquals("Number of replicas for block" + idx,
|
|
|
- Math.min(numDatanodes, repl), datanodes.length);
|
|
|
if(datanodes.length <= 1) break;
|
|
|
if(datanodes.length == 2) {
|
|
|
isNotOnSameRack = !( datanodes[0].getNetworkLocation().equals(
|
|
@@ -114,16 +130,12 @@ public class TestReplication extends TestCase {
|
|
|
*/
|
|
|
public void testReplication() throws IOException {
|
|
|
Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean("dfs.replication.considerLoad", false);
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, true, racks);
|
|
|
- // Now wait for 15 seconds to give datanodes chance to register
|
|
|
- // themselves and to report heartbeat
|
|
|
- try {
|
|
|
- Thread.sleep(15000L);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // nothing
|
|
|
- }
|
|
|
+ cluster.waitActive();
|
|
|
|
|
|
- InetSocketAddress addr = new InetSocketAddress("localhost", 65312);
|
|
|
+ InetSocketAddress addr = new InetSocketAddress("localhost",
|
|
|
+ cluster.getNameNodePort());
|
|
|
DFSClient client = new DFSClient(addr, conf);
|
|
|
|
|
|
DatanodeInfo[] info = client.datanodeReport();
|