Kaynağa Gözat

HADOOP-3677. Simplify generation stamp upgrade by making it a
local upgrade on datanodes. Deleted distributed upgrade. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@677665 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 yıl önce
ebeveyn
işleme
104870a892

+ 4 - 0
CHANGES.txt

@@ -307,6 +307,10 @@ Release 0.18.0 - Unreleased
     in hadoop user guide. (shv)
 
   IMPROVEMENTS
+
+    HADOOP-3677. Simplify generation stamp upgrade by making it a 
+    local upgrade on datanodes. Deleted distributed upgrade.
+    (rangadi)
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().
     (Lohit Vjayarenu via rangadi)

+ 0 - 70
src/hdfs/org/apache/hadoop/hdfs/server/common/GenerationStampStatsUpgradeCommand.java

@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.common;
-
-import java.io.*;
-
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.namenode.GenerationStampUpgradeNamenode;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-
-/**
- * The Datanode sends this statistics object to the Namenode periodically
- * during a Generation Stamp Upgrade.
- */
-public class GenerationStampStatsUpgradeCommand extends UpgradeCommand {
-  public DatanodeID datanodeId;
-  public int blocksUpgraded;
-  public int blocksRemaining;
-  public int errors;
-
-  GenerationStampStatsUpgradeCommand() {
-    super(GenerationStampUpgradeNamenode.DN_CMD_STATS, 0, (short)0);
-    datanodeId = new DatanodeID();
-  }
-
-  public GenerationStampStatsUpgradeCommand(short status, DatanodeID dn,
-                              int blocksUpgraded, int blocksRemaining,
-                              int errors, int version) {
-    super(GenerationStampUpgradeNamenode.DN_CMD_STATS, version, status);
-    //copy so that only ID part gets serialized
-    datanodeId = new DatanodeID(dn); 
-    this.blocksUpgraded = blocksUpgraded;
-    this.blocksRemaining = blocksRemaining;
-    this.errors = errors;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    datanodeId.readFields(in);
-    blocksUpgraded = in.readInt();
-    blocksRemaining = in.readInt();
-    errors = in.readInt();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    datanodeId.write(out);
-    out.writeInt(blocksUpgraded);
-    out.writeInt(blocksRemaining);
-    out.writeInt(errors);
-  }
-}
-

