
HDFS-1719. Federation: Fix TestDFSRemove that fails intermittently. Contributed by Suresh Srinivas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078076 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
7b2a17ef47

+ 3 - 0
CHANGES.txt

@@ -200,6 +200,9 @@ Trunk (unreleased changes)
     HDFS-1718. Federation: MiniDFSCluster#waitActive() bug causes some tests
     to fail. (suresh)
 
+    HDFS-1719. Federation: Fix TestDFSRemove that fails intermittently.
+    (suresh)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

+ 55 - 61
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -325,7 +325,6 @@ public class DataNode extends Configured
   volatile boolean shouldRun = true;
   private BlockPoolManager blockPoolManager;
   public volatile FSDatasetInterface data = null;
-  private DatanodeID datanodeId = null;
   private String clusterId = null;
 
   public final static String EMPTY_DEL_HINT = "";
@@ -340,8 +339,10 @@ public class DataNode extends Configured
   private HttpServer infoServer = null;
   DataNodeMetrics myMetrics;
   private InetSocketAddress selfAddr;
-  static DataNode datanodeObject = null;
-  String machineName;
+  
+  private static volatile DataNode datanodeObject = null;
+  private final String hostName; // Host name of this datanode
+  
   private static String dnThreadName;
   int socketTimeout;
   int socketWriteTimeout = 0;  
@@ -386,6 +387,7 @@ public class DataNode extends Configured
     DataNode.setDataNode(this);
     
     try {
+      hostName = getHostName(conf);
       startDataNode(conf, dataDirs, resources);
     } catch (IOException ie) {
       shutdown();
@@ -402,17 +404,21 @@ public class DataNode extends Configured
     clusterId = cid;    
   }
 
-  private void initConfig(Configuration conf) throws UnknownHostException {
+  private static String getHostName(Configuration config)
+      throws UnknownHostException {
+    String name = null;
     // use configured nameserver & interface to get local hostname
-    if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
-      machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);   
+    if (config.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
+      name = config.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
     }
-    if (machineName == null) {
-      machineName = DNS.getDefaultHost(
-                                     conf.get("dfs.datanode.dns.interface","default"),
-                                     conf.get("dfs.datanode.dns.nameserver","default"));
+    if (name == null) {
+      name = DNS.getDefaultHost(config.get("dfs.datanode.dns.interface",
+          "default"), config.get("dfs.datanode.dns.nameserver", "default"));
     }
+    return name;
+  }
 
+  private void initConfig(Configuration conf) {
     this.socketTimeout =  conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -472,8 +478,6 @@ public class DataNode extends Configured
     this.infoServer.addServlet(null, "/blockScannerReport", 
                                DataBlockScanner.Servlet.class);
     this.infoServer.start();
-    // adjust info port
-    this.datanodeId.setInfoPort(this.infoServer.getPort());
   }
   
   private void startPlugins(Configuration conf) {
@@ -501,9 +505,6 @@ public class DataNode extends Configured
         CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
       ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
     }
-
-    datanodeId.setIpcPort(ipcServer.getListenerAddress().getPort());
-    LOG.info("datanodeId = " + datanodeId);
   }
   
 /**
@@ -585,10 +586,7 @@ public class DataNode extends Configured
   }
   
   private void initDataXceiver(Configuration conf) throws IOException {
-    // construct registration
     InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
-    int tmpPort = socAddr.getPort();
-    this.datanodeId = new DatanodeID(machineName + ":" + tmpPort);
 
     // find free port or use privileged port provided
     ServerSocket ss;
@@ -601,10 +599,9 @@ public class DataNode extends Configured
     }
     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
-    tmpPort = ss.getLocalPort();
+    int tmpPort = ss.getLocalPort();
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                      tmpPort);
-    this.datanodeId.setName(machineName + ":" + tmpPort);
     LOG.info("Opened info server at " + tmpPort);
       
     this.threadGroup = new ThreadGroup("dataXceiverServer");
@@ -659,7 +656,9 @@ public class DataNode extends Configured
     UpgradeManagerDatanode upgradeManager = null;
 
     BPOfferService(InetSocketAddress isa) {
-      this.bpRegistration = new DatanodeRegistration(datanodeId);
+      this.bpRegistration = new DatanodeRegistration(getMachineName());
+      bpRegistration.setInfoPort(infoServer.getPort());
+      bpRegistration.setIpcPort(getIpcPort());
       this.nnAddr = isa;
     }
 
@@ -769,8 +768,7 @@ public class DataNode extends Configured
       
       if (simulatedFSDataset) {
         initFsDataSet(conf, dataDirs);
-
-        bpRegistration.setStorageID(datanodeId.getStorageID()); //same as DN
+        bpRegistration.setStorageID(getStorageId()); //same as DN
         bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
         bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
         bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
@@ -781,7 +779,7 @@ public class DataNode extends Configured
             + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
             + bpNSInfo);
 
-        bpRegistration.setStorageID(storage.getStorageID());
+        bpRegistration.setStorageID(getStorageId());
         bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
         initFsDataSet(conf, dataDirs);
       }
@@ -1086,14 +1084,9 @@ public class DataNode extends Configured
       while(shouldRun && shouldServiceRun) {
         try {
          // reset name to machineName. Mainly for web interface. Same for all BPs
-          bpRegistration.name = machineName + ":" + bpRegistration.getPort();
-          
+          bpRegistration.name = hostName + ":" + bpRegistration.getPort();
           bpRegistration = bpNamenode.registerDatanode(bpRegistration);
 
-          //datanode's machine name may be different from the Namenode's side, 
-          // so we need to reset it to the one received from the registration 
-          setMachineName(bpRegistration);
-
           LOG.info("bpReg after =" + bpRegistration.storageInfo + 
               ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
 
@@ -1193,8 +1186,10 @@ public class DataNode extends Configured
             }
           }
         }
+      } catch (Throwable ex) {
+        LOG.warn("Unexpected exception ", ex);
       } finally {
-        LOG.warn(bpRegistration + ":ending block pool service for: " 
+        LOG.warn(bpRegistration + " ending block pool service for: " 
             + blockPoolId);
         cleanUp();
       }
@@ -1353,7 +1348,7 @@ public class DataNode extends Configured
     startInfoServer(conf);
     initIpcServer(conf);
 
-    myMetrics = new DataNodeMetrics(conf, datanodeId.getName());
+    myMetrics = new DataNodeMetrics(conf, getMachineName());
 
     blockPoolManager = new BlockPoolManager(conf);
   }
@@ -1377,11 +1372,10 @@ public class DataNode extends Configured
       conf.getBoolean("dfs.datanode.simulateddatastorage", false);
 
     if (simulatedFSDataset) {
-      setNewStorageID(datanodeId);
+      storage.createStorageID();
       // it would have been better to pass storage as a parameter to
       // constructor below - need to augment ReflectionUtils used below.
-      conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, datanodeId
-          .getStorageID());
+      conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, getStorageId());
       try {
         data = (FSDatasetInterface) ReflectionUtils.newInstance(
             Class.forName(
@@ -1414,29 +1408,25 @@ public class DataNode extends Configured
     }
   }
   
-  String getStorageId() {
-    return datanodeId.getStorageID();
+  int getPort() {
+    return selfAddr.getPort();
   }
   
-  public String getMachineName() {
-    return datanodeId.name;
+  String getStorageId() {
+    return storage.getStorageID();
   }
   
-  private void setMachineName(DatanodeRegistration bpReg) {
-    String [] mNames = bpReg.getName().split(":");
-    synchronized (datanodeId) {
-      datanodeId.name = mNames[0] + ":" + datanodeId.getPort();
-    }
-
-
+  /** 
+   * Get host:port with host set to Datanode host and port set to the
+   * port {@link DataXceiver} is serving.
+   * @return host:port string
+   */
+  public String getMachineName() {
+    return hostName + ":" + getPort();
   }
   
   public int getIpcPort() {
-    return datanodeId.ipcPort;
-  }
-  
-  DatanodeID getDatanodeId() {
-    return datanodeId;
+    return ipcServer.getListenerAddress().getPort();
   }
   
   /**
@@ -1533,6 +1523,15 @@ public class DataNode extends Configured
   }
   
   public static void setNewStorageID(DatanodeID dnId) {
+    LOG.info("Datanode is " + dnId);
+    dnId.storageID = createNewStorageId(dnId.getPort());
+  }
+  
+  static String createNewStorageId() {
+    return createNewStorageId(datanodeObject.getPort());
+  }
+  
+  private static String createNewStorageId(int port) {
     /* Return 
      * "DS-randInt-ipaddr-currentTimeMillis"
     * It is considered extremely rare for all these numbers to match
@@ -1559,8 +1558,8 @@ public class DataNode extends Configured
       LOG.warn("Could not use SecureRandom");
       rand = R.nextInt(Integer.MAX_VALUE);
     }
-    dnId.storageID = "DS-" + rand + "-"+ ip + "-" + dnId.getPort() + "-" + 
-                      System.currentTimeMillis();
+    return "DS-" + rand + "-" + ip + "-" + port + "-"
+        + System.currentTimeMillis();
   }
 
   /**
@@ -2115,14 +2114,9 @@ public class DataNode extends Configured
 
   @Override
   public String toString() {
-    return "DataNode{" +
-    "data=" + data +
-    (datanodeId != null ?
-        (", localName='" + datanodeId.getName() + "'" +
-            ", storageID='" + datanodeId.getStorageID() + "'")
-            : "") +
-            ", xmitsInProgress=" + xmitsInProgress.get() +
-            "}";
+    return "DataNode{data=" + data + ", localName='" + getMachineName()
+        + "', storageID='" + getStorageId() + "', xmitsInProgress="
+        + xmitsInProgress.get() + "}";
   }
 
   private static void printUsage() {
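
Note on the storage-ID change above: the patch keeps the existing "DS-randInt-ipaddr-currentTimeMillis" format (with the port included) but moves the formatting into the private createNewStorageId(int port), so it no longer reads the port from the removed datanodeId field. Below is a minimal standalone sketch of the same scheme; the ip lookup sits outside the hunk shown above, so InetAddress.getLocalHost() stands in here as an assumption, and the class name is illustrative only.

// Sketch of the "DS-randInt-ipaddr-port-currentTimeMillis" scheme behind
// createNewStorageId(int). The ip source is an assumption, not the patch's code.
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Random;

public class StorageIdSketch {
  private static final Random R = new Random();

  static String newStorageId(int port) {
    String ip = "unknownIP";
    try {
      // Assumption: stand-in for the datanode's resolved IP address.
      ip = InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      // keep the placeholder and continue
    }
    int rand;
    try {
      rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
    } catch (NoSuchAlgorithmException e) {
      rand = R.nextInt(Integer.MAX_VALUE); // weaker fallback, mirroring the patch
    }
    // A collision requires the random int, ip, port and millisecond timestamp
    // to all match on another machine, which is considered extremely rare.
    return "DS-" + rand + "-" + ip + "-" + port + "-" + System.currentTimeMillis();
  }

  public static void main(String[] args) {
    System.out.println(newStorageId(50010));
  }
}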

+ 16 - 11
src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -72,6 +72,7 @@ public class DataStorage extends Storage {
   private static final Pattern PRE_GENSTAMP_META_FILE_PATTERN = 
     Pattern.compile("(.*blk_[-]*\\d+)\\.meta$");
   
+  /** Access to this variable is guarded by "this" */
   private String storageID;
 
  // flag to ensure initializing storage occurs only once
@@ -96,14 +97,21 @@ public class DataStorage extends Storage {
     this.storageID = strgID;
   }
 
-  public String getStorageID() {
+  synchronized String getStorageID() {
     return storageID;
   }
   
-  void setStorageID(String newStorageID) {
+  synchronized void setStorageID(String newStorageID) {
     this.storageID = newStorageID;
   }
   
+  synchronized void createStorageID() {
+    if (storageID != null && !storageID.isEmpty()) {
+      return;
+    }
+    storageID = DataNode.createNewStorageId();
+  }
+  
   /**
    * Analyze storage directories.
    * Recover from previous transitions if required. 
@@ -132,7 +140,6 @@ public class DataStorage extends Storage {
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
     // Format and recover.
-    this.storageID = "";
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
     for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
@@ -184,10 +191,7 @@ public class DataStorage extends Storage {
     }
     
     // make sure we have storage id set - if not - generate new one
-    if(storageID.isEmpty()) {
-      DataNode.setNewStorageID(DataNode.datanodeObject.getDatanodeId());
-      storageID = DataNode.datanodeObject.getStorageId();
-    }
+    createStorageID();
     
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
@@ -277,7 +281,7 @@ public class DataStorage extends Storage {
     props.setProperty("clusterID", clusterID);
     props.setProperty("cTime", String.valueOf(cTime));
     props.setProperty("layoutVersion", String.valueOf(layoutVersion));
-    props.setProperty("storageID", storageID);
+    props.setProperty("storageID", getStorageID());
     // Set NamespaceID in version LAST_PRE_FEDERATION_LAYOUT_VERSION or before
     if (layoutVersion >= LAST_PRE_FEDERATION_LAYOUT_VERSION) {
       props.setProperty("namespaceID", String.valueOf(namespaceID));
@@ -307,13 +311,14 @@ public class DataStorage extends Storage {
       throw new InconsistentFSStateException(sd.getRoot(), "file "
           + STORAGE_FILE_VERSION + " is invalid.");
     }
-    if (!(storageID.equals("") || ssid.equals("") || storageID.equals(ssid))) {
+    String sid = getStorageID();
+    if (!(sid.equals("") || ssid.equals("") || sid.equals(ssid))) {
       throw new InconsistentFSStateException(sd.getRoot(),
           "has incompatible storage Id.");
     }
     
-    if (storageID.equals("")) { // update id only if it was empty
-      storageID = ssid;
+    if (sid.equals("")) { // update id only if it was empty
+      setStorageID(ssid);
     }
   }
 

+ 0 - 9
src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java

@@ -59,15 +59,6 @@ implements Writable, NodeRegistration {
     this("");
   }
   
-  /**
-   * Copy constructor
-   */
-  public DatanodeRegistration(DatanodeID from) {
-    super(from);
-    this.storageInfo = new StorageInfo();
-    this.exportedKeys = new ExportedBlockKeys();
-  }
-  
   /**
    * Create DatanodeRegistration
    */
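
With the copy constructor removed, the remaining call site (see the BPOfferService hunk in DataNode.java above) builds a registration from the machine name and sets the ports explicitly, roughly as follows; "dn" naming the enclosing DataNode instance is illustrative here.

// Assembly per the BPOfferService hunk above.
DatanodeRegistration bpRegistration = new DatanodeRegistration(dn.getMachineName());
bpRegistration.setInfoPort(infoServer.getPort());
bpRegistration.setIpcPort(dn.getIpcPort());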

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1392,8 +1392,8 @@ public class MiniDFSCluster {
     for (DataNodeProperties dn : dataNodes) {
       // the datanode thread communicating with the namenode should be alive
       if (!dn.datanode.isBPServiceAlive(addr)) {
-        LOG.warn("One or more BPOfferService failed to start in datanode " + dn
-            + " for namenode" + addr);
+        LOG.warn("BPOfferService failed to start in datanode " + dn.datanode
+            + " for namenode at " + addr);
         return false;
       }
     }

+ 0 - 4
src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeRegistration.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs;
 
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -39,11 +38,8 @@ public class TestDatanodeRegistration extends TestCase {
   public void testChangeIpcPort() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
-    FileSystem fs = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).build();
-      fs = cluster.getFileSystem();
-
       InetSocketAddress addr = new InetSocketAddress(
         "localhost",
         cluster.getNameNodePort());