|
@@ -1,1630 +0,0 @@
|
|
|
-/**
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
- * or more contributor license agreements. See the NOTICE file
|
|
|
- * distributed with this work for additional information
|
|
|
- * regarding copyright ownership. The ASF licenses this file
|
|
|
- * to you under the Apache License, Version 2.0 (the
|
|
|
- * "License"); you may not use this file except in compliance
|
|
|
- * with the License. You may obtain a copy of the License at
|
|
|
- *
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
- * See the License for the specific language governing permissions and
|
|
|
- * limitations under the License.
|
|
|
- */
|
|
|
-
|
|
|
-package org.apache.hadoop.raid;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.FileNotFoundException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.LinkedList;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Random;
|
|
|
-import java.util.Set;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.regex.Matcher;
|
|
|
-import java.util.regex.Pattern;
|
|
|
-import java.lang.Thread;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.net.URI;
|
|
|
-import java.lang.reflect.Constructor;
|
|
|
-import java.lang.reflect.InvocationTargetException;
|
|
|
-
|
|
|
-import org.xml.sax.SAXException;
|
|
|
-import javax.xml.parsers.ParserConfigurationException;
|
|
|
-
|
|
|
-import org.apache.hadoop.ipc.*;
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.tools.HadoopArchives;
|
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
-import org.apache.hadoop.util.Daemon;
|
|
|
-import org.apache.hadoop.util.ToolRunner;
|
|
|
-import org.apache.hadoop.util.Time;
|
|
|
-import org.apache.hadoop.net.NetUtils;
|
|
|
-import org.apache.hadoop.fs.HarFileSystem;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.BlockLocation;
|
|
|
-import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
-import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
-import org.apache.hadoop.util.Progressable;
|
|
|
-
|
|
|
-import org.apache.hadoop.raid.protocol.PolicyInfo;
|
|
|
-import org.apache.hadoop.raid.protocol.PolicyList;
|
|
|
-import org.apache.hadoop.raid.protocol.RaidProtocol;
|
|
|
-import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
|
|
|
-
|
|
|
-/**
|
|
|
- * A base class that implements {@link RaidProtocol}.
|
|
|
- *
|
|
|
- * use raid.classname to specify which implementation to use
|
|
|
- */
|
|
|
-public abstract class RaidNode implements RaidProtocol {
|
|
|
-
|
|
|
- static{
|
|
|
- Configuration.addDefaultResource("hdfs-default.xml");
|
|
|
- Configuration.addDefaultResource("hdfs-site.xml");
|
|
|
- Configuration.addDefaultResource("mapred-default.xml");
|
|
|
- Configuration.addDefaultResource("mapred-site.xml");
|
|
|
- }
|
|
|
- public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
|
|
|
- public static final long SLEEP_TIME = 10000L; // 10 seconds
|
|
|
- public static final String TRIGGER_MONITOR_SLEEP_TIME_KEY =
|
|
|
- "hdfs.raid.trigger.monitor.sleep.time";
|
|
|
- public static final int DEFAULT_PORT = 60000;
|
|
|
- // Default stripe length = 5, parity length for RS code = 3
|
|
|
- public static final int DEFAULT_STRIPE_LENGTH = 5;
|
|
|
- public static final int RS_PARITY_LENGTH_DEFAULT = 3;
|
|
|
-
|
|
|
- public static final String RS_PARITY_LENGTH_KEY = "hdfs.raidrs.paritylength";
|
|
|
- public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
|
|
|
-
|
|
|
- public static final String DEFAULT_RAID_LOCATION = "/raid";
|
|
|
- public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
|
|
|
- public static final String DEFAULT_RAID_TMP_LOCATION = "/tmp/raid";
|
|
|
- public static final String RAID_TMP_LOCATION_KEY = "fs.raid.tmpdir";
|
|
|
- public static final String DEFAULT_RAID_HAR_TMP_LOCATION = "/tmp/raid_har";
|
|
|
- public static final String RAID_HAR_TMP_LOCATION_KEY = "fs.raid.hartmpdir";
|
|
|
-
|
|
|
- public static final String DEFAULT_RAIDRS_LOCATION = "/raidrs";
|
|
|
- public static final String RAIDRS_LOCATION_KEY = "hdfs.raidrs.locations";
|
|
|
- public static final String DEFAULT_RAIDRS_TMP_LOCATION = "/tmp/raidrs";
|
|
|
- public static final String RAIDRS_TMP_LOCATION_KEY = "fs.raidrs.tmpdir";
|
|
|
- public static final String DEFAULT_RAIDRS_HAR_TMP_LOCATION = "/tmp/raidrs_har";
|
|
|
- public static final String RAIDRS_HAR_TMP_LOCATION_KEY = "fs.raidrs.hartmpdir";
|
|
|
-
|
|
|
- public static final String HAR_SUFFIX = "_raid.har";
|
|
|
- public static final Pattern PARITY_HAR_PARTFILE_PATTERN =
|
|
|
- Pattern.compile(".*" + HAR_SUFFIX + "/part-.*");
|
|
|
-
|
|
|
- public static final String RAIDNODE_CLASSNAME_KEY = "raid.classname";
|
|
|
-
|
|
|
- /** RPC server */
|
|
|
- private Server server;
|
|
|
- /** RPC server address */
|
|
|
- private InetSocketAddress serverAddress = null;
|
|
|
- /** only used for testing purposes */
|
|
|
- protected boolean stopRequested = false;
|
|
|
-
|
|
|
- /** Configuration Manager */
|
|
|
- private ConfigManager configMgr;
|
|
|
-
|
|
|
- /** hadoop configuration */
|
|
|
- protected Configuration conf;
|
|
|
-
|
|
|
- protected boolean initialized; // Are we initialized?
|
|
|
- protected volatile boolean running; // Are we running?
|
|
|
-
|
|
|
- /** Deamon thread to trigger policies */
|
|
|
- Daemon triggerThread = null;
|
|
|
- public static long triggerMonitorSleepTime = SLEEP_TIME;
|
|
|
-
|
|
|
- /** Deamon thread to delete obsolete parity files */
|
|
|
- PurgeMonitor purgeMonitor = null;
|
|
|
- Daemon purgeThread = null;
|
|
|
-
|
|
|
- /** Deamon thread to har raid directories */
|
|
|
- Daemon harThread = null;
|
|
|
-
|
|
|
- /** Daemon thread to fix corrupt files */
|
|
|
- BlockFixer blockFixer = null;
|
|
|
- Daemon blockFixerThread = null;
|
|
|
-
|
|
|
- // statistics about RAW hdfs blocks. This counts all replicas of a block.
|
|
|
- public static class Statistics {
|
|
|
- long numProcessedBlocks; // total blocks encountered in namespace
|
|
|
- long processedSize; // disk space occupied by all blocks
|
|
|
- long remainingSize; // total disk space post RAID
|
|
|
-
|
|
|
- long numMetaBlocks; // total blocks in metafile
|
|
|
- long metaSize; // total disk space for meta files
|
|
|
-
|
|
|
- public void clear() {
|
|
|
- numProcessedBlocks = 0;
|
|
|
- processedSize = 0;
|
|
|
- remainingSize = 0;
|
|
|
- numMetaBlocks = 0;
|
|
|
- metaSize = 0;
|
|
|
- }
|
|
|
- public String toString() {
|
|
|
- long save = processedSize - (remainingSize + metaSize);
|
|
|
- long savep = 0;
|
|
|
- if (processedSize > 0) {
|
|
|
- savep = (save * 100)/processedSize;
|
|
|
- }
|
|
|
- String msg = " numProcessedBlocks = " + numProcessedBlocks +
|
|
|
- " processedSize = " + processedSize +
|
|
|
- " postRaidSize = " + remainingSize +
|
|
|
- " numMetaBlocks = " + numMetaBlocks +
|
|
|
- " metaSize = " + metaSize +
|
|
|
- " %save in raw disk space = " + savep;
|
|
|
- return msg;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Startup options
|
|
|
- static public enum StartupOption{
|
|
|
- TEST ("-test"),
|
|
|
- REGULAR ("-regular");
|
|
|
-
|
|
|
- private String name = null;
|
|
|
- private StartupOption(String arg) {this.name = arg;}
|
|
|
- public String getName() {return name;}
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Start RaidNode.
|
|
|
- * <p>
|
|
|
- * The raid-node can be started with one of the following startup options:
|
|
|
- * <ul>
|
|
|
- * <li>{@link StartupOption#REGULAR REGULAR} - normal raid node startup</li>
|
|
|
- * </ul>
|
|
|
- * The option is passed via configuration field:
|
|
|
- * <tt>fs.raidnode.startup</tt>
|
|
|
- *
|
|
|
- * The conf will be modified to reflect the actual ports on which
|
|
|
- * the RaidNode is up and running if the user passes the port as
|
|
|
- * <code>zero</code> in the conf.
|
|
|
- *
|
|
|
- * @param conf confirguration
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
-
|
|
|
- RaidNode(Configuration conf) throws IOException {
|
|
|
- try {
|
|
|
- initialize(conf);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error(StringUtils.stringifyException(e));
|
|
|
- this.stop();
|
|
|
- throw e;
|
|
|
- } catch (Exception e) {
|
|
|
- this.stop();
|
|
|
- throw new IOException(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public long getProtocolVersion(String protocol,
|
|
|
- long clientVersion) throws IOException {
|
|
|
- if (protocol.equals(RaidProtocol.class.getName())) {
|
|
|
- return RaidProtocol.versionID;
|
|
|
- } else {
|
|
|
- throw new IOException("Unknown protocol to name node: " + protocol);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public ProtocolSignature getProtocolSignature(String protocol,
|
|
|
- long clientVersion, int clientMethodsHash) throws IOException {
|
|
|
- return ProtocolSignature.getProtocolSignature(
|
|
|
- this, protocol, clientVersion, clientMethodsHash);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Wait for service to finish.
|
|
|
- * (Normally, it runs forever.)
|
|
|
- */
|
|
|
- public void join() {
|
|
|
- try {
|
|
|
- if (server != null) server.join();
|
|
|
- if (triggerThread != null) triggerThread.join();
|
|
|
- if (blockFixerThread != null) blockFixerThread.join();
|
|
|
- if (purgeThread != null) purgeThread.join();
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- // do nothing
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Stop all RaidNode threads and wait for all to finish.
|
|
|
- */
|
|
|
- public void stop() {
|
|
|
- if (stopRequested) {
|
|
|
- return;
|
|
|
- }
|
|
|
- stopRequested = true;
|
|
|
- running = false;
|
|
|
- if (server != null) server.stop();
|
|
|
- if (triggerThread != null) triggerThread.interrupt();
|
|
|
- if (blockFixer != null) blockFixer.running = false;
|
|
|
- if (blockFixerThread != null) blockFixerThread.interrupt();
|
|
|
- if (purgeThread != null) purgeThread.interrupt();
|
|
|
- }
|
|
|
-
|
|
|
- private static InetSocketAddress getAddress(String address) {
|
|
|
- return NetUtils.createSocketAddr(address);
|
|
|
- }
|
|
|
-
|
|
|
- public static InetSocketAddress getAddress(Configuration conf) {
|
|
|
- String nodeport = conf.get("raid.server.address");
|
|
|
- if (nodeport == null) {
|
|
|
- nodeport = "localhost:" + DEFAULT_PORT;
|
|
|
- }
|
|
|
- return getAddress(nodeport);
|
|
|
- }
|
|
|
-
|
|
|
- public InetSocketAddress getListenerAddress() {
|
|
|
- return server.getListenerAddress();
|
|
|
- }
|
|
|
-
|
|
|
- private void initialize(Configuration conf)
|
|
|
- throws IOException, SAXException, InterruptedException, RaidConfigurationException,
|
|
|
- ClassNotFoundException, ParserConfigurationException {
|
|
|
- this.conf = conf;
|
|
|
- InetSocketAddress socAddr = RaidNode.getAddress(conf);
|
|
|
- int handlerCount = conf.getInt("fs.raidnode.handler.count", 10);
|
|
|
-
|
|
|
- // read in the configuration
|
|
|
- configMgr = new ConfigManager(conf);
|
|
|
-
|
|
|
- // create rpc server
|
|
|
- this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
|
|
|
- handlerCount, false, conf);
|
|
|
-
|
|
|
- // The rpc-server port can be ephemeral... ensure we have the correct info
|
|
|
- this.serverAddress = this.server.getListenerAddress();
|
|
|
- LOG.info("RaidNode up at: " + this.serverAddress);
|
|
|
-
|
|
|
- initialized = true;
|
|
|
- running = true;
|
|
|
- this.server.start(); // start RPC server
|
|
|
-
|
|
|
- this.blockFixer = BlockFixer.createBlockFixer(conf);
|
|
|
- this.blockFixerThread = new Daemon(this.blockFixer);
|
|
|
- this.blockFixerThread.start();
|
|
|
- // start the deamon thread to fire polcies appropriately
|
|
|
- RaidNode.triggerMonitorSleepTime = conf.getLong(
|
|
|
- TRIGGER_MONITOR_SLEEP_TIME_KEY,
|
|
|
- SLEEP_TIME);
|
|
|
-
|
|
|
- // start the deamon thread to fire polcies appropriately
|
|
|
- this.triggerThread = new Daemon(new TriggerMonitor());
|
|
|
- this.triggerThread.start();
|
|
|
-
|
|
|
- // start the thread that deletes obsolete parity files
|
|
|
- this.purgeMonitor = new PurgeMonitor();
|
|
|
- this.purgeThread = new Daemon(purgeMonitor);
|
|
|
- this.purgeThread.start();
|
|
|
-
|
|
|
- // start the thread that creates HAR files
|
|
|
- this.harThread = new Daemon(new HarMonitor());
|
|
|
- this.harThread.start();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Implement RaidProtocol methods
|
|
|
- */
|
|
|
-
|
|
|
- /** {@inheritDoc} */
|
|
|
- public PolicyList[] getAllPolicies() throws IOException {
|
|
|
- Collection<PolicyList> list = configMgr.getAllPolicies();
|
|
|
- return list.toArray(new PolicyList[list.size()]);
|
|
|
- }
|
|
|
-
|
|
|
- /** {@inheritDoc} */
|
|
|
- public String recoverFile(String inStr, long corruptOffset) throws IOException {
|
|
|
-
|
|
|
- LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
|
|
|
- Path inputPath = new Path(inStr);
|
|
|
- Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
|
|
|
- // find stripe length from config
|
|
|
- int stripeLength = getStripeLength(conf);
|
|
|
-
|
|
|
- // first try decode using XOR code
|
|
|
- Path destPref = xorDestinationPath(conf);
|
|
|
- Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
|
|
|
- Path unraided = unRaid(conf, srcPath, destPref, decoder,
|
|
|
- stripeLength, corruptOffset);
|
|
|
- if (unraided != null) {
|
|
|
- return unraided.toString();
|
|
|
- }
|
|
|
-
|
|
|
- // try decode using ReedSolomon code
|
|
|
- destPref = rsDestinationPath(conf);
|
|
|
- decoder = new ReedSolomonDecoder(conf, RaidNode.getStripeLength(conf),
|
|
|
- RaidNode.rsParityLength(conf));
|
|
|
- unraided = unRaid(conf, srcPath, destPref, decoder,
|
|
|
- stripeLength, corruptOffset);
|
|
|
- if (unraided != null) {
|
|
|
- return unraided.toString();
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * returns the number of raid jobs running for a particular policy
|
|
|
- */
|
|
|
- abstract int getRunningJobsForPolicy(String policyName);
|
|
|
-
|
|
|
- /**
|
|
|
- * Periodically checks to see which policies should be fired.
|
|
|
- */
|
|
|
- class TriggerMonitor implements Runnable {
|
|
|
- class ScanState {
|
|
|
- long fullScanStartTime;
|
|
|
- DirectoryTraversal pendingTraversal;
|
|
|
- RaidFilter.Statistics stats;
|
|
|
- ScanState() {
|
|
|
- fullScanStartTime = 0;
|
|
|
- pendingTraversal = null;
|
|
|
- stats = new RaidFilter.Statistics();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private Map<String, ScanState> scanStateMap =
|
|
|
- new HashMap<String, ScanState>();
|
|
|
-
|
|
|
- /**
|
|
|
- */
|
|
|
- public void run() {
|
|
|
- while (running) {
|
|
|
- try {
|
|
|
- doProcess();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error(StringUtils.stringifyException(e));
|
|
|
- } finally {
|
|
|
- LOG.info("Trigger thread continuing to run...");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Should we select more files for a policy.
|
|
|
- */
|
|
|
- private boolean shouldSelectFiles(PolicyInfo info) {
|
|
|
- String policyName = info.getName();
|
|
|
- ScanState scanState = scanStateMap.get(policyName);
|
|
|
- int runningJobsCount = getRunningJobsForPolicy(policyName);
|
|
|
- // Is there a scan in progress for this policy?
|
|
|
- if (scanState.pendingTraversal != null) {
|
|
|
- int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
|
|
|
-
|
|
|
- // If there is a scan in progress for this policy, we can have
|
|
|
- // upto maxJobsPerPolicy running jobs.
|
|
|
- return (runningJobsCount < maxJobsPerPolicy);
|
|
|
- } else {
|
|
|
- // If there isn't a scan in progress for this policy, we don't
|
|
|
- // want to start a fresh scan if there is even one running job.
|
|
|
- if (runningJobsCount >= 1) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- // Check the time of the last full traversal before starting a fresh
|
|
|
- // traversal.
|
|
|
- long lastScan = scanState.fullScanStartTime;
|
|
|
- return (now() > lastScan + configMgr.getPeriodicity());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns a list of pathnames that needs raiding.
|
|
|
- * The list of paths could be obtained by resuming a previously suspended
|
|
|
- * traversal.
|
|
|
- * The number of paths returned is limited by raid.distraid.max.jobs.
|
|
|
- */
|
|
|
- private List<FileStatus> selectFiles(
|
|
|
- PolicyInfo info, ArrayList<PolicyInfo> allPolicies) throws IOException {
|
|
|
- Path destPrefix = getDestinationPath(info.getErasureCode(), conf);
|
|
|
- String policyName = info.getName();
|
|
|
- Path srcPath = info.getSrcPath();
|
|
|
- long modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
|
|
|
-
|
|
|
- // Max number of files returned.
|
|
|
- int selectLimit = configMgr.getMaxFilesPerJob();
|
|
|
- int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
|
|
|
-
|
|
|
- long selectStartTime = Time.now();
|
|
|
-
|
|
|
- ScanState scanState = scanStateMap.get(policyName);
|
|
|
- // If we have a pending traversal, resume it.
|
|
|
- if (scanState.pendingTraversal != null) {
|
|
|
- DirectoryTraversal dt = scanState.pendingTraversal;
|
|
|
- LOG.info("Resuming traversal for policy " + policyName);
|
|
|
- DirectoryTraversal.FileFilter filter =
|
|
|
- filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
|
|
|
- List<FileStatus> returnSet = dt.getFilteredFiles(filter, selectLimit);
|
|
|
- if (dt.doneTraversal()) {
|
|
|
- scanState.pendingTraversal = null;
|
|
|
- }
|
|
|
- return returnSet;
|
|
|
- }
|
|
|
-
|
|
|
- // Expand destination prefix path.
|
|
|
- String destpstr = destPrefix.toString();
|
|
|
- if (!destpstr.endsWith(Path.SEPARATOR)) {
|
|
|
- destpstr += Path.SEPARATOR;
|
|
|
- }
|
|
|
-
|
|
|
- List<FileStatus> returnSet = new LinkedList<FileStatus>();
|
|
|
-
|
|
|
- FileSystem fs = srcPath.getFileSystem(conf);
|
|
|
- FileStatus[] gpaths = fs.globStatus(srcPath);
|
|
|
- if (gpaths != null) {
|
|
|
- List<FileStatus> selectedPaths = new LinkedList<FileStatus>();
|
|
|
- for (FileStatus onepath: gpaths) {
|
|
|
- String pathstr = onepath.getPath().makeQualified(fs).toString();
|
|
|
- if (!pathstr.endsWith(Path.SEPARATOR)) {
|
|
|
- pathstr += Path.SEPARATOR;
|
|
|
- }
|
|
|
- if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
|
|
|
- LOG.info("Skipping source " + pathstr +
|
|
|
- " because it conflicts with raid directory " + destpstr);
|
|
|
- } else {
|
|
|
- selectedPaths.add(onepath);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Set the time for a new traversal.
|
|
|
- scanState.fullScanStartTime = now();
|
|
|
- DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths,
|
|
|
- conf.getInt("raid.directorytraversal.threads", 4));
|
|
|
- DirectoryTraversal.FileFilter filter =
|
|
|
- filterForPolicy(selectStartTime, info, allPolicies, scanState.stats);
|
|
|
- returnSet = dt.getFilteredFiles(filter, selectLimit);
|
|
|
- if (!dt.doneTraversal()) {
|
|
|
- scanState.pendingTraversal = dt;
|
|
|
- }
|
|
|
- }
|
|
|
- return returnSet;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Keep processing policies.
|
|
|
- * If the config file has changed, then reload config file and start afresh.
|
|
|
- */
|
|
|
- private void doProcess() throws IOException, InterruptedException {
|
|
|
- ArrayList<PolicyInfo> allPolicies = new ArrayList<PolicyInfo>();
|
|
|
- for (PolicyList category : configMgr.getAllPolicies()) {
|
|
|
- for (PolicyInfo info: category.getAll()) {
|
|
|
- allPolicies.add(info);
|
|
|
- }
|
|
|
- }
|
|
|
- while (running) {
|
|
|
- Thread.sleep(RaidNode.triggerMonitorSleepTime);
|
|
|
-
|
|
|
- boolean reloaded = configMgr.reloadConfigsIfNecessary();
|
|
|
- if (reloaded) {
|
|
|
- allPolicies.clear();
|
|
|
- for (PolicyList category : configMgr.getAllPolicies()) {
|
|
|
- for (PolicyInfo info: category.getAll()) {
|
|
|
- allPolicies.add(info);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- for (PolicyInfo info: allPolicies) {
|
|
|
- if (!scanStateMap.containsKey(info.getName())) {
|
|
|
- scanStateMap.put(info.getName(), new ScanState());
|
|
|
- }
|
|
|
-
|
|
|
- if (!shouldSelectFiles(info)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Triggering Policy Filter " + info.getName() +
|
|
|
- " " + info.getSrcPath());
|
|
|
- List<FileStatus> filteredPaths = null;
|
|
|
- try {
|
|
|
- filteredPaths = selectFiles(info, allPolicies);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Exception while invoking filter on policy " + info.getName() +
|
|
|
- " srcPath " + info.getSrcPath() +
|
|
|
- " exception " + StringUtils.stringifyException(e));
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- if (filteredPaths == null || filteredPaths.size() == 0) {
|
|
|
- LOG.info("No filtered paths for policy " + info.getName());
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // Apply the action on accepted paths
|
|
|
- LOG.info("Triggering Policy Action " + info.getName() +
|
|
|
- " " + info.getSrcPath() + " raid " + filteredPaths.size() + " files");
|
|
|
- try {
|
|
|
- raidFiles(info, filteredPaths);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Exception while invoking action on policy " + info.getName() +
|
|
|
- " srcPath " + info.getSrcPath() +
|
|
|
- " exception " + StringUtils.stringifyException(e));
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- DirectoryTraversal.FileFilter filterForPolicy(
|
|
|
- long startTime, PolicyInfo info, List<PolicyInfo> allPolicies,
|
|
|
- RaidFilter.Statistics stats)
|
|
|
- throws IOException {
|
|
|
- switch (info.getErasureCode()) {
|
|
|
- case XOR:
|
|
|
- // Return a preference-based filter that prefers RS parity files
|
|
|
- // over XOR parity files.
|
|
|
- return new RaidFilter.PreferenceFilter(
|
|
|
- conf, rsDestinationPath(conf), xorDestinationPath(conf),
|
|
|
- info, allPolicies, startTime, stats);
|
|
|
- case RS:
|
|
|
- return new RaidFilter.TimeBasedFilter(conf, rsDestinationPath(conf),
|
|
|
- info, allPolicies, startTime, stats);
|
|
|
- default:
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * raid a list of files, this will be overridden by subclasses of RaidNode
|
|
|
- */
|
|
|
- abstract void raidFiles(PolicyInfo info, List<FileStatus> paths)
|
|
|
- throws IOException;
|
|
|
-
|
|
|
- static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
|
|
|
- return new Path(destPathPrefix, makeRelative(srcPath));
|
|
|
- }
|
|
|
-
|
|
|
- static class ParityFilePair {
|
|
|
- private Path path;
|
|
|
- private FileSystem fs;
|
|
|
-
|
|
|
- public ParityFilePair( Path path, FileSystem fs) {
|
|
|
- this.path = path;
|
|
|
- this.fs = fs;
|
|
|
- }
|
|
|
-
|
|
|
- public Path getPath() {
|
|
|
- return this.path;
|
|
|
- }
|
|
|
-
|
|
|
- public FileSystem getFileSystem() {
|
|
|
- return this.fs;
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the Path to the parity file of a given file
|
|
|
- *
|
|
|
- * @param destPathPrefix Destination prefix defined by some policy
|
|
|
- * @param srcPath Path to the original source file
|
|
|
- * @param create Boolean value telling whether a new parity file should be created
|
|
|
- * @return Path object representing the parity file of the source
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- static ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
|
|
|
- Path srcParent = srcPath.getParent();
|
|
|
-
|
|
|
- FileSystem fsDest = destPathPrefix.getFileSystem(conf);
|
|
|
- FileSystem fsSrc = srcPath.getFileSystem(conf);
|
|
|
-
|
|
|
- FileStatus srcStatus = null;
|
|
|
- try {
|
|
|
- srcStatus = fsSrc.getFileStatus(srcPath);
|
|
|
- } catch (java.io.FileNotFoundException e) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- Path outDir = destPathPrefix;
|
|
|
- if (srcParent != null) {
|
|
|
- if (srcParent.getParent() == null) {
|
|
|
- outDir = destPathPrefix;
|
|
|
- } else {
|
|
|
- outDir = new Path(destPathPrefix, makeRelative(srcParent));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- //CASE 1: CHECK HAR - Must be checked first because har is created after
|
|
|
- // parity file and returning the parity file could result in error while
|
|
|
- // reading it.
|
|
|
- Path outPath = getOriginalParityFile(destPathPrefix, srcPath);
|
|
|
- String harDirName = srcParent.getName() + HAR_SUFFIX;
|
|
|
- Path HarPath = new Path(outDir,harDirName);
|
|
|
- if (fsDest.exists(HarPath)) {
|
|
|
- URI HarPathUri = HarPath.toUri();
|
|
|
- Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
|
|
|
- FileSystem fsHar = new HarFileSystem(fsDest);
|
|
|
- fsHar.initialize(inHarPath.toUri(), conf);
|
|
|
- if (fsHar.exists(inHarPath)) {
|
|
|
- FileStatus inHar = fsHar.getFileStatus(inHarPath);
|
|
|
- if (inHar.getModificationTime() == srcStatus.getModificationTime()) {
|
|
|
- return new ParityFilePair(inHarPath,fsHar);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //CASE 2: CHECK PARITY
|
|
|
- try {
|
|
|
- FileStatus outHar = fsDest.getFileStatus(outPath);
|
|
|
- if (outHar.getModificationTime() == srcStatus.getModificationTime()) {
|
|
|
- return new ParityFilePair(outPath,fsDest);
|
|
|
- }
|
|
|
- } catch (java.io.FileNotFoundException e) {
|
|
|
- }
|
|
|
-
|
|
|
- return null; // NULL if no parity file
|
|
|
- }
|
|
|
-
|
|
|
- static ParityFilePair xorParityForSource(Path srcPath, Configuration conf)
|
|
|
- throws IOException {
|
|
|
- try {
|
|
|
- Path destPath = xorDestinationPath(conf);
|
|
|
- return getParityFile(destPath, srcPath, conf);
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- static ParityFilePair rsParityForSource(Path srcPath, Configuration conf)
|
|
|
- throws IOException {
|
|
|
- try {
|
|
|
- Path destPath = rsDestinationPath(conf);
|
|
|
- return getParityFile(destPath, srcPath, conf);
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath)
|
|
|
- throws IOException {
|
|
|
- return getParityFile(destPathPrefix, srcPath, conf);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * RAID a list of files.
|
|
|
- */
|
|
|
- void doRaid(Configuration conf, PolicyInfo info, List<FileStatus> paths)
|
|
|
- throws IOException {
|
|
|
- int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
|
|
|
- int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
|
|
|
- int stripeLength = getStripeLength(conf);
|
|
|
- Path destPref = getDestinationPath(info.getErasureCode(), conf);
|
|
|
- String simulate = info.getProperty("simulate");
|
|
|
- boolean doSimulate = simulate == null ? false : Boolean
|
|
|
- .parseBoolean(simulate);
|
|
|
-
|
|
|
- Statistics statistics = new Statistics();
|
|
|
- int count = 0;
|
|
|
-
|
|
|
- for (FileStatus s : paths) {
|
|
|
- doRaid(conf, s, destPref, info.getErasureCode(), statistics,
|
|
|
- new RaidUtils.DummyProgressable(), doSimulate, targetRepl, metaRepl,
|
|
|
- stripeLength);
|
|
|
- if (count % 1000 == 0) {
|
|
|
- LOG.info("RAID statistics " + statistics.toString());
|
|
|
- }
|
|
|
- count++;
|
|
|
- }
|
|
|
- LOG.info("RAID statistics " + statistics.toString());
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * RAID an individual file
|
|
|
- */
|
|
|
-
|
|
|
- static public void doRaid(Configuration conf, PolicyInfo info,
|
|
|
- FileStatus src, Statistics statistics, Progressable reporter)
|
|
|
- throws IOException {
|
|
|
- int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
|
|
|
- int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
|
|
|
- int stripeLength = getStripeLength(conf);
|
|
|
-
|
|
|
- Path destPref = getDestinationPath(info.getErasureCode(), conf);
|
|
|
- String simulate = info.getProperty("simulate");
|
|
|
- boolean doSimulate = simulate == null ? false : Boolean
|
|
|
- .parseBoolean(simulate);
|
|
|
-
|
|
|
- doRaid(conf, src, destPref, info.getErasureCode(), statistics,
|
|
|
- reporter, doSimulate, targetRepl, metaRepl, stripeLength);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * RAID an individual file
|
|
|
- */
|
|
|
- static public void doRaid(Configuration conf, FileStatus stat, Path destPath,
|
|
|
- PolicyInfo.ErasureCodeType code, Statistics statistics,
|
|
|
- Progressable reporter, boolean doSimulate,
|
|
|
- int targetRepl, int metaRepl, int stripeLength)
|
|
|
- throws IOException {
|
|
|
- Path p = stat.getPath();
|
|
|
- FileSystem srcFs = p.getFileSystem(conf);
|
|
|
-
|
|
|
- // extract block locations from File system
|
|
|
- BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
|
|
|
-
|
|
|
- // if the file has fewer than 2 blocks, then nothing to do
|
|
|
- if (locations.length <= 2) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // add up the raw disk space occupied by this file
|
|
|
- long diskSpace = 0;
|
|
|
- for (BlockLocation l: locations) {
|
|
|
- diskSpace += (l.getLength() * stat.getReplication());
|
|
|
- }
|
|
|
- statistics.numProcessedBlocks += locations.length;
|
|
|
- statistics.processedSize += diskSpace;
|
|
|
-
|
|
|
- // generate parity file
|
|
|
- generateParityFile(conf, stat, reporter, srcFs, destPath, code,
|
|
|
- locations, metaRepl, stripeLength);
|
|
|
-
|
|
|
- // reduce the replication factor of the source file
|
|
|
- if (!doSimulate) {
|
|
|
- if (srcFs.setReplication(p, (short)targetRepl) == false) {
|
|
|
- LOG.error("Error in reducing relication factor of file " + p + " to " + targetRepl);
|
|
|
- statistics.remainingSize += diskSpace; // no change in disk space usage
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- diskSpace = 0;
|
|
|
- for (BlockLocation l: locations) {
|
|
|
- diskSpace += (l.getLength() * targetRepl);
|
|
|
- }
|
|
|
- statistics.remainingSize += diskSpace;
|
|
|
-
|
|
|
- // the metafile will have this many number of blocks
|
|
|
- int numMeta = locations.length / stripeLength;
|
|
|
- if (locations.length % stripeLength != 0) {
|
|
|
- numMeta++;
|
|
|
- }
|
|
|
-
|
|
|
- // we create numMeta for every file. This metablock has metaRepl # replicas.
|
|
|
- // the last block of the metafile might not be completely filled up, but we
|
|
|
- // ignore that for now.
|
|
|
- statistics.numMetaBlocks += (numMeta * metaRepl);
|
|
|
- statistics.metaSize += (numMeta * metaRepl * stat.getBlockSize());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create the parity file.
|
|
|
- */
|
|
|
- static private void generateParityFile(Configuration conf, FileStatus stat,
|
|
|
- Progressable reporter,
|
|
|
- FileSystem inFs,
|
|
|
- Path destPathPrefix,
|
|
|
- ErasureCodeType code,
|
|
|
- BlockLocation[] locations,
|
|
|
- int metaRepl, int stripeLength) throws IOException {
|
|
|
-
|
|
|
- Path inpath = stat.getPath();
|
|
|
- Path outpath = getOriginalParityFile(destPathPrefix, inpath);
|
|
|
- FileSystem outFs = outpath.getFileSystem(conf);
|
|
|
-
|
|
|
- // If the parity file is already upto-date, then nothing to do
|
|
|
- try {
|
|
|
- FileStatus stmp = outFs.getFileStatus(outpath);
|
|
|
- if (stmp.getModificationTime() == stat.getModificationTime()) {
|
|
|
- LOG.info("Parity file for " + inpath + "(" + locations.length +
|
|
|
- ") is " + outpath + " already upto-date. Nothing more to do.");
|
|
|
- return;
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- // ignore errors because the raid file might not exist yet.
|
|
|
- }
|
|
|
-
|
|
|
- Encoder encoder = encoderForCode(conf, code);
|
|
|
- encoder.encodeFile(inFs, inpath, outFs, outpath, (short)metaRepl, reporter);
|
|
|
-
|
|
|
- // set the modification time of the RAID file. This is done so that the modTime of the
|
|
|
- // RAID file reflects that contents of the source file that it has RAIDed. This should
|
|
|
- // also work for files that are being appended to. This is necessary because the time on
|
|
|
- // on the destination namenode may not be synchronised with the timestamp of the
|
|
|
- // source namenode.
|
|
|
- outFs.setTimes(outpath, stat.getModificationTime(), -1);
|
|
|
- inFs.setTimes(inpath, stat.getModificationTime(), stat.getAccessTime());
|
|
|
-
|
|
|
- FileStatus outstat = outFs.getFileStatus(outpath);
|
|
|
- FileStatus inStat = inFs.getFileStatus(inpath);
|
|
|
- LOG.info("Source file " + inpath + " of size " + inStat.getLen() +
|
|
|
- " Parity file " + outpath + " of size " + outstat.getLen() +
|
|
|
- " src mtime " + stat.getModificationTime() +
|
|
|
- " parity mtime " + outstat.getModificationTime());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Extract a good block from the parity block. This assumes that the
|
|
|
- * corruption is in the main file and the parity file is always good.
|
|
|
- */
|
|
|
- public static Path unRaid(Configuration conf, Path srcPath,
|
|
|
- Path destPathPrefix, Decoder decoder, int stripeLength,
|
|
|
- long corruptOffset) throws IOException {
|
|
|
-
|
|
|
- // Test if parity file exists
|
|
|
- ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
|
|
|
- if (ppair == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- final Path recoveryDestination = new Path(
|
|
|
- RaidNode.unraidTmpDirectory(conf));
|
|
|
- FileSystem destFs = recoveryDestination.getFileSystem(conf);
|
|
|
- final Path recoveredPrefix =
|
|
|
- destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
|
|
|
- final Path recoveredPath =
|
|
|
- new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
|
|
|
- LOG.info("Creating recovered file " + recoveredPath);
|
|
|
-
|
|
|
- FileSystem srcFs = srcPath.getFileSystem(conf);
|
|
|
- decoder.decodeFile(srcFs, srcPath, ppair.getFileSystem(),
|
|
|
- ppair.getPath(), corruptOffset, recoveredPath);
|
|
|
-
|
|
|
- return recoveredPath;
|
|
|
- }
|
|
|
-
|
|
|
- public static Path unRaidCorruptBlock(Configuration conf, Path srcPath,
|
|
|
- Path destPathPrefix, Decoder decoder, int stripeLength,
|
|
|
- long corruptOffset) throws IOException {
|
|
|
- // Test if parity file exists
|
|
|
- ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
|
|
|
- if (ppair == null) {
|
|
|
- LOG.error("Could not find parity file for " + srcPath);
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- final Path recoveryDestination = new Path(RaidNode.xorTempPrefix(conf));
|
|
|
- FileSystem destFs = recoveryDestination.getFileSystem(conf);
|
|
|
- final Path recoveredPrefix =
|
|
|
- destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
|
|
|
- final Path recoveredBlock =
|
|
|
- new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
|
|
|
- LOG.info("Creating recovered Block " + recoveredBlock);
|
|
|
-
|
|
|
- FileSystem srcFs = srcPath.getFileSystem(conf);
|
|
|
- FileStatus stat = srcFs.getFileStatus(srcPath);
|
|
|
- long limit = Math.min(stat.getBlockSize(), stat.getLen() - corruptOffset);
|
|
|
- java.io.OutputStream out = ppair.getFileSystem().create(recoveredBlock);
|
|
|
- decoder.fixErasedBlock(srcFs, srcPath,
|
|
|
- ppair.getFileSystem(), ppair.getPath(),
|
|
|
- stat.getBlockSize(), corruptOffset, 0, limit, out);
|
|
|
- out.close();
|
|
|
- return recoveredBlock;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Periodically delete orphaned parity files.
|
|
|
- */
|
|
|
- class PurgeMonitor implements Runnable {
|
|
|
- /**
|
|
|
- */
|
|
|
- public void run() {
|
|
|
- while (running) {
|
|
|
- try {
|
|
|
- doPurge();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error(StringUtils.stringifyException(e));
|
|
|
- } finally {
|
|
|
- LOG.info("Purge parity files thread continuing to run...");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Traverse the parity destination directory, removing directories that
|
|
|
- * no longer existing in the source.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void purgeDirectories(FileSystem fs, Path root) throws IOException {
|
|
|
- String prefix = root.toUri().getPath();
|
|
|
- List<FileStatus> startPaths = new LinkedList<FileStatus>();
|
|
|
- try {
|
|
|
- startPaths.add(fs.getFileStatus(root));
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- return;
|
|
|
- }
|
|
|
- DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
|
|
|
- FileStatus dir = dt.getNextDirectory();
|
|
|
- for (; dir != null; dir = dt.getNextDirectory()) {
|
|
|
- Path dirPath = dir.getPath();
|
|
|
- if (dirPath.toUri().getPath().endsWith(HAR_SUFFIX)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- String dirStr = dirPath.toUri().getPath();
|
|
|
- if (!dirStr.startsWith(prefix)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- String src = dirStr.replaceFirst(prefix, "");
|
|
|
- if (src.length() == 0) continue;
|
|
|
- Path srcPath = new Path(src);
|
|
|
- if (!fs.exists(srcPath)) {
|
|
|
- LOG.info("Purging directory " + dirPath);
|
|
|
- boolean done = fs.delete(dirPath, true);
|
|
|
- if (!done) {
|
|
|
- LOG.error("Could not purge " + dirPath);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Delete orphaned files. The reason this is done by a separate thread
|
|
|
- * is to not burden the TriggerMonitor with scanning the
|
|
|
- * destination directories.
|
|
|
- */
|
|
|
- private void doPurge() throws IOException, InterruptedException {
|
|
|
- long prevExec = 0;
|
|
|
- while (running) {
|
|
|
-
|
|
|
- // The config may be reloaded by the TriggerMonitor.
|
|
|
- // This thread uses whatever config is currently active.
|
|
|
- while(now() < prevExec + configMgr.getPeriodicity()){
|
|
|
- Thread.sleep(SLEEP_TIME);
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Started purge scan");
|
|
|
- prevExec = now();
|
|
|
-
|
|
|
- // expand destination prefix path
|
|
|
- Path destPref = xorDestinationPath(conf);
|
|
|
- FileSystem destFs = destPref.getFileSystem(conf);
|
|
|
- purgeDirectories(destFs, destPref);
|
|
|
-
|
|
|
- destPref = rsDestinationPath(conf);
|
|
|
- destFs = destPref.getFileSystem(conf);
|
|
|
- purgeDirectories(destFs, destPref);
|
|
|
-
|
|
|
- // fetch all categories
|
|
|
- for (PolicyList category : configMgr.getAllPolicies()) {
|
|
|
- for (PolicyInfo info: category.getAll()) {
|
|
|
-
|
|
|
- try {
|
|
|
- // expand destination prefix path
|
|
|
- destPref = getDestinationPath(info.getErasureCode(), conf);
|
|
|
- destFs = destPref.getFileSystem(conf);
|
|
|
-
|
|
|
- //get srcPaths
|
|
|
- Path[] srcPaths = info.getSrcPathExpanded();
|
|
|
-
|
|
|
- if (srcPaths != null) {
|
|
|
- for (Path srcPath: srcPaths) {
|
|
|
- // expand destination prefix
|
|
|
- Path destPath = getOriginalParityFile(destPref, srcPath);
|
|
|
-
|
|
|
- FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
|
|
|
- FileStatus stat = null;
|
|
|
- try {
|
|
|
- stat = destFs.getFileStatus(destPath);
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- // do nothing, leave stat = null;
|
|
|
- }
|
|
|
- if (stat != null) {
|
|
|
- LOG.info("Purging obsolete parity files for policy " +
|
|
|
- info.getName() + " " + destPath);
|
|
|
- recursePurge(info.getErasureCode(), srcFs, destFs,
|
|
|
- destPref.toUri().getPath(), stat);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Ignoring Exception while processing policy " +
|
|
|
- info.getName() + " " +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * The destPrefix is the absolute pathname of the destinationPath
|
|
|
- * specified in the policy (without the host:port)
|
|
|
- */
|
|
|
- void recursePurge(ErasureCodeType code,
|
|
|
- FileSystem srcFs, FileSystem destFs,
|
|
|
- String destPrefix, FileStatus dest)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- Path destPath = dest.getPath(); // pathname, no host:port
|
|
|
- String destStr = destPath.toUri().getPath();
|
|
|
- LOG.debug("Checking " + destPath + " prefix " + destPrefix);
|
|
|
-
|
|
|
- // Verify if it is a har file
|
|
|
- if (dest.isDirectory() && destStr.endsWith(HAR_SUFFIX)) {
|
|
|
- try {
|
|
|
- int harUsedPercent =
|
|
|
- usefulHar(code, srcFs, destFs, destPath, destPrefix, conf);
|
|
|
- LOG.info("Useful percentage of " + destStr + " " + harUsedPercent);
|
|
|
- // Delete the har if its usefulness reaches a threshold.
|
|
|
- if (harUsedPercent <= conf.getInt("raid.har.usage.threshold", 0)) {
|
|
|
- LOG.info("Purging " + destStr + " at usage " + harUsedPercent);
|
|
|
- boolean done = destFs.delete(destPath, true);
|
|
|
- if (!done) {
|
|
|
- LOG.error("Unable to purge directory " + destPath);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Error during purging " + destStr + " " +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Verify the destPrefix is a prefix of the destPath
|
|
|
- if (!destStr.startsWith(destPrefix)) {
|
|
|
- LOG.error("Destination path " + destStr + " should have " +
|
|
|
- destPrefix + " as its prefix.");
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if (dest.isDirectory()) {
|
|
|
- FileStatus[] files = null;
|
|
|
- files = destFs.listStatus(destPath);
|
|
|
- if (files == null || files.length == 0){
|
|
|
- boolean done = destFs.delete(destPath,true); // ideal is false, but
|
|
|
- // DFSClient only deletes directories if it is recursive
|
|
|
- if (done) {
|
|
|
- LOG.info("Purged directory " + destPath );
|
|
|
- }
|
|
|
- else {
|
|
|
- LOG.error("Unable to purge directory " + destPath);
|
|
|
- }
|
|
|
- }
|
|
|
- if (files != null) {
|
|
|
- for (FileStatus one:files) {
|
|
|
- recursePurge(code, srcFs, destFs, destPrefix, one);
|
|
|
- }
|
|
|
- }
|
|
|
- // If the directory is empty now, it will be purged the next time this
|
|
|
- // thread runs.
|
|
|
- return; // the code below does the file checking
|
|
|
- }
|
|
|
-
|
|
|
- String src = destStr.replaceFirst(destPrefix, "");
|
|
|
-
|
|
|
- Path srcPath = new Path(src);
|
|
|
- boolean shouldDelete = false;
|
|
|
-
|
|
|
- if (!srcFs.exists(srcPath)) {
|
|
|
- shouldDelete = true;
|
|
|
- } else {
|
|
|
- try {
|
|
|
- // If there is a RS parity file, the XOR parity can be deleted.
|
|
|
- if (code == ErasureCodeType.XOR) {
|
|
|
- ParityFilePair ppair = getParityFile(
|
|
|
- getDestinationPath(ErasureCodeType.RS, conf), srcPath, conf);
|
|
|
- if (ppair != null) {
|
|
|
- shouldDelete = true;
|
|
|
- }
|
|
|
- }
|
|
|
- if (!shouldDelete) {
|
|
|
- Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
|
|
|
- ParityFilePair ppair = getParityFile(dstPath,srcPath);
|
|
|
- // If the parity file is not the appropriate one for the source or
|
|
|
- // the parityFs is not the same as this file's filesystem
|
|
|
- // (it is a HAR), this file can be deleted.
|
|
|
- if ( ppair == null ||
|
|
|
- !destFs.equals(ppair.getFileSystem()) ||
|
|
|
- !destPath.equals(ppair.getPath())) {
|
|
|
- shouldDelete = true;
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Error during purging " + src + " " +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (shouldDelete) {
|
|
|
- boolean done = destFs.delete(destPath, false);
|
|
|
- if (done) {
|
|
|
- LOG.info("Purged file " + destPath );
|
|
|
- } else {
|
|
|
- LOG.error("Unable to purge file " + destPath );
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Returns the number of up-to-date files in the har as a percentage of the
|
|
|
- // total number of files in the har.
|
|
|
- //
|
|
|
- protected static int usefulHar(
|
|
|
- ErasureCodeType code,
|
|
|
- FileSystem srcFs, FileSystem destFs,
|
|
|
- Path harPath, String destPrefix, Configuration conf) throws IOException {
|
|
|
-
|
|
|
- FileSystem fsHar = new HarFileSystem(destFs);
|
|
|
- String harURIPath = harPath.toUri().getPath();
|
|
|
- Path qualifiedPath = new Path("har://", harURIPath +
|
|
|
- Path.SEPARATOR + harPath.getParent().toUri().getPath());
|
|
|
- fsHar.initialize(qualifiedPath.toUri(), conf);
|
|
|
- FileStatus[] filesInHar = fsHar.listStatus(qualifiedPath);
|
|
|
- if (filesInHar.length == 0) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- int numUseless = 0;
|
|
|
- for (FileStatus one: filesInHar) {
|
|
|
- Path parityPath = one.getPath();
|
|
|
- String parityStr = parityPath.toUri().getPath();
|
|
|
- if (parityStr.startsWith("har:/")) {
|
|
|
- LOG.error("Unexpected prefix har:/ for " + parityStr);
|
|
|
- continue;
|
|
|
- }
|
|
|
- String prefixToReplace = harURIPath + destPrefix;
|
|
|
- if (!parityStr.startsWith(prefixToReplace)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- String src = parityStr.substring(prefixToReplace.length());
|
|
|
- if (code == ErasureCodeType.XOR) {
|
|
|
- ParityFilePair ppair = getParityFile(
|
|
|
- getDestinationPath(ErasureCodeType.RS, conf), new Path(src), conf);
|
|
|
- if (ppair != null) {
|
|
|
- // There is a valid RS parity file, so the XOR one is useless.
|
|
|
- numUseless++;
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- try {
|
|
|
- FileStatus srcStatus = srcFs.getFileStatus(new Path(src));
|
|
|
- if (srcStatus == null) {
|
|
|
- numUseless++;
|
|
|
- } else if (one.getModificationTime() !=
|
|
|
- srcStatus.getModificationTime()) {
|
|
|
- numUseless++;
|
|
|
- }
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- LOG.info("File not found: " + e);
|
|
|
- numUseless++;
|
|
|
- }
|
|
|
- }
|
|
|
- int uselessPercent = numUseless * 100 / filesInHar.length;
|
|
|
- return 100 - uselessPercent;
|
|
|
- }
|
|
|
-
|
|
|
- private void doHar() throws IOException, InterruptedException {
|
|
|
- long prevExec = 0;
|
|
|
- while (running) {
|
|
|
-
|
|
|
- // The config may be reloaded by the TriggerMonitor.
|
|
|
- // This thread uses whatever config is currently active.
|
|
|
- while(now() < prevExec + configMgr.getPeriodicity()){
|
|
|
- Thread.sleep(SLEEP_TIME);
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Started archive scan");
|
|
|
- prevExec = now();
|
|
|
-
|
|
|
- // fetch all categories
|
|
|
- for (PolicyList category : configMgr.getAllPolicies()) {
|
|
|
- for (PolicyInfo info: category.getAll()) {
|
|
|
- String tmpHarPath = tmpHarPathForCode(conf, info.getErasureCode());
|
|
|
- String str = info.getProperty("time_before_har");
|
|
|
- if (str != null) {
|
|
|
- try {
|
|
|
- long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
|
|
|
-
|
|
|
- Path destPref = getDestinationPath(info.getErasureCode(), conf);
|
|
|
- FileSystem destFs = destPref.getFileSystem(conf);
|
|
|
-
|
|
|
- //get srcPaths
|
|
|
- Path[] srcPaths = info.getSrcPathExpanded();
|
|
|
-
|
|
|
- if ( srcPaths != null ){
|
|
|
- for (Path srcPath: srcPaths) {
|
|
|
- // expand destination prefix
|
|
|
- Path destPath = getOriginalParityFile(destPref, srcPath);
|
|
|
-
|
|
|
- FileStatus stat = null;
|
|
|
- try {
|
|
|
- stat = destFs.getFileStatus(destPath);
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
- // do nothing, leave stat = null;
|
|
|
- }
|
|
|
- if (stat != null) {
|
|
|
- LOG.info("Haring parity files for policy " +
|
|
|
- info.getName() + " " + destPath);
|
|
|
- recurseHar(info, destFs, stat, destPref.toUri().getPath(),
|
|
|
- srcPath.getFileSystem(conf), cutoff, tmpHarPath);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Ignoring Exception while processing policy " +
|
|
|
- info.getName() + " " +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
|
|
|
- String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- if (dest.isFile()) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- Path destPath = dest.getPath(); // pathname, no host:port
|
|
|
- String destStr = destPath.toUri().getPath();
|
|
|
-
|
|
|
- // Verify if it already contains a HAR directory
|
|
|
- if ( destFs.exists(new Path(destPath, destPath.getName()+HAR_SUFFIX)) ) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- FileStatus[] files = null;
|
|
|
- files = destFs.listStatus(destPath);
|
|
|
- boolean shouldHar = false;
|
|
|
- if (files != null) {
|
|
|
- shouldHar = files.length > 0;
|
|
|
- for (FileStatus one: files) {
|
|
|
- if (one.isDirectory()){
|
|
|
- recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
|
|
|
- shouldHar = false;
|
|
|
- } else if (one.getModificationTime() > cutoff ) {
|
|
|
- if (shouldHar) {
|
|
|
- LOG.info("Cannot archive " + destPath +
|
|
|
- " because " + one.getPath() + " was modified after cutoff");
|
|
|
- shouldHar = false;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (shouldHar) {
|
|
|
- String src = destStr.replaceFirst(destPrefix, "");
|
|
|
- Path srcPath = new Path(src);
|
|
|
- FileStatus[] statuses = srcFs.listStatus(srcPath);
|
|
|
- Path destPathPrefix = new Path(destPrefix).makeQualified(destFs);
|
|
|
- if (statuses != null) {
|
|
|
- for (FileStatus status : statuses) {
|
|
|
- if (getParityFile(destPathPrefix,
|
|
|
- status.getPath().makeQualified(srcFs)) == null ) {
|
|
|
- LOG.info("Cannot archive " + destPath +
|
|
|
- " because it doesn't contain parity file for " +
|
|
|
- status.getPath().makeQualified(srcFs) + " on destination " +
|
|
|
- destPathPrefix);
|
|
|
- shouldHar = false;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if ( shouldHar ) {
|
|
|
- LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath );
|
|
|
- singleHar(info, destFs, dest, tmpHarPath);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private void singleHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
|
|
|
- String tmpHarPath) throws IOException {
|
|
|
-
|
|
|
- Random rand = new Random();
|
|
|
- Path root = new Path("/");
|
|
|
- Path qualifiedPath = dest.getPath().makeQualified(destFs);
|
|
|
- String harFileDst = qualifiedPath.getName() + HAR_SUFFIX;
|
|
|
- String harFileSrc = qualifiedPath.getName() + "-" +
|
|
|
- rand.nextLong() + "-" + HAR_SUFFIX;
|
|
|
- short metaReplication =
|
|
|
- (short) Integer.parseInt(info.getProperty("metaReplication"));
|
|
|
- // HadoopArchives.HAR_PARTFILE_LABEL is private, so hard-coding the label.
|
|
|
- conf.setLong("har.partfile.size", configMgr.getHarPartfileSize());
|
|
|
- HadoopArchives har = new HadoopArchives(conf);
|
|
|
- String[] args = new String[7];
|
|
|
- args[0] = "-Ddfs.replication=" + metaReplication;
|
|
|
- args[1] = "-archiveName";
|
|
|
- args[2] = harFileSrc;
|
|
|
- args[3] = "-p";
|
|
|
- args[4] = root.makeQualified(destFs).toString();
|
|
|
- args[5] = qualifiedPath.toUri().getPath().substring(1);
|
|
|
- args[6] = tmpHarPath.toString();
|
|
|
- int ret = 0;
|
|
|
- try {
|
|
|
- ret = ToolRunner.run(har, args);
|
|
|
- if (ret == 0 && !destFs.rename(new Path(tmpHarPath+"/"+harFileSrc),
|
|
|
- new Path(qualifiedPath, harFileDst))) {
|
|
|
- LOG.error("HAR rename didn't succeed from " + tmpHarPath+"/"+harFileSrc +
|
|
|
- " to " + qualifiedPath + "/" + harFileDst);
|
|
|
- ret = -2;
|
|
|
- }
|
|
|
- } catch (Exception exc) {
|
|
|
- throw new IOException("Error while creating archive " + ret, exc);
|
|
|
- }
|
|
|
-
|
|
|
- if (ret != 0){
|
|
|
- throw new IOException("Error while creating archive " + ret);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Periodically generates HAR files
|
|
|
- */
|
|
|
- class HarMonitor implements Runnable {
|
|
|
-
|
|
|
- public void run() {
|
|
|
- while (running) {
|
|
|
- try {
|
|
|
- doHar();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error(StringUtils.stringifyException(e));
|
|
|
- } finally {
|
|
|
- LOG.info("Har parity files thread continuing to run...");
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.info("Leaving Har thread.");
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the temp path for XOR parity files
|
|
|
- */
|
|
|
- public static String unraidTmpDirectory(Configuration conf) {
|
|
|
- return conf.get(RAID_TMP_LOCATION_KEY, DEFAULT_RAID_TMP_LOCATION);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the temp path for ReedSolomonEncoder parity files
|
|
|
- */
|
|
|
- public static String rsTempPrefix(Configuration conf) {
|
|
|
- return conf.get(RAIDRS_TMP_LOCATION_KEY, DEFAULT_RAIDRS_TMP_LOCATION);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the temp path for XOR parity files
|
|
|
- */
|
|
|
- public static String xorTempPrefix(Configuration conf) {
|
|
|
- return conf.get(RAID_TMP_LOCATION_KEY, DEFAULT_RAID_TMP_LOCATION);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the temp path for XOR parity files
|
|
|
- */
|
|
|
- public static String xorHarTempPrefix(Configuration conf) {
|
|
|
- return conf.get(RAID_HAR_TMP_LOCATION_KEY, DEFAULT_RAID_HAR_TMP_LOCATION);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the temp path for ReedSolomonEncoder parity files
|
|
|
- */
|
|
|
- public static String rsHarTempPrefix(Configuration conf) {
|
|
|
- return conf.get(RAIDRS_HAR_TMP_LOCATION_KEY,
|
|
|
- DEFAULT_RAIDRS_HAR_TMP_LOCATION);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the destination path for ReedSolomon parity files
|
|
|
- */
|
|
|
- public static Path rsDestinationPath(Configuration conf, FileSystem fs) {
|
|
|
- String loc = conf.get(RAIDRS_LOCATION_KEY, DEFAULT_RAIDRS_LOCATION);
|
|
|
- Path p = new Path(loc.trim());
|
|
|
- p = p.makeQualified(fs);
|
|
|
- return p;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the destination path for ReedSolomon parity files
|
|
|
- */
|
|
|
- public static Path rsDestinationPath(Configuration conf)
|
|
|
- throws IOException {
|
|
|
- String loc = conf.get(RAIDRS_LOCATION_KEY, DEFAULT_RAIDRS_LOCATION);
|
|
|
- Path p = new Path(loc.trim());
|
|
|
- FileSystem fs = FileSystem.get(p.toUri(), conf);
|
|
|
- p = p.makeQualified(fs);
|
|
|
- return p;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the destination path for XOR parity files
|
|
|
- */
|
|
|
- public static Path xorDestinationPath(Configuration conf, FileSystem fs) {
|
|
|
- String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
|
|
|
- Path p = new Path(loc.trim());
|
|
|
- p = p.makeQualified(fs);
|
|
|
- return p;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the destination path for XOR parity files
|
|
|
- */
|
|
|
- public static Path xorDestinationPath(Configuration conf)
|
|
|
- throws IOException {
|
|
|
- String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
|
|
|
- Path p = new Path(loc.trim());
|
|
|
- FileSystem fs = FileSystem.get(p.toUri(), conf);
|
|
|
- p = p.makeQualified(fs);
|
|
|
- return p;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the path prefix that stores the parity files
|
|
|
- */
|
|
|
- static Path getDestinationPath(ErasureCodeType code, Configuration conf)
|
|
|
- throws IOException {
|
|
|
- switch (code) {
|
|
|
- case XOR:
|
|
|
- return xorDestinationPath(conf);
|
|
|
- case RS:
|
|
|
- return rsDestinationPath(conf);
|
|
|
- default:
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static Encoder encoderForCode(Configuration conf, ErasureCodeType code) {
|
|
|
- int stripeLength = getStripeLength(conf);
|
|
|
- switch (code) {
|
|
|
- case XOR:
|
|
|
- return new XOREncoder(conf, stripeLength);
|
|
|
- case RS:
|
|
|
- return new ReedSolomonEncoder(conf, stripeLength, rsParityLength(conf));
|
|
|
- default:
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static String tmpHarPathForCode(Configuration conf, ErasureCodeType code) {
|
|
|
- switch (code) {
|
|
|
- case XOR:
|
|
|
- return xorHarTempPrefix(conf);
|
|
|
- case RS:
|
|
|
- return rsHarTempPrefix(conf);
|
|
|
- default:
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Obtain stripe length from configuration
|
|
|
- */
|
|
|
- public static int getStripeLength(Configuration conf) {
|
|
|
- return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Obtain stripe length from configuration
|
|
|
- */
|
|
|
- public static int rsParityLength(Configuration conf) {
|
|
|
- return conf.getInt(RS_PARITY_LENGTH_KEY, RS_PARITY_LENGTH_DEFAULT);
|
|
|
- }
|
|
|
-
|
|
|
- static boolean isParityHarPartFile(Path p) {
|
|
|
- Matcher m = PARITY_HAR_PARTFILE_PATTERN.matcher(p.toUri().getPath());
|
|
|
- return m.matches();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns current time.
|
|
|
- */
|
|
|
- static long now() {
|
|
|
- return Time.now();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Make an absolute path relative by stripping the leading /
|
|
|
- */
|
|
|
- static private Path makeRelative(Path path) {
|
|
|
- if (!path.isAbsolute()) {
|
|
|
- return path;
|
|
|
- }
|
|
|
- String p = path.toUri().getPath();
|
|
|
- String relative = p.substring(1, p.length());
|
|
|
- return new Path(relative);
|
|
|
- }
|
|
|
-
|
|
|
- private static void printUsage() {
|
|
|
- System.err.println("Usage: java RaidNode ");
|
|
|
- }
|
|
|
-
|
|
|
- private static StartupOption parseArguments(String args[]) {
|
|
|
- int argsLen = (args == null) ? 0 : args.length;
|
|
|
- StartupOption startOpt = StartupOption.REGULAR;
|
|
|
- for(int i=0; i < argsLen; i++) {
|
|
|
- String cmd = args[i]; // We have to parse command line args in future.
|
|
|
- }
|
|
|
- return startOpt;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Convert command line options to configuration parameters
|
|
|
- */
|
|
|
- private static void setStartupOption(Configuration conf, StartupOption opt) {
|
|
|
- conf.set("fs.raidnode.startup", opt.toString());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create an instance of the appropriate subclass of RaidNode
|
|
|
- */
|
|
|
- public static RaidNode createRaidNode(Configuration conf)
|
|
|
- throws ClassNotFoundException {
|
|
|
- try {
|
|
|
- // default to distributed raid node
|
|
|
- Class<?> raidNodeClass =
|
|
|
- conf.getClass(RAIDNODE_CLASSNAME_KEY, DistRaidNode.class);
|
|
|
- if (!RaidNode.class.isAssignableFrom(raidNodeClass)) {
|
|
|
- throw new ClassNotFoundException("not an implementation of RaidNode");
|
|
|
- }
|
|
|
- Constructor<?> constructor =
|
|
|
- raidNodeClass.getConstructor(new Class[] {Configuration.class} );
|
|
|
- return (RaidNode) constructor.newInstance(conf);
|
|
|
- } catch (NoSuchMethodException e) {
|
|
|
- throw new ClassNotFoundException("cannot construct blockfixer", e);
|
|
|
- } catch (InstantiationException e) {
|
|
|
- throw new ClassNotFoundException("cannot construct blockfixer", e);
|
|
|
- } catch (IllegalAccessException e) {
|
|
|
- throw new ClassNotFoundException("cannot construct blockfixer", e);
|
|
|
- } catch (InvocationTargetException e) {
|
|
|
- throw new ClassNotFoundException("cannot construct blockfixer", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create an instance of the RaidNode
|
|
|
- */
|
|
|
- public static RaidNode createRaidNode(String argv[], Configuration conf)
|
|
|
- throws IOException, ClassNotFoundException {
|
|
|
- if (conf == null) {
|
|
|
- conf = new Configuration();
|
|
|
- }
|
|
|
- StartupOption startOpt = parseArguments(argv);
|
|
|
- if (startOpt == null) {
|
|
|
- printUsage();
|
|
|
- return null;
|
|
|
- }
|
|
|
- setStartupOption(conf, startOpt);
|
|
|
- RaidNode node = createRaidNode(conf);
|
|
|
- return node;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- */
|
|
|
- public static void main(String argv[]) throws Exception {
|
|
|
- try {
|
|
|
- StringUtils.startupShutdownMessage(RaidNode.class, argv, LOG);
|
|
|
- RaidNode raid = createRaidNode(argv, null);
|
|
|
- if (raid != null) {
|
|
|
- raid.join();
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- LOG.error(StringUtils.stringifyException(e));
|
|
|
- System.exit(-1);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-}
|