+ 4 - 0
src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -69,6 +69,10 @@ public abstract class Storage extends StorageInfo {
   // this corresponds to Hadoop-0.14.
   public static final int LAST_UPGRADABLE_LAYOUT_VERSION = -7;
   protected static final String LAST_UPGRADABLE_HADOOP_VERSION = "Hadoop-0.14";
+
+  /* this should be removed when LAST_UPGRADABLE_LV goes beyond -13.
+   * any upgrade code that uses this constant should also be removed. */
+  public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
   
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";

+ 0 - 4
src/hdfs/org/apache/hadoop/hdfs/server/common/UpgradeObjectCollection.java

@@ -22,8 +22,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.server.datanode.GenerationStampUpgradeDatanode;
-import org.apache.hadoop.hdfs.server.namenode.GenerationStampUpgradeNamenode;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -36,8 +34,6 @@ public class UpgradeObjectCollection {
     initialize();
     // Registered distributed upgrade objects here
     // registerUpgrade(new UpgradeObject());
-    registerUpgrade(new GenerationStampUpgradeNamenode());
-    registerUpgrade(new GenerationStampUpgradeDatanode());
   }
 
   static class UOSignature implements Comparable<UOSignature> {

+ 32 - 3
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -28,7 +28,10 @@ import java.util.Collection;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.NodeType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.StartupOption;
@@ -275,7 +278,7 @@ public class DataStorage extends Storage {
     // rename current to tmp
     rename(curDir, tmpDir);
     // hardlink blocks
-    linkBlocks(tmpDir, curDir);
+    linkBlocks(tmpDir, curDir, this.getLayoutVersion());
     // write version file
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() :
@@ -357,12 +360,19 @@ public class DataStorage extends Storage {
     }
   }
   
-  static void linkBlocks(File from, File to) throws IOException {
+  static void linkBlocks(File from, File to, int oldLV) throws IOException {
     if (!from.isDirectory()) {
       if (from.getName().startsWith(COPY_FILE_PREFIX)) {
         IOUtils.copyBytes(new FileInputStream(from), 
                           new FileOutputStream(to), 16*1024, true);
       } else {
+        
+        //check if we are upgrading from pre-generation stamp version.
+        if (oldLV >= PRE_GENERATIONSTAMP_LAYOUT_VERSION) {
+          // Link to the new file name.
+          to = new File(convertMetatadataFileName(to.getAbsolutePath()));
+        }
+        
         HardLink.createHardLink(from, to);
       }
       return;
@@ -379,7 +389,8 @@ public class DataStorage extends Storage {
       });
     
     for(int i = 0; i < blockNames.length; i++)
-      linkBlocks(new File(from, blockNames[i]), new File(to, blockNames[i]));
+      linkBlocks(new File(from, blockNames[i]), 
+                 new File(to, blockNames[i]), oldLV);
   }
 
   protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
@@ -406,4 +417,22 @@ public class DataStorage extends Storage {
     um.setUpgradeState(false, getLayoutVersion());
     um.initializeUpgrade(nsInfo);
   }
+  
+  private static final Pattern PRE_GENSTAMP_META_FILE_PATTERN = 
+    Pattern.compile("(.*blk_[-]*\\d+)\\.meta$");
+  /**
+   * This is invoked on target file names when upgrading from pre generation 
+   * stamp version (version -13) to correct the metatadata file name.
+   * @param oldFileName
+   * @return the new metadata file name with the default generation stamp.
+   */
+  private static String convertMetatadataFileName(String oldFileName) {
+    Matcher matcher = PRE_GENSTAMP_META_FILE_PATTERN.matcher(oldFileName); 
+    if (matcher.matches()) {
+      //return the current metadata file name
+      return FSDataset.getMetaFileName(matcher.group(1),
+                                       Block.GRANDFATHER_GENERATION_STAMP); 
+    }
+    return oldFileName;
+  }
 }

+ 6 - 2
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -564,9 +564,13 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
   } 
   
+  static String getMetaFileName(String blockFileName, long genStamp) {
+    return blockFileName + "_" + genStamp + METADATA_EXTENSION;
+  }
+  
   static File getMetaFile(File f , Block b) {
-    return new File( f.getAbsolutePath() +
-                     "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
+    return new File(getMetaFileName(f.getAbsolutePath(),
+                                    b.getGenerationStamp())); 
   }
   protected File getMetaFile(Block b) throws IOException {
     return getMetaFile(getBlockFile(b), b);

+ 0 - 453
src/hdfs/org/apache/hadoop/hdfs/server/datanode/GenerationStampUpgradeDatanode.java

@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.GenerationStampStatsUpgradeCommand;
-import org.apache.hadoop.hdfs.server.namenode.GenerationStampUpgradeNamenode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-
-/**
- * This class associates a block generation stamp with with block. This
- * generation stamp is written to each metadata file. Please see
- * HADOOP-1700 for details.
- */
-public class GenerationStampUpgradeDatanode extends UpgradeObjectDatanode {
-
-  public static final Log LOG = 
-    LogFactory.getLog(GenerationStampUpgradeDatanode.class.getName());
-
-  DatanodeProtocol namenode;
-  InetSocketAddress namenodeAddr;
-
-  // stats
-  private AtomicInteger blocksPreviouslyUpgraded = new AtomicInteger(0);
-  private AtomicInteger blocksToUpgrade = new AtomicInteger(0);
-  private AtomicInteger blocksUpgraded = new AtomicInteger(0);
-  private AtomicInteger errors = new AtomicInteger(0);
-
-  // process the upgrade using a pool of threads.
-  static private final int poolSize = 4;
-
-  // If no progress has occured during this time, print warnings message.
-  static private final int LONG_TIMEOUT_MILLISEC = 1*60*1000; // 1 minute
-
-  // This object is needed to indicate that namenode is not running upgrade.
-  static UpgradeCommand noUpgradeOnNamenode = new UpgradeCommand();
-
-  private List<UpgradeExecutor> completedList = new LinkedList<UpgradeExecutor>();
-
-  /* This is set when the datanode misses the regular upgrade.
-   * When this is set, it upgrades the block but stops heartbeating
-   * to the namenode.
-   */
-  private AtomicBoolean offlineUpgrade = new AtomicBoolean(false);
-  private AtomicBoolean upgradeCompleted = new AtomicBoolean(false);
-  
-  // Implement the common interfaces required by UpgradeObjectDatanode
-  
-  public int getVersion() {
-    return GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION;
-  }
-
-  /*
-   * Start upgrade if it not already running. It sends status to
-   * namenode even if an upgrade is already in progress.
-   */
-  public synchronized UpgradeCommand startUpgrade() throws IOException {
-    if (offlineUpgrade.get()) {
-      doUpgrade();
-    }
-    return null; 
-  }
-
-  public String getDescription() {
-    return "Block Generation Stamp Upgrade at Datanode";
-  }
-
-  public short getUpgradeStatus() {
-    return (blocksToUpgrade.get() == blocksUpgraded.get()) ? 100 :
-      (short) Math.floor(blocksUpgraded.get()*100.0/blocksToUpgrade.get());
-  }
-
-  public UpgradeCommand completeUpgrade() throws IOException {
-    // return latest stats command.
-    assert getUpgradeStatus() == 100;
-    return new GenerationStampStatsUpgradeCommand(getUpgradeStatus(),
-                                    getDatanode().dnRegistration,
-                                    blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
-                                    blocksToUpgrade.get()-blocksUpgraded.get(),
-                                    errors.get(),
-                                    GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
-  }
-  
-  @Override
-  boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
-    int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
-    if(nsUpgradeVersion >= getVersion()) {
-      return false; // Normal upgrade.
-    }
-    
-    LOG.info("\n  This Datanode has missed a cluster wide Block generation Stamp Upgrade." +
-             "\n  Will perform an 'offline' upgrade of the blocks." +
-             "\n  During this time, Datanode does not heartbeat.");
-    
-    
-    // Namenode removes this node from the registered nodes
-    try {
-      getDatanode().namenode.errorReport(getDatanode().dnRegistration,
-                                    DatanodeProtocol.NOTIFY, 
-                                    "Performing an offline generation stamp " +
-                                    "upgrade. " +
-                                    "Will be back online once the ugprade " +
-                                    "completes. Please see datanode logs.");
-      
-    } catch(IOException ignored) {
-      LOG.info("\n  This Datanode was unable to send error report to namenode.");
-    }
-    offlineUpgrade.set(true);
-    return true;
-  }
-
-  public GenerationStampUpgradeDatanode() {
-    blocksPreviouslyUpgraded.set(0);
-    blocksToUpgrade.set(0);
-    blocksUpgraded.set(0);
-    errors.set(0);
-  }
-
-  static File getPreGenerationMetaFile(File f) {
-    return new File(f.getAbsolutePath() + FSDataset.METADATA_EXTENSION);
-  }
-  
-  // This class is invoked by the worker thread to convert the
-  // metafile into the new format
-  //
-  class UpgradeExecutor implements Runnable {
-    Block block;
-    Throwable throwable;
-    
-    UpgradeExecutor(Block b) {
-      block = b;
-    }
-
-    public void run() {
-      try {
-        // do the real work here
-        FSDataset dataset = (FSDataset) getDatanode().data;
-        upgradeToCurVersion(dataset, block);
-      } catch (Throwable t) {
-        throwable = t;
-      }
-      synchronized (completedList) {
-        completedList.add(this);
-        completedList.notify();
-      }
-    }
-
-    /**
-     * Upgrades the metadata file to current version if required.
-     * @param dataset
-     * @param block
-     */
-    void upgradeToCurVersion(FSDataset dataset, Block block)
-                                              throws IOException {
-      File blockFile = dataset.getBlockFile(block);
-      if (blockFile == null) {
-        throw new IOException("Could find file for " + block);
-      }
-
-      File metadataFile = dataset.getMetaFile(block);
-      File oldmetadataFile = getPreGenerationMetaFile(blockFile);
-
-      if (metadataFile.exists() && oldmetadataFile.exists()) {
-        //
-        // If both file exists and are of the same size,
-        // then delete the old one. If the sizes are not same then
-        // leave both of them and consider the upgrade as successful.
-        //
-        if (metadataFile.length() == oldmetadataFile.length()) {
-          if (!oldmetadataFile.delete()) {
-            LOG.info("Unable to delete old metadata file " + oldmetadataFile);
-          }
-        }
-      } else if (metadataFile.exists()) {
-        //
-        // Only the new file exists, nothing more to do.
-        //
-        return;
-      } else if (oldmetadataFile.exists()) {
-        //
-        // The old file exists but the new one is missing. Rename
-        // old one to new name.
-        //
-        if (!oldmetadataFile.renameTo(metadataFile)) {
-          throw new IOException("Could find rename " +  oldmetadataFile +
-                                " to " + metadataFile);
-        }
-      } else {
-        throw new IOException("Could find any metadata file for " + block);
-      }
-    }
-  }
-  
-  // This method iterates through all the blocks on a datanode and
-  // do the upgrade.
-  //
-  public void doUpgrade() throws IOException {
-    
-    if (upgradeCompleted.get()) {
-      assert offlineUpgrade.get() : 
-             ("Multiple calls to doUpgrade is expected only during " +
-              "offline upgrade");
-      return;
-    }
-    
-    FSDataset dataset = (FSDataset) getDatanode().data;
-
-    // Set up the retry policy so that each attempt waits for one minute.
-    Configuration conf = new Configuration();
-    // set rpc timeout to one minute.
-    conf.set("ipc.client.timeout", "60000");
-
-    RetryPolicy timeoutPolicy =
-       RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-               LONG_TIMEOUT_MILLISEC/1000,
-               1, TimeUnit.MILLISECONDS);
-
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-                            new HashMap<String, RetryPolicy>();
-    // do we need to set the policy for connection failures also?
-    methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy);
-
-    LOG.info("Starting Block Generation Stamp Upgrade on datanode " +
-             getDatanode());
-
-    for (;;) {
-      try {
-        namenodeAddr = getDatanode().getNameNodeAddr();
-        namenode = (DatanodeProtocol) RetryProxy.create(
-                            DatanodeProtocol.class,
-                            RPC.waitForProxy(DatanodeProtocol.class,
-                                             DatanodeProtocol.versionID,
-                                             namenodeAddr,
-                                             conf),
-                            methodNameToPolicyMap);
-        break;
-      } catch (IOException e) {
-        LOG.warn("Generation Stamp Upgrade Exception " +
-                 "while trying to connect to NameNode at " +
-                 getDatanode().getNameNodeAddr().toString() + " : " +
-                 StringUtils.stringifyException(e));
-        try {
-          Thread.sleep(10*1000);
-        } catch (InterruptedException e1) {
-          throw new IOException("Interrupted Sleep while creating RPC proxy." +
-                                e1);
-        }
-      }
-    }
-    LOG.info("Block Generation Stamp Upgrade Datanode connected to " +
-             "namenode at " + namenodeAddr);
-
-    // Get a list of all the blocks :
-    LinkedList<UpgradeExecutor> blockList = new LinkedList<UpgradeExecutor>();
-    
-    //Fill blockList with blocks to be upgraded.
-    Block [] blockArr = dataset.getBlockReport();
-    
-    for (Block b : blockArr) {
-      File blockFile = null;
-      try {
-        blockFile = dataset.getBlockFile(b);
-      } catch (IOException e) {
-        //The block might just be deleted. ignore it.
-        LOG.warn("Could not find file location for " + b + 
-                 ". It might already be deleted. Exception : " +
-                 StringUtils.stringifyException(e));
-        errors.getAndIncrement();
-        continue;
-      }
-      if (!blockFile.exists()) {
-        errors.getAndIncrement();
-        LOG.error("could not find block file " + blockFile);
-        continue;
-      }
-      File metaFile = dataset.getMetaFile(b);
-      File oldMetaFile = getPreGenerationMetaFile(blockFile);
-      if (metaFile.exists()) {
-        blocksPreviouslyUpgraded.getAndIncrement();
-        continue;
-      }
-      blocksToUpgrade.getAndIncrement();
-      blockList.add(new UpgradeExecutor(b));
-    }
-    blockArr = null;
-    int nLeft = blockList.size();
-    
-    LOG.info("Starting upgrade of " + blocksToUpgrade.get() + " blocks out of " +
-             (blocksToUpgrade.get() + blocksPreviouslyUpgraded.get()));
-
-    // Start the pool of upgrade workers
-    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
-    for (Iterator<UpgradeExecutor> it = blockList.iterator(); it.hasNext();) {
-      pool.submit(it.next());
-    }
-
-    // Inform the namenode
-    sendStatus();
-    
-    // Report status to namenode every so many seconds:
-    long now = System.currentTimeMillis();
-    long statusReportIntervalMilliSec = 30*1000;
-    long lastStatusReportTime = now;
-    long lastUpdateTime = now;
-    long lastWarnTime = now;
-    
-    // Now wait for the tasks to complete.
-    //
-    while (nLeft > 0) {
-      synchronized (completedList) {
-        if (completedList.size() <= 0) {
-          try {
-            completedList.wait(1000);
-          } catch (InterruptedException ignored) {}
-        }
-        
-        now = System.currentTimeMillis();
-        
-        if (completedList.size()> 0) {
-          UpgradeExecutor exe = completedList.remove(0);
-          nLeft--;
-          if (exe.throwable != null) {
-            errors.getAndIncrement();
-            LOG.error("Got an exception during generation stamp upgrade of " +
-                      exe.block + ": " + 
-                      StringUtils.stringifyException(exe.throwable));
-          }
-          blocksUpgraded.getAndIncrement();
-          lastUpdateTime = now;
-        } else {
-          if ((now - lastUpdateTime) >= LONG_TIMEOUT_MILLISEC &&
-              (now - lastWarnTime) >= LONG_TIMEOUT_MILLISEC) {
-            lastWarnTime = now;
-            LOG.warn("No block was updated in last " +
-                      (LONG_TIMEOUT_MILLISEC/(60*1000)) +
-                      " minutes! will keep waiting... ");
-          }  
-        } 
-      }
-      
-      if ((now-lastStatusReportTime) > statusReportIntervalMilliSec) {
-        sendStatus();
-        lastStatusReportTime = System.currentTimeMillis();
-      }
-    }
-
-    pool.shutdown();
-    upgradeCompleted.set(true);
-    
-    LOG.info("Completed Block Generation Stamp Upgrade. Total of " + 
-             (blocksPreviouslyUpgraded.get() + blocksToUpgrade.get()) +
-             " blocks : " + blocksPreviouslyUpgraded.get() + " blocks previously " +
-             "upgraded, " + blocksUpgraded.get() + " blocks upgraded this time " +
-             "with " + errors.get() + " errors.");       
-
-    // now inform the name node about the completion.
-    // What if there is no upgrade running on Namenode now?
-    while (!sendStatus());
-    
-  }
-  
-  /** Sends current status and stats to namenode and logs it to local log*/ 
-  boolean sendStatus() {
-    LOG.info((offlineUpgrade.get() ? "Offline " : "") + 
-              "Block Generation Stamp Upgrade : " + 
-               getUpgradeStatus() + "% completed.");
-    if (offlineUpgrade.get()) {
-      return true;
-    }
-    
-    GenerationStampStatsUpgradeCommand cmd = null;
-    synchronized (this) {
-      cmd = new GenerationStampStatsUpgradeCommand(getUpgradeStatus(),
-                           getDatanode().dnRegistration,
-                           blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
-                           blocksToUpgrade.get()-blocksUpgraded.get(),
-                           errors.get(),
-                           GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
-    }
-    UpgradeCommand reply = sendCommand(namenodeAddr, namenode, cmd, 0);
-    if (reply == null) {
-      LOG.warn("Could not send status to Namenode. Namenode might be " +
-               "over loaded or down.");
-    }
-    return reply != null;
-  }
-
-
-  // Sends a command to the namenode
-  static UpgradeCommand sendCommand(InetSocketAddress namenodeAddr,
-                                    DatanodeProtocol namenode,
-                                    UpgradeCommand cmd, int retries) {
-    for(int i=0; i<=retries || retries<0; i++) {
-      try {
-        UpgradeCommand reply = namenode.processUpgradeCommand(cmd);
-        if (reply == null) {
-          /* namenode might not be running upgrade or finished
-           * an upgrade. We just return a static object */
-          return noUpgradeOnNamenode;
-        }
-        return reply;
-      } catch (IOException e) {
-        // print the stack trace only for the last retry.
-        LOG.warn("Exception to " + namenodeAddr +
-                 " while sending command " + 
-                 cmd.getAction() + ": " + e +
-                 ((retries<0 || i>=retries)? "... will retry ..." :
-                   ": " + StringUtils.stringifyException(e)));
-      }
-    }
-    return null;
-  }
-}

+ 0 - 539
src/hdfs/org/apache/hadoop/hdfs/server/namenode/GenerationStampUpgradeNamenode.java

@@ -1,539 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.DFSFileInfo;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.common.GenerationStampStatsUpgradeCommand;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.*;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-
-/**
- * This class associates a block generation stamp with with block. This
- * generation stamp is written to each metadata file. Please see
- * HADOOP-1700 for details.
- */
-/**
- * Once an upgrade starts at the namenode , this class manages the upgrade 
- * process.
- */
-public class GenerationStampUpgradeNamenode extends UpgradeObjectNamenode {
-  
-  public static final Log LOG = 
-    LogFactory.getLog(GenerationStampUpgradeNamenode.class.getName());
-  
-  static final long inactivityExtension = 10*1000; // 10 seconds
-  AtomicLong lastNodeCompletionTime = new AtomicLong(0);
-
-  // The layout version before the generation stamp upgrade.
-  public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
-
-  public static final int DN_CMD_STATS = 300;
-  
-  enum UpgradeStatus {
-    INITIALIZED,
-    STARTED,
-    DATANODES_DONE,
-    COMPLETED,
-  }
-  
-  UpgradeStatus upgradeStatus = UpgradeStatus.INITIALIZED;
-  
-  class DnInfo { 
-    short percentCompleted = 0;
-    long blocksUpgraded = 0;
-    long blocksRemaining = 0;
-    long errors = 0;
-    
-    DnInfo(short pcCompleted) {
-      percentCompleted = status;
-    }
-    DnInfo() {}
-    
-    void setStats(GenerationStampStatsUpgradeCommand cmd) {
-      percentCompleted = cmd.getCurrentStatus();
-      blocksUpgraded = cmd.blocksUpgraded;
-      blocksRemaining = cmd.blocksRemaining;
-      errors = cmd.errors;
-    }
-    
-    boolean isDone() {
-      return percentCompleted >= 100;
-    }
-  }
-  
-  /* We should track only the storageIDs and not DatanodeID, which
-   * includes datanode name and storage id.
-   */
-  HashMap<DatanodeID, DnInfo> dnMap = new HashMap<DatanodeID, DnInfo>();
-  HashMap<DatanodeID, DnInfo> unfinishedDnMap = 
-                                      new HashMap<DatanodeID, DnInfo>();  
-
-  Daemon monitorThread;
-  double avgDatanodeCompletionPct = 0;
-  boolean forceDnCompletion = false;
-  
-  //Upgrade object interface:
-  
-  public int getVersion() {
-    return PRE_GENERATIONSTAMP_LAYOUT_VERSION;
-  }
-
-  public UpgradeCommand completeUpgrade() throws IOException {
-    return null;
-  }
- 
-  @Override
-  public String getDescription() {
-    return "Block Generation Stamp Upgrade at Namenode";
-  }
-
-  @Override
-  public synchronized short getUpgradeStatus() {
-    // Reserve 10% for deleting files.
-    if (upgradeStatus == UpgradeStatus.COMPLETED) {
-      return 100;
-    }   
-    return (short) avgDatanodeCompletionPct;
-  }
-
-  @Override
-  public UpgradeCommand startUpgrade() throws IOException {
-    
-    assert monitorThread == null;
-    lastNodeCompletionTime.set(System.currentTimeMillis());
-    
-    monitorThread = new Daemon(new UpgradeMonitor());
-    monitorThread.start();    
-    return super.startUpgrade();
-  }
-  
-  @Override
-  public synchronized void forceProceed() throws IOException {    
-    if (forceDnCompletion) {
-      LOG.warn("forceProceed is already set for this upgrade. It can take " +
-               "a short while to take affect. Please wait.");
-      return;
-    }
-    
-    LOG.info("got forceProceed request for this upgrade. Datanodes upgrade " +
-             "will be considered done. It can take a few seconds to take " +
-             "effect.");
-    forceDnCompletion = true;
-  }
-
-  @Override
-  public
-  UpgradeCommand processUpgradeCommand(UpgradeCommand command) 
-                                           throws IOException {
-    switch (command.getAction()) {
-
-    case GenerationStampUpgradeNamenode.DN_CMD_STATS :
-      return handleStatsCmd(command);
-
-     default:
-       throw new IOException("Unknown Command for Generation Stamp Upgrade : " +
-                             command.getAction());
-    }
-  }
-
-  @Override
-  public UpgradeStatusReport getUpgradeStatusReport(boolean details) 
-                                                    throws IOException {
-
-    /* If 'details' is true should we update block level status?
-     * It could take multiple minutes
-     * updateBlckLevelStats()?
-     */
-    
-    String replyString = "";
-    
-    short status = 0;
-    
-    synchronized (this) {
-     
-      status = getUpgradeStatus();
-     
-      replyString = String.format(
-      ((monitorThread == null) ? "\tUpgrade has not been started yet.\n" : "")+
-      ((forceDnCompletion) ? "\tForce Proceed is ON\n" : "") +
-      "\tLast Block Level Stats updated at : %tc\n" +
-      "\tLast Block Level Stats : %s\n" +
-      "\tBrief Datanode Status  : %s\n" +
-      "%s",
-      latestBlockLevelStats.updatedAt,
-      latestBlockLevelStats.statusString("\n\t                         "), 
-      printStatus("\n\t                         "), 
-      ((status < 100 && upgradeStatus == UpgradeStatus.DATANODES_DONE) ?
-      "\tNOTE: Upgrade at the Datanodes has finished. Deleteing \".crc\" " +
-      "files\n\tcan take longer than status implies.\n" : "")
-      );
-      
-      if (details) {
-        // list all the known data nodes
-        StringBuilder str = null;
-        Iterator<DatanodeID> keys = dnMap.keySet().iterator();
-        Iterator<DnInfo> values = dnMap.values().iterator();
-        
-        for(; keys.hasNext() && values.hasNext() ;) {
-          DatanodeID dn = keys.next();
-          DnInfo info = values.next();
-          String dnStr = "\t\t" + dn.getName() + "\t : " + 
-                         info.percentCompleted + " % \t" +
-                         info.blocksUpgraded + " u \t" +
-                         info.blocksRemaining + " r \t" +
-                         info.errors + " e\n";
-          if ( str == null ) {
-            str = new StringBuilder(dnStr.length()*
-                                    (dnMap.size() + (dnMap.size()+7)/8));
-          }
-          str.append(dnStr);
-        }
-        
-        replyString += "\n\tDatanode Stats (total: " + dnMap.size() + "): " +
-                       "pct Completion(%) blocks upgraded (u) " +
-                       "blocks remaining (r) errors (e)\n\n" +
-                       (( str == null ) ?
-                        "\t\tThere are no known Datanodes\n" : str);
-      }      
-    }
-    return new GenerationStampUpgradeStatusReport(
-                   PRE_GENERATIONSTAMP_LAYOUT_VERSION,
-                   status, replyString);
-  }
-
-
-  /**
-   * The namenode process a periodic statistics message from the datanode.
-   */
-  private synchronized UpgradeCommand handleStatsCmd(UpgradeCommand cmd) {
-    
-    GenerationStampStatsUpgradeCommand stats = (GenerationStampStatsUpgradeCommand)cmd;
-    
-    DatanodeID dn = stats.datanodeId;
-    DnInfo dnInfo = dnMap.get(dn);
-    boolean alreadyCompleted = (dnInfo != null && dnInfo.isDone());
-    
-    if (dnInfo == null) {
-      dnInfo = new DnInfo();
-      dnMap.put(dn, dnInfo);
-      LOG.info("Upgrade started/resumed at datanode " + dn.getName());  
-    }
-    
-    dnInfo.setStats(stats);
-    if (!dnInfo.isDone()) {
-      unfinishedDnMap.put(dn, dnInfo);
-    }
-    
-    if (dnInfo.isDone() && !alreadyCompleted) {
-      LOG.info("upgrade completed on datanode " + dn.getName());      
-      unfinishedDnMap.remove(dn);
-      if (unfinishedDnMap.size() == 0) {
-        lastNodeCompletionTime.set(System.currentTimeMillis());
-      }
-    }   
-    
-    //Should we send any more info?
-    return new UpgradeCommand();
-  }
-  
-  public GenerationStampUpgradeNamenode() {
-  }
-  
-  // For now we will wait for all the nodes to complete upgrade.
-  synchronized boolean isUpgradeDone() {
-    return upgradeStatus == UpgradeStatus.COMPLETED;    
-  }
-  
-  synchronized String printStatus(String spacing) {
-    //NOTE: iterates on all the datanodes.
-    
-    // Calculate % completion on all the data nodes.
-    long errors = 0;
-    long totalCompletion = 0;
-    for( Iterator<DnInfo> it = dnMap.values().iterator(); it.hasNext(); ) {
-      DnInfo dnInfo = it.next();
-      totalCompletion += dnInfo.percentCompleted;            
-      errors += dnInfo.errors;
-    }
-    
-    avgDatanodeCompletionPct = totalCompletion/(dnMap.size() + 1e-20);
-    
-    String msg = "Avg completion of all Datanodes: " +              
-                 String.format("%.2f%%", avgDatanodeCompletionPct) +
-                 " with " + errors + " errors. " +
-                 ((unfinishedDnMap.size() > 0) ? spacing + 
-                   unfinishedDnMap.size() + " out of " + dnMap.size() +
-                   " nodes are not done." : "");
-                 
-    LOG.info("Generation Stamp Upgrade is " + (isUpgradeDone() ? 
-             "complete. " : "still running. ") + spacing + msg);
-    return msg;
-  }
-  
-  private synchronized void setStatus(UpgradeStatus status) {
-    upgradeStatus = status;
-  }
-
-  /* Checks if upgrade completed based on datanode's status and/or 
-   * if all the blocks are upgraded.
-   */
-  private synchronized UpgradeStatus checkOverallCompletion() {
-    
-    if (upgradeStatus == UpgradeStatus.COMPLETED ||
-        upgradeStatus == UpgradeStatus.DATANODES_DONE) {
-      return upgradeStatus;
-    }
-    
-    if (upgradeStatus != UpgradeStatus.DATANODES_DONE) {
-      boolean datanodesDone =
-        (dnMap.size() > 0 && unfinishedDnMap.size() == 0 &&
-         ( System.currentTimeMillis() - lastNodeCompletionTime.get() ) > 
-        inactivityExtension) || forceDnCompletion ;
-                 
-      if ( datanodesDone ) {
-        LOG.info("Upgrade of DataNode blocks is complete. " +
-                 ((forceDnCompletion) ? "(ForceDnCompletion is on.)" : ""));
-        upgradeStatus = UpgradeStatus.DATANODES_DONE;
-      }
-    }
-    
-    if (upgradeStatus != UpgradeStatus.DATANODES_DONE &&
-        latestBlockLevelStats.updatedAt > 0) {
-      // check if last block report marked all
-      if (latestBlockLevelStats.minimallyReplicatedBlocks == 0 &&
-          latestBlockLevelStats.underReplicatedBlocks == 0) {
-        
-        LOG.info("Marking datanode upgrade complete since all the blocks are " +
-                 "upgraded (even though some datanodes may not have " +
-                 "reported completion. Block level stats :\n\t" +
-                 latestBlockLevelStats.statusString("\n\t"));
-        upgradeStatus = UpgradeStatus.DATANODES_DONE;
-      }
-    }
-    
-    return upgradeStatus;
-  } 
-    
-  /**
-   * This class monitors the upgrade progress and periodically prints 
-   * status message to log.
-   */
-  class UpgradeMonitor implements Runnable {
-    
-    static final long statusReportIntervalMillis = 1*60*1000;
-    static final long blockReportIntervalMillis = 5*60*1000;
-    static final int sleepTimeSec = 5;
-    
-    public void run() {
-      long lastReportTime = System.currentTimeMillis();
-      long lastBlockReportTime = lastReportTime;
-      
-      while ( !isUpgradeDone() ) {
-        UpgradeStatus status = checkOverallCompletion();
-        
-        if (status == UpgradeStatus.DATANODES_DONE) {
-          setStatus(UpgradeStatus.COMPLETED);
-        }
-        
-        long now = System.currentTimeMillis();
-        
-        
-        if (now-lastBlockReportTime >= blockReportIntervalMillis) {
-          updateBlockLevelStats();
-          // Check if all the blocks have been upgraded.
-          lastBlockReportTime = now;
-        }
-        
-        if ((now - lastReportTime) >= statusReportIntervalMillis || 
-            isUpgradeDone()) {
-          printStatus("\n\t");
-          lastReportTime = now;
-        }
-
-        if (isUpgradeDone()) {
-          break;
-        }
-        
-        try {
-          Thread.sleep(sleepTimeSec*1000);
-        } catch (InterruptedException e) {
-          break;
-        }
-      }
-      LOG.info("Leaving the Generation Stamp Upgrade Namenode monitor thread");
-    }
-  }
-  
-  private BlockLevelStats latestBlockLevelStats = new BlockLevelStats();
-  // internal class to hold the stats.
-  private static class BlockLevelStats {
-    long fullyReplicatedBlocks = 0;
-    long minimallyReplicatedBlocks = 0;
-    long underReplicatedBlocks = 0; // includes unReplicatedBlocks
-    long unReplicatedBlocks = 0; // zero replicas upgraded
-    long errors;
-    long updatedAt;
-    
-    String statusString(String spacing) {
-      long totalBlocks = fullyReplicatedBlocks + 
-                         minimallyReplicatedBlocks +
-                         underReplicatedBlocks;
-      double multiplier = 100/(totalBlocks + 1e-20);
-      
-      if (spacing.equals("")) {
-        spacing = ", ";
-      }
-      
-      return String.format(
-                     "Total Blocks : %d" +
-                     "%sFully Upgragraded : %.2f%%" +
-                     "%sMinimally Upgraded : %.2f%%" +
-                     "%sUnder Upgraded : %.2f%% (includes Un-upgraded blocks)" +
-                     "%sUn-upgraded : %.2f%%" + 
-                     "%sErrors : %d", totalBlocks, 
-                     spacing, (fullyReplicatedBlocks * multiplier),
-                     spacing, (minimallyReplicatedBlocks * multiplier),
-                     spacing, (underReplicatedBlocks * multiplier),
-                     spacing, (unReplicatedBlocks * multiplier),
-                     spacing, errors);
-    }
-  }
-  
-  void updateBlockLevelStats(String path, BlockLevelStats stats) {
-    DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
-    
-    for (DFSFileInfo file:fileArr) {
-      if (file.isDir()) {
-        updateBlockLevelStats(file.getPath().toString(), stats);
-      } else {
-        // Get the all the blocks.
-        LocatedBlocks blockLoc = null;
-        try {
-          blockLoc = getFSNamesystem().getBlockLocations(
-              file.getPath().toString(), 0, file.getLen());
-          int numBlocks = blockLoc.locatedBlockCount();
-          for (int i=0; i<numBlocks; i++) {
-            LocatedBlock loc = blockLoc.get(i);
-            DatanodeInfo[] dnArr = loc.getLocations();
-            int numUpgraded = 0;
-            synchronized (this) {
-              for (DatanodeInfo dn:dnArr) {
-                DnInfo dnInfo = dnMap.get(dn);
-                if (dnInfo != null && dnInfo.isDone()) {
-                  numUpgraded++;
-                }
-              }
-            }
-            
-            if (numUpgraded >= file.getReplication()) {
-              stats.fullyReplicatedBlocks++;
-            } else if (numUpgraded >= getFSNamesystem().getMinReplication()) {
-              stats.minimallyReplicatedBlocks++;
-            } else {
-              stats.underReplicatedBlocks++;
-            }
-            if (numUpgraded == 0) {
-              stats.unReplicatedBlocks++;
-            }
-          }
-        } catch (IOException e) {
-          LOG.error("BlockGenerationStampUpgrade: could not get block locations for " +
-                    file.getPath().toString() + " : " +
-                    StringUtils.stringifyException(e));
-          stats.errors++;
-        }
-      }
-    }
-  }
-  
-  void updateBlockLevelStats() {
-    /* This iterates over all the blocks and updates various 
-     * counts.
-     * Since iterating over all the blocks at once would be quite 
-     * large operation under lock, we iterate over all the files
-     * and update the counts for blocks that belong to a file.
-     */
-      
-    LOG.info("Starting update of block level stats. " +
-             "This could take a few minutes");
-    BlockLevelStats stats = new BlockLevelStats();
-    updateBlockLevelStats("/", stats);
-    stats.updatedAt = System.currentTimeMillis();
-    
-    LOG.info("Block level stats:\n\t" + stats.statusString("\n\t"));
-    synchronized (this) {
-      latestBlockLevelStats = stats;
-    }
-  }
-}
-
-/**
- * A status report object for Generation Stamp Upgrades
- */
-class GenerationStampUpgradeStatusReport extends UpgradeStatusReport {
-
-  String extraText = "";
-
-  public GenerationStampUpgradeStatusReport() {
-  }
-
-  public GenerationStampUpgradeStatusReport(int version, short status,
-                                            String extraText) {
-    super(version, status, false);
-    this.extraText = extraText;
-  }
-
-  @Override
-  public String getStatusText(boolean details) {
-    return super.getStatusText(details) + "\n\n" + extraText;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    extraText = Text.readString(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    Text.writeString(out, extraText);
-  }
-}
-