|
@@ -0,0 +1,981 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.dfs;
|
|
|
+
|
|
|
+import java.io.*;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+
|
|
|
+import org.apache.commons.logging.*;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.io.retry.*;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
+import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.ipc.RPC;
|
|
|
+
|
|
|
+/**
|
|
|
+ * This class associates a block generation stamp with with block. This
|
|
|
+ * generation stamp is written to each metadata file. Please see
|
|
|
+ * HADOOP-1700 for details.
|
|
|
+ */
|
|
|
+class GenerationStampUpgradeDatanode extends UpgradeObjectDatanode {
|
|
|
+
|
|
|
+ public static final Log LOG =
|
|
|
+ LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgrade");
|
|
|
+
|
|
|
+ DatanodeProtocol namenode;
|
|
|
+ InetSocketAddress namenodeAddr;
|
|
|
+
|
|
|
+ // stats
|
|
|
+ private AtomicInteger blocksPreviouslyUpgraded = new AtomicInteger(0);
|
|
|
+ private AtomicInteger blocksToUpgrade = new AtomicInteger(0);
|
|
|
+ private AtomicInteger blocksUpgraded = new AtomicInteger(0);
|
|
|
+ private AtomicInteger errors = new AtomicInteger(0);
|
|
|
+
|
|
|
+ // process the upgrade using a pool of threads.
|
|
|
+ static private final int poolSize = 4;
|
|
|
+
|
|
|
+ // If no progress has occured during this time, print warnings message.
|
|
|
+ static private final int LONG_TIMEOUT_MILLISEC = 1*60*1000; // 1 minute
|
|
|
+
|
|
|
+ // This object is needed to indicate that namenode is not running upgrade.
|
|
|
+ static UpgradeCommand noUpgradeOnNamenode = new UpgradeCommand();
|
|
|
+
|
|
|
+ private List<UpgradeExecutor> completedList = new LinkedList<UpgradeExecutor>();
|
|
|
+
|
|
|
+ /* This is set when the datanode misses the regular upgrade.
|
|
|
+ * When this is set, it upgrades the block but stops heartbeating
|
|
|
+ * to the namenode.
|
|
|
+ */
|
|
|
+ private AtomicBoolean offlineUpgrade = new AtomicBoolean(false);
|
|
|
+ private AtomicBoolean upgradeCompleted = new AtomicBoolean(false);
|
|
|
+
|
|
|
+ // Implement the common interfaces required by UpgradeObjectDatanode
|
|
|
+
|
|
|
+ public int getVersion() {
|
|
|
+ return GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Start upgrade if it not already running. It sends status to
|
|
|
+ * namenode even if an upgrade is already in progress.
|
|
|
+ */
|
|
|
+ public synchronized UpgradeCommand startUpgrade() throws IOException {
|
|
|
+ if (offlineUpgrade.get()) {
|
|
|
+ doUpgrade();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getDescription() {
|
|
|
+ return "Block Generation Stamp Upgrade at Datanode";
|
|
|
+ }
|
|
|
+
|
|
|
+ public short getUpgradeStatus() {
|
|
|
+ return (blocksToUpgrade.get() == blocksUpgraded.get()) ? 100 :
|
|
|
+ (short) Math.floor(blocksUpgraded.get()*100.0/blocksToUpgrade.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ public UpgradeCommand completeUpgrade() throws IOException {
|
|
|
+ // return latest stats command.
|
|
|
+ assert getUpgradeStatus() == 100;
|
|
|
+ return new DatanodeStatsCommand(getUpgradeStatus(),
|
|
|
+ getDatanode().dnRegistration,
|
|
|
+ blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
|
|
|
+ blocksToUpgrade.get()-blocksUpgraded.get(),
|
|
|
+ errors.get(),
|
|
|
+ GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
|
|
|
+ int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
|
|
|
+ if(nsUpgradeVersion >= getVersion()) {
|
|
|
+ return false; // Normal upgrade.
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("\n This Datanode has missed a cluster wide Block generation Stamp Upgrade." +
|
|
|
+ "\n Will perform an 'offline' upgrade of the blocks." +
|
|
|
+ "\n During this time, Datanode does not heartbeat.");
|
|
|
+
|
|
|
+
|
|
|
+ // Namenode removes this node from the registered nodes
|
|
|
+ try {
|
|
|
+ getDatanode().namenode.errorReport(getDatanode().dnRegistration,
|
|
|
+ DatanodeProtocol.NOTIFY,
|
|
|
+ "Performing an offline generation stamp " +
|
|
|
+ "upgrade. " +
|
|
|
+ "Will be back online once the ugprade " +
|
|
|
+ "completes. Please see datanode logs.");
|
|
|
+
|
|
|
+ } catch(IOException ignored) {
|
|
|
+ LOG.info("\n This Datanode was unable to send error report to namenode.");
|
|
|
+ }
|
|
|
+ offlineUpgrade.set(true);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public GenerationStampUpgradeDatanode() {
|
|
|
+ blocksPreviouslyUpgraded.set(0);
|
|
|
+ blocksToUpgrade.set(0);
|
|
|
+ blocksUpgraded.set(0);
|
|
|
+ errors.set(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ static File getPreGenerationMetaFile(File f) {
|
|
|
+ return new File(f.getAbsolutePath() + FSDataset.METADATA_EXTENSION);
|
|
|
+ }
|
|
|
+
|
|
|
+ // This class is invoked by the worker thread to convert the
|
|
|
+ // metafile into the new format
|
|
|
+ //
|
|
|
+ class UpgradeExecutor implements Runnable {
|
|
|
+ Block block;
|
|
|
+ Throwable throwable;
|
|
|
+
|
|
|
+ UpgradeExecutor(Block b) {
|
|
|
+ block = b;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ // do the real work here
|
|
|
+ FSDataset dataset = (FSDataset) getDatanode().data;
|
|
|
+ upgradeToCurVersion(dataset, block);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ throwable = t;
|
|
|
+ }
|
|
|
+ synchronized (completedList) {
|
|
|
+ completedList.add(this);
|
|
|
+ completedList.notify();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Upgrades the metadata file to current version if required.
|
|
|
+ * @param dataset
|
|
|
+ * @param block
|
|
|
+ */
|
|
|
+ void upgradeToCurVersion(FSDataset dataset, Block block)
|
|
|
+ throws IOException {
|
|
|
+ File blockFile = dataset.getBlockFile(block);
|
|
|
+ if (blockFile == null) {
|
|
|
+ throw new IOException("Could find file for " + block);
|
|
|
+ }
|
|
|
+
|
|
|
+ File metadataFile = dataset.getMetaFile(block);
|
|
|
+ File oldmetadataFile = getPreGenerationMetaFile(blockFile);
|
|
|
+
|
|
|
+ if (metadataFile.exists() && oldmetadataFile.exists()) {
|
|
|
+ //
|
|
|
+ // If both file exists and are of the same size,
|
|
|
+ // then delete the old one. If the sizes are not same then
|
|
|
+ // leave both of them and consider the upgrade as successful.
|
|
|
+ //
|
|
|
+ if (metadataFile.length() == oldmetadataFile.length()) {
|
|
|
+ if (!oldmetadataFile.delete()) {
|
|
|
+ LOG.info("Unable to delete old metadata file " + oldmetadataFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (metadataFile.exists()) {
|
|
|
+ //
|
|
|
+ // Only the new file exists, nothing more to do.
|
|
|
+ //
|
|
|
+ return;
|
|
|
+ } else if (oldmetadataFile.exists()) {
|
|
|
+ //
|
|
|
+ // The old file exists but the new one is missing. Rename
|
|
|
+ // old one to new name.
|
|
|
+ //
|
|
|
+ if (!oldmetadataFile.renameTo(metadataFile)) {
|
|
|
+ throw new IOException("Could find rename " + oldmetadataFile +
|
|
|
+ " to " + metadataFile);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new IOException("Could find any metadata file for " + block);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // This method iterates through all the blocks on a datanode and
|
|
|
+ // do the upgrade.
|
|
|
+ //
|
|
|
+ void doUpgrade() throws IOException {
|
|
|
+
|
|
|
+ if (upgradeCompleted.get()) {
|
|
|
+ assert offlineUpgrade.get() :
|
|
|
+ ("Multiple calls to doUpgrade is expected only during " +
|
|
|
+ "offline upgrade");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ FSDataset dataset = (FSDataset) getDatanode().data;
|
|
|
+
|
|
|
+ // Set up the retry policy so that each attempt waits for one minute.
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // set rpc timeout to one minute.
|
|
|
+ conf.set("ipc.client.timeout", "60000");
|
|
|
+
|
|
|
+ RetryPolicy timeoutPolicy =
|
|
|
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
|
+ LONG_TIMEOUT_MILLISEC/1000,
|
|
|
+ 1, TimeUnit.MILLISECONDS);
|
|
|
+
|
|
|
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
|
|
|
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
+ exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
|
|
|
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
|
|
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
|
|
+ Map<String,RetryPolicy> methodNameToPolicyMap =
|
|
|
+ new HashMap<String, RetryPolicy>();
|
|
|
+ // do we need to set the policy for connection failures also?
|
|
|
+ methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy);
|
|
|
+
|
|
|
+ LOG.info("Starting Block Generation Stamp Upgrade on datanode " +
|
|
|
+ getDatanode());
|
|
|
+
|
|
|
+ for (;;) {
|
|
|
+ try {
|
|
|
+ namenodeAddr = getDatanode().getNameNodeAddr();
|
|
|
+ namenode = (DatanodeProtocol) RetryProxy.create(
|
|
|
+ DatanodeProtocol.class,
|
|
|
+ RPC.waitForProxy(DatanodeProtocol.class,
|
|
|
+ DatanodeProtocol.versionID,
|
|
|
+ namenodeAddr,
|
|
|
+ conf),
|
|
|
+ methodNameToPolicyMap);
|
|
|
+ break;
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Generation Stamp Upgrade Exception " +
|
|
|
+ "while trying to connect to NameNode at " +
|
|
|
+ getDatanode().getNameNodeAddr().toString() + " : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ try {
|
|
|
+ Thread.sleep(10*1000);
|
|
|
+ } catch (InterruptedException e1) {
|
|
|
+ throw new IOException("Interrupted Sleep while creating RPC proxy." +
|
|
|
+ e1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("Block Generation Stamp Upgrade Datanode connected to " +
|
|
|
+ "namenode at " + namenodeAddr);
|
|
|
+
|
|
|
+ // Get a list of all the blocks :
|
|
|
+ LinkedList<UpgradeExecutor> blockList = new LinkedList<UpgradeExecutor>();
|
|
|
+
|
|
|
+ //Fill blockList with blocks to be upgraded.
|
|
|
+ Block [] blockArr = dataset.getBlockReport();
|
|
|
+
|
|
|
+ for (Block b : blockArr) {
|
|
|
+ File blockFile = null;
|
|
|
+ try {
|
|
|
+ blockFile = dataset.getBlockFile(b);
|
|
|
+ } catch (IOException e) {
|
|
|
+ //The block might just be deleted. ignore it.
|
|
|
+ LOG.warn("Could not find file location for " + b +
|
|
|
+ ". It might already be deleted. Exception : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ errors.getAndIncrement();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!blockFile.exists()) {
|
|
|
+ errors.getAndIncrement();
|
|
|
+ LOG.error("could not find block file " + blockFile);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ File metaFile = dataset.getMetaFile(b);
|
|
|
+ File oldMetaFile = getPreGenerationMetaFile(blockFile);
|
|
|
+ if (metaFile.exists()) {
|
|
|
+ blocksPreviouslyUpgraded.getAndIncrement();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ blocksToUpgrade.getAndIncrement();
|
|
|
+ blockList.add(new UpgradeExecutor(b));
|
|
|
+ }
|
|
|
+ blockArr = null;
|
|
|
+ int nLeft = blockList.size();
|
|
|
+
|
|
|
+ LOG.info("Starting upgrade of " + blocksToUpgrade.get() + " blocks out of " +
|
|
|
+ (blocksToUpgrade.get() + blocksPreviouslyUpgraded.get()));
|
|
|
+
|
|
|
+ // Start the pool of upgrade workers
|
|
|
+ ExecutorService pool = Executors.newFixedThreadPool(poolSize);
|
|
|
+ for (Iterator<UpgradeExecutor> it = blockList.iterator(); it.hasNext();) {
|
|
|
+ pool.submit(it.next());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Inform the namenode
|
|
|
+ sendStatus();
|
|
|
+
|
|
|
+ // Report status to namenode every so many seconds:
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ long statusReportIntervalMilliSec = 30*1000;
|
|
|
+ long lastStatusReportTime = now;
|
|
|
+ long lastUpdateTime = now;
|
|
|
+ long lastWarnTime = now;
|
|
|
+
|
|
|
+ // Now wait for the tasks to complete.
|
|
|
+ //
|
|
|
+ while (nLeft > 0) {
|
|
|
+ synchronized (completedList) {
|
|
|
+ if (completedList.size() <= 0) {
|
|
|
+ try {
|
|
|
+ completedList.wait(1000);
|
|
|
+ } catch (InterruptedException ignored) {}
|
|
|
+ }
|
|
|
+
|
|
|
+ now = System.currentTimeMillis();
|
|
|
+
|
|
|
+ if (completedList.size()> 0) {
|
|
|
+ UpgradeExecutor exe = completedList.remove(0);
|
|
|
+ nLeft--;
|
|
|
+ if (exe.throwable != null) {
|
|
|
+ errors.getAndIncrement();
|
|
|
+ LOG.error("Got an exception during generation stamp upgrade of " +
|
|
|
+ exe.block + ": " +
|
|
|
+ StringUtils.stringifyException(exe.throwable));
|
|
|
+ }
|
|
|
+ blocksUpgraded.getAndIncrement();
|
|
|
+ lastUpdateTime = now;
|
|
|
+ } else {
|
|
|
+ if ((now - lastUpdateTime) >= LONG_TIMEOUT_MILLISEC &&
|
|
|
+ (now - lastWarnTime) >= LONG_TIMEOUT_MILLISEC) {
|
|
|
+ lastWarnTime = now;
|
|
|
+ LOG.warn("No block was updated in last " +
|
|
|
+ (LONG_TIMEOUT_MILLISEC/(60*1000)) +
|
|
|
+ " minutes! will keep waiting... ");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((now-lastStatusReportTime) > statusReportIntervalMilliSec) {
|
|
|
+ sendStatus();
|
|
|
+ lastStatusReportTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pool.shutdown();
|
|
|
+ upgradeCompleted.set(true);
|
|
|
+
|
|
|
+ LOG.info("Completed Block Generation Stamp Upgrade. Total of " +
|
|
|
+ (blocksPreviouslyUpgraded.get() + blocksToUpgrade.get()) +
|
|
|
+ " blocks : " + blocksPreviouslyUpgraded.get() + " blocks previously " +
|
|
|
+ "upgraded, " + blocksUpgraded.get() + " blocks upgraded this time " +
|
|
|
+ "with " + errors.get() + " errors.");
|
|
|
+
|
|
|
+ // now inform the name node about the completion.
|
|
|
+ // What if there is no upgrade running on Namenode now?
|
|
|
+ while (!sendStatus());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Sends current status and stats to namenode and logs it to local log*/
|
|
|
+ boolean sendStatus() {
|
|
|
+ LOG.info((offlineUpgrade.get() ? "Offline " : "") +
|
|
|
+ "Block Generation Stamp Upgrade : " +
|
|
|
+ getUpgradeStatus() + "% completed.");
|
|
|
+ if (offlineUpgrade.get()) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ DatanodeStatsCommand cmd = null;
|
|
|
+ synchronized (this) {
|
|
|
+ cmd = new DatanodeStatsCommand(getUpgradeStatus(),
|
|
|
+ getDatanode().dnRegistration,
|
|
|
+ blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
|
|
|
+ blocksToUpgrade.get()-blocksUpgraded.get(),
|
|
|
+ errors.get(),
|
|
|
+ GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
|
|
|
+ }
|
|
|
+ UpgradeCommand reply = sendCommand(namenodeAddr, namenode, cmd, 0);
|
|
|
+ if (reply == null) {
|
|
|
+ LOG.warn("Could not send status to Namenode. Namenode might be " +
|
|
|
+ "over loaded or down.");
|
|
|
+ }
|
|
|
+ return reply != null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // Sends a command to the namenode
|
|
|
+ static UpgradeCommand sendCommand(InetSocketAddress namenodeAddr,
|
|
|
+ DatanodeProtocol namenode,
|
|
|
+ UpgradeCommand cmd, int retries) {
|
|
|
+ for(int i=0; i<=retries || retries<0; i++) {
|
|
|
+ try {
|
|
|
+ UpgradeCommand reply = namenode.processUpgradeCommand(cmd);
|
|
|
+ if (reply == null) {
|
|
|
+ /* namenode might not be running upgrade or finished
|
|
|
+ * an upgrade. We just return a static object */
|
|
|
+ return noUpgradeOnNamenode;
|
|
|
+ }
|
|
|
+ return reply;
|
|
|
+ } catch (IOException e) {
|
|
|
+ // print the stack trace only for the last retry.
|
|
|
+ LOG.warn("Exception to " + namenodeAddr +
|
|
|
+ " while sending command " +
|
|
|
+ cmd.getAction() + ": " + e +
|
|
|
+ ((retries<0 || i>=retries)? "... will retry ..." :
|
|
|
+ ": " + StringUtils.stringifyException(e)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Once an upgrade starts at the namenode , this class manages the upgrade
|
|
|
+ * process.
|
|
|
+ */
|
|
|
+class GenerationStampUpgradeNamenode extends UpgradeObjectNamenode {
|
|
|
+
|
|
|
+ public static final Log LOG =
|
|
|
+ LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgradeNamenode");
|
|
|
+
|
|
|
+ static final long inactivityExtension = 10*1000; // 10 seconds
|
|
|
+ AtomicLong lastNodeCompletionTime = new AtomicLong(0);
|
|
|
+
|
|
|
+ // The layout version before the generation stamp upgrade.
|
|
|
+ static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
|
|
|
+
|
|
|
+ static final int DN_CMD_STATS = 300;
|
|
|
+
|
|
|
+ enum UpgradeStatus {
|
|
|
+ INITIALIZED,
|
|
|
+ STARTED,
|
|
|
+ DATANODES_DONE,
|
|
|
+ COMPLETED,
|
|
|
+ }
|
|
|
+
|
|
|
+ UpgradeStatus upgradeStatus = UpgradeStatus.INITIALIZED;
|
|
|
+
|
|
|
+ class DnInfo {
|
|
|
+ short percentCompleted = 0;
|
|
|
+ long blocksUpgraded = 0;
|
|
|
+ long blocksRemaining = 0;
|
|
|
+ long errors = 0;
|
|
|
+
|
|
|
+ DnInfo(short pcCompleted) {
|
|
|
+ percentCompleted = status;
|
|
|
+ }
|
|
|
+ DnInfo() {}
|
|
|
+
|
|
|
+ void setStats(DatanodeStatsCommand cmd) {
|
|
|
+ percentCompleted = cmd.getCurrentStatus();
|
|
|
+ blocksUpgraded = cmd.blocksUpgraded;
|
|
|
+ blocksRemaining = cmd.blocksRemaining;
|
|
|
+ errors = cmd.errors;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isDone() {
|
|
|
+ return percentCompleted >= 100;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* We should track only the storageIDs and not DatanodeID, which
|
|
|
+ * includes datanode name and storage id.
|
|
|
+ */
|
|
|
+ HashMap<DatanodeID, DnInfo> dnMap = new HashMap<DatanodeID, DnInfo>();
|
|
|
+ HashMap<DatanodeID, DnInfo> unfinishedDnMap =
|
|
|
+ new HashMap<DatanodeID, DnInfo>();
|
|
|
+
|
|
|
+ Daemon monitorThread;
|
|
|
+ double avgDatanodeCompletionPct = 0;
|
|
|
+ boolean forceDnCompletion = false;
|
|
|
+
|
|
|
+ //Upgrade object interface:
|
|
|
+
|
|
|
+ public int getVersion() {
|
|
|
+ return PRE_GENERATIONSTAMP_LAYOUT_VERSION;
|
|
|
+ }
|
|
|
+
|
|
|
+ public UpgradeCommand completeUpgrade() throws IOException {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getDescription() {
|
|
|
+ return "Block Generation Stamp Upgrade at Namenode";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized short getUpgradeStatus() {
|
|
|
+ // Reserve 10% for deleting files.
|
|
|
+ if (upgradeStatus == UpgradeStatus.COMPLETED) {
|
|
|
+ return 100;
|
|
|
+ }
|
|
|
+ return (short) avgDatanodeCompletionPct;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public UpgradeCommand startUpgrade() throws IOException {
|
|
|
+
|
|
|
+ assert monitorThread == null;
|
|
|
+ lastNodeCompletionTime.set(System.currentTimeMillis());
|
|
|
+
|
|
|
+ monitorThread = new Daemon(new UpgradeMonitor());
|
|
|
+ monitorThread.start();
|
|
|
+ return super.startUpgrade();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void forceProceed() throws IOException {
|
|
|
+ if (forceDnCompletion) {
|
|
|
+ LOG.warn("forceProceed is already set for this upgrade. It can take " +
|
|
|
+ "a short while to take affect. Please wait.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("got forceProceed request for this upgrade. Datanodes upgrade " +
|
|
|
+ "will be considered done. It can take a few seconds to take " +
|
|
|
+ "effect.");
|
|
|
+ forceDnCompletion = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ UpgradeCommand processUpgradeCommand(UpgradeCommand command)
|
|
|
+ throws IOException {
|
|
|
+ switch (command.getAction()) {
|
|
|
+
|
|
|
+ case GenerationStampUpgradeNamenode.DN_CMD_STATS :
|
|
|
+ return handleStatsCmd(command);
|
|
|
+
|
|
|
+ default:
|
|
|
+ throw new IOException("Unknown Command for Generation Stamp Upgrade : " +
|
|
|
+ command.getAction());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public UpgradeStatusReport getUpgradeStatusReport(boolean details)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ /* If 'details' is true should we update block level status?
|
|
|
+ * It could take multiple minutes
|
|
|
+ * updateBlckLevelStats()?
|
|
|
+ */
|
|
|
+
|
|
|
+ String replyString = "";
|
|
|
+
|
|
|
+ short status = 0;
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+
|
|
|
+ status = getUpgradeStatus();
|
|
|
+
|
|
|
+ replyString = String.format(
|
|
|
+ ((monitorThread == null) ? "\tUpgrade has not been started yet.\n" : "")+
|
|
|
+ ((forceDnCompletion) ? "\tForce Proceed is ON\n" : "") +
|
|
|
+ "\tLast Block Level Stats updated at : %tc\n" +
|
|
|
+ "\tLast Block Level Stats : %s\n" +
|
|
|
+ "\tBrief Datanode Status : %s\n" +
|
|
|
+ "%s",
|
|
|
+ latestBlockLevelStats.updatedAt,
|
|
|
+ latestBlockLevelStats.statusString("\n\t "),
|
|
|
+ printStatus("\n\t "),
|
|
|
+ ((status < 100 && upgradeStatus == UpgradeStatus.DATANODES_DONE) ?
|
|
|
+ "\tNOTE: Upgrade at the Datanodes has finished. Deleteing \".crc\" " +
|
|
|
+ "files\n\tcan take longer than status implies.\n" : "")
|
|
|
+ );
|
|
|
+
|
|
|
+ if (details) {
|
|
|
+ // list all the known data nodes
|
|
|
+ StringBuilder str = null;
|
|
|
+ Iterator<DatanodeID> keys = dnMap.keySet().iterator();
|
|
|
+ Iterator<DnInfo> values = dnMap.values().iterator();
|
|
|
+
|
|
|
+ for(; keys.hasNext() && values.hasNext() ;) {
|
|
|
+ DatanodeID dn = keys.next();
|
|
|
+ DnInfo info = values.next();
|
|
|
+ String dnStr = "\t\t" + dn.getName() + "\t : " +
|
|
|
+ info.percentCompleted + " % \t" +
|
|
|
+ info.blocksUpgraded + " u \t" +
|
|
|
+ info.blocksRemaining + " r \t" +
|
|
|
+ info.errors + " e\n";
|
|
|
+ if ( str == null ) {
|
|
|
+ str = new StringBuilder(dnStr.length()*
|
|
|
+ (dnMap.size() + (dnMap.size()+7)/8));
|
|
|
+ }
|
|
|
+ str.append(dnStr);
|
|
|
+ }
|
|
|
+
|
|
|
+ replyString += "\n\tDatanode Stats (total: " + dnMap.size() + "): " +
|
|
|
+ "pct Completion(%) blocks upgraded (u) " +
|
|
|
+ "blocks remaining (r) errors (e)\n\n" +
|
|
|
+ (( str == null ) ?
|
|
|
+ "\t\tThere are no known Datanodes\n" : str);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return new GenerationStampUpgradeStatusReport(
|
|
|
+ PRE_GENERATIONSTAMP_LAYOUT_VERSION,
|
|
|
+ status, replyString);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The namenode process a periodic statistics message from the datanode.
|
|
|
+ */
|
|
|
+ private synchronized UpgradeCommand handleStatsCmd(UpgradeCommand cmd) {
|
|
|
+
|
|
|
+ DatanodeStatsCommand stats = (DatanodeStatsCommand)cmd;
|
|
|
+
|
|
|
+ DatanodeID dn = stats.datanodeId;
|
|
|
+ DnInfo dnInfo = dnMap.get(dn);
|
|
|
+ boolean alreadyCompleted = (dnInfo != null && dnInfo.isDone());
|
|
|
+
|
|
|
+ if (dnInfo == null) {
|
|
|
+ dnInfo = new DnInfo();
|
|
|
+ dnMap.put(dn, dnInfo);
|
|
|
+ LOG.info("Upgrade started/resumed at datanode " + dn.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ dnInfo.setStats(stats);
|
|
|
+ if (!dnInfo.isDone()) {
|
|
|
+ unfinishedDnMap.put(dn, dnInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dnInfo.isDone() && !alreadyCompleted) {
|
|
|
+ LOG.info("upgrade completed on datanode " + dn.getName());
|
|
|
+ unfinishedDnMap.remove(dn);
|
|
|
+ if (unfinishedDnMap.size() == 0) {
|
|
|
+ lastNodeCompletionTime.set(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //Should we send any more info?
|
|
|
+ return new UpgradeCommand();
|
|
|
+ }
|
|
|
+
|
|
|
+ public GenerationStampUpgradeNamenode() {
|
|
|
+ }
|
|
|
+
|
|
|
+ // For now we will wait for all the nodes to complete upgrade.
|
|
|
+ synchronized boolean isUpgradeDone() {
|
|
|
+ return upgradeStatus == UpgradeStatus.COMPLETED;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized String printStatus(String spacing) {
|
|
|
+ //NOTE: iterates on all the datanodes.
|
|
|
+
|
|
|
+ // Calculate % completion on all the data nodes.
|
|
|
+ long errors = 0;
|
|
|
+ long totalCompletion = 0;
|
|
|
+ for( Iterator<DnInfo> it = dnMap.values().iterator(); it.hasNext(); ) {
|
|
|
+ DnInfo dnInfo = it.next();
|
|
|
+ totalCompletion += dnInfo.percentCompleted;
|
|
|
+ errors += dnInfo.errors;
|
|
|
+ }
|
|
|
+
|
|
|
+ avgDatanodeCompletionPct = totalCompletion/(dnMap.size() + 1e-20);
|
|
|
+
|
|
|
+ String msg = "Avg completion of all Datanodes: " +
|
|
|
+ String.format("%.2f%%", avgDatanodeCompletionPct) +
|
|
|
+ " with " + errors + " errors. " +
|
|
|
+ ((unfinishedDnMap.size() > 0) ? spacing +
|
|
|
+ unfinishedDnMap.size() + " out of " + dnMap.size() +
|
|
|
+ " nodes are not done." : "");
|
|
|
+
|
|
|
+ LOG.info("Generation Stamp Upgrade is " + (isUpgradeDone() ?
|
|
|
+ "complete. " : "still running. ") + spacing + msg);
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void setStatus(UpgradeStatus status) {
|
|
|
+ upgradeStatus = status;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Checks if upgrade completed based on datanode's status and/or
|
|
|
+ * if all the blocks are upgraded.
|
|
|
+ */
|
|
|
+ private synchronized UpgradeStatus checkOverallCompletion() {
|
|
|
+
|
|
|
+ if (upgradeStatus == UpgradeStatus.COMPLETED ||
|
|
|
+ upgradeStatus == UpgradeStatus.DATANODES_DONE) {
|
|
|
+ return upgradeStatus;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (upgradeStatus != UpgradeStatus.DATANODES_DONE) {
|
|
|
+ boolean datanodesDone =
|
|
|
+ (dnMap.size() > 0 && unfinishedDnMap.size() == 0 &&
|
|
|
+ ( System.currentTimeMillis() - lastNodeCompletionTime.get() ) >
|
|
|
+ inactivityExtension) || forceDnCompletion ;
|
|
|
+
|
|
|
+ if ( datanodesDone ) {
|
|
|
+ LOG.info("Upgrade of DataNode blocks is complete. " +
|
|
|
+ ((forceDnCompletion) ? "(ForceDnCompletion is on.)" : ""));
|
|
|
+ upgradeStatus = UpgradeStatus.DATANODES_DONE;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (upgradeStatus != UpgradeStatus.DATANODES_DONE &&
|
|
|
+ latestBlockLevelStats.updatedAt > 0) {
|
|
|
+ // check if last block report marked all
|
|
|
+ if (latestBlockLevelStats.minimallyReplicatedBlocks == 0 &&
|
|
|
+ latestBlockLevelStats.underReplicatedBlocks == 0) {
|
|
|
+
|
|
|
+ LOG.info("Marking datanode upgrade complete since all the blocks are " +
|
|
|
+ "upgraded (even though some datanodes may not have " +
|
|
|
+ "reported completion. Block level stats :\n\t" +
|
|
|
+ latestBlockLevelStats.statusString("\n\t"));
|
|
|
+ upgradeStatus = UpgradeStatus.DATANODES_DONE;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return upgradeStatus;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class monitors the upgrade progress and periodically prints
|
|
|
+ * status message to log.
|
|
|
+ */
|
|
|
+ class UpgradeMonitor implements Runnable {
|
|
|
+
|
|
|
+ static final long statusReportIntervalMillis = 1*60*1000;
|
|
|
+ static final long blockReportIntervalMillis = 5*60*1000;
|
|
|
+ static final int sleepTimeSec = 5;
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ long lastReportTime = System.currentTimeMillis();
|
|
|
+ long lastBlockReportTime = lastReportTime;
|
|
|
+
|
|
|
+ while ( !isUpgradeDone() ) {
|
|
|
+ UpgradeStatus status = checkOverallCompletion();
|
|
|
+
|
|
|
+ if (status == UpgradeStatus.DATANODES_DONE) {
|
|
|
+ setStatus(UpgradeStatus.COMPLETED);
|
|
|
+ }
|
|
|
+
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+
|
|
|
+
|
|
|
+ if (now-lastBlockReportTime >= blockReportIntervalMillis) {
|
|
|
+ updateBlockLevelStats();
|
|
|
+ // Check if all the blocks have been upgraded.
|
|
|
+ lastBlockReportTime = now;
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((now - lastReportTime) >= statusReportIntervalMillis ||
|
|
|
+ isUpgradeDone()) {
|
|
|
+ printStatus("\n\t");
|
|
|
+ lastReportTime = now;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isUpgradeDone()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ Thread.sleep(sleepTimeSec*1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("Leaving the Generation Stamp Upgrade Namenode monitor thread");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private BlockLevelStats latestBlockLevelStats = new BlockLevelStats();
|
|
|
+ // internal class to hold the stats.
|
|
|
+ private static class BlockLevelStats {
|
|
|
+ long fullyReplicatedBlocks = 0;
|
|
|
+ long minimallyReplicatedBlocks = 0;
|
|
|
+ long underReplicatedBlocks = 0; // includes unReplicatedBlocks
|
|
|
+ long unReplicatedBlocks = 0; // zero replicas upgraded
|
|
|
+ long errors;
|
|
|
+ long updatedAt;
|
|
|
+
|
|
|
+ String statusString(String spacing) {
|
|
|
+ long totalBlocks = fullyReplicatedBlocks +
|
|
|
+ minimallyReplicatedBlocks +
|
|
|
+ underReplicatedBlocks;
|
|
|
+ double multiplier = 100/(totalBlocks + 1e-20);
|
|
|
+
|
|
|
+ if (spacing.equals("")) {
|
|
|
+ spacing = ", ";
|
|
|
+ }
|
|
|
+
|
|
|
+ return String.format(
|
|
|
+ "Total Blocks : %d" +
|
|
|
+ "%sFully Upgragraded : %.2f%%" +
|
|
|
+ "%sMinimally Upgraded : %.2f%%" +
|
|
|
+ "%sUnder Upgraded : %.2f%% (includes Un-upgraded blocks)" +
|
|
|
+ "%sUn-upgraded : %.2f%%" +
|
|
|
+ "%sErrors : %d", totalBlocks,
|
|
|
+ spacing, (fullyReplicatedBlocks * multiplier),
|
|
|
+ spacing, (minimallyReplicatedBlocks * multiplier),
|
|
|
+ spacing, (underReplicatedBlocks * multiplier),
|
|
|
+ spacing, (unReplicatedBlocks * multiplier),
|
|
|
+ spacing, errors);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void updateBlockLevelStats(String path, BlockLevelStats stats) {
|
|
|
+ DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
|
|
|
+
|
|
|
+ for (DFSFileInfo file:fileArr) {
|
|
|
+ if (file.isDir()) {
|
|
|
+ updateBlockLevelStats(file.getPath().toString(), stats);
|
|
|
+ } else {
|
|
|
+ // Get the all the blocks.
|
|
|
+ LocatedBlocks blockLoc = null;
|
|
|
+ try {
|
|
|
+ blockLoc = getFSNamesystem().getBlockLocations(
|
|
|
+ file.getPath().toString(), 0, file.getLen());
|
|
|
+ int numBlocks = blockLoc.locatedBlockCount();
|
|
|
+ for (int i=0; i<numBlocks; i++) {
|
|
|
+ LocatedBlock loc = blockLoc.get(i);
|
|
|
+ DatanodeInfo[] dnArr = loc.getLocations();
|
|
|
+ int numUpgraded = 0;
|
|
|
+ synchronized (this) {
|
|
|
+ for (DatanodeInfo dn:dnArr) {
|
|
|
+ DnInfo dnInfo = dnMap.get(dn);
|
|
|
+ if (dnInfo != null && dnInfo.isDone()) {
|
|
|
+ numUpgraded++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (numUpgraded >= file.getReplication()) {
|
|
|
+ stats.fullyReplicatedBlocks++;
|
|
|
+ } else if (numUpgraded >= getFSNamesystem().getMinReplication()) {
|
|
|
+ stats.minimallyReplicatedBlocks++;
|
|
|
+ } else {
|
|
|
+ stats.underReplicatedBlocks++;
|
|
|
+ }
|
|
|
+ if (numUpgraded == 0) {
|
|
|
+ stats.unReplicatedBlocks++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("BlockGenerationStampUpgrade: could not get block locations for " +
|
|
|
+ file.getPath().toString() + " : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ stats.errors++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void updateBlockLevelStats() {
|
|
|
+ /* This iterates over all the blocks and updates various
|
|
|
+ * counts.
|
|
|
+ * Since iterating over all the blocks at once would be quite
|
|
|
+ * large operation under lock, we iterate over all the files
|
|
|
+ * and update the counts for blocks that belong to a file.
|
|
|
+ */
|
|
|
+
|
|
|
+ LOG.info("Starting update of block level stats. " +
|
|
|
+ "This could take a few minutes");
|
|
|
+ BlockLevelStats stats = new BlockLevelStats();
|
|
|
+ updateBlockLevelStats("/", stats);
|
|
|
+ stats.updatedAt = System.currentTimeMillis();
|
|
|
+
|
|
|
+ LOG.info("Block level stats:\n\t" + stats.statusString("\n\t"));
|
|
|
+ synchronized (this) {
|
|
|
+ latestBlockLevelStats = stats;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * The Datanode sends this statistics object to the Namenode periodically.
|
|
|
+ */
|
|
|
+class DatanodeStatsCommand extends UpgradeCommand {
|
|
|
+ DatanodeID datanodeId;
|
|
|
+ int blocksUpgraded;
|
|
|
+ int blocksRemaining;
|
|
|
+ int errors;
|
|
|
+
|
|
|
+ DatanodeStatsCommand() {
|
|
|
+ super(GenerationStampUpgradeNamenode.DN_CMD_STATS, 0, (short)0);
|
|
|
+ datanodeId = new DatanodeID();
|
|
|
+ }
|
|
|
+
|
|
|
+ public DatanodeStatsCommand(short status, DatanodeID dn,
|
|
|
+ int blocksUpgraded, int blocksRemaining,
|
|
|
+ int errors, int version) {
|
|
|
+ super(GenerationStampUpgradeNamenode.DN_CMD_STATS, version, status);
|
|
|
+ //copy so that only ID part gets serialized
|
|
|
+ datanodeId = new DatanodeID(dn);
|
|
|
+ this.blocksUpgraded = blocksUpgraded;
|
|
|
+ this.blocksRemaining = blocksRemaining;
|
|
|
+ this.errors = errors;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void readFields(DataInput in) throws IOException {
|
|
|
+ super.readFields(in);
|
|
|
+ datanodeId.readFields(in);
|
|
|
+ blocksUpgraded = in.readInt();
|
|
|
+ blocksRemaining = in.readInt();
|
|
|
+ errors = in.readInt();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(DataOutput out) throws IOException {
|
|
|
+ super.write(out);
|
|
|
+ datanodeId.write(out);
|
|
|
+ out.writeInt(blocksUpgraded);
|
|
|
+ out.writeInt(blocksRemaining);
|
|
|
+ out.writeInt(errors);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * A status report object for Generation Stamp Upgrades
|
|
|
+ */
|
|
|
+class GenerationStampUpgradeStatusReport extends UpgradeStatusReport {
|
|
|
+
|
|
|
+ String extraText = "";
|
|
|
+
|
|
|
+ public GenerationStampUpgradeStatusReport() {
|
|
|
+ }
|
|
|
+
|
|
|
+ public GenerationStampUpgradeStatusReport(int version, short status,
|
|
|
+ String extraText) {
|
|
|
+ super(version, status, false);
|
|
|
+ this.extraText = extraText;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getStatusText(boolean details) {
|
|
|
+ return super.getStatusText(details) + "\n\n" + extraText;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void readFields(DataInput in) throws IOException {
|
|
|
+ super.readFields(in);
|
|
|
+ extraText = Text.readString(in);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(DataOutput out) throws IOException {
|
|
|
+ super.write(out);
|
|
|
+ Text.writeString(out, extraText);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|