@@ -60,6 +60,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+
 /**
  * an archive creation utility.
  * This class provides methods that can be used
@@ -77,12 +78,13 @@ public class HadoopArchives implements Tool {
   static final String SRC_COUNT_LABEL = NAME + ".src.count";
   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
   static final String DST_HAR_LABEL = NAME + ".archive.name";
+  static final String SRC_PARENT_LABEL = NAME + ".parent.path";
   // size of each part file
   // its fixed for now.
   static final long partSize = 2 * 1024 * 1024 * 1024l;
 
   private static final String usage = "archive"
-  + " -archiveName NAME <src>* <dest>" +
+  + " -archiveName NAME -p <parent path> <src>* <dest>" +
"\n";
 
@@ -228,24 +230,53 @@ public class HadoopArchives implements Tool {
     return deepest;
   }
 
-  // this method is tricky. This method writes
-  // the top level directories in such a way so that
-  // the output only contains valid directoreis in archives.
-  // so for an input path specified by the user
-  // as /user/hadoop
-  // we need to index
-  // / as the root
-  // /user as a directory
-  // /user/hadoop as a directory
-  // so for multiple input paths it makes sure that it
-  // does the right thing.
-  // so if the user specifies the input directories as
-  // /user/harry and /user/hadoop
-  // we need to write / and user as its child
-  // and /user and harry and hadoop as its children
+  /**
+   * truncate the prefix root from the full path
+   * @param fullPath the full path
+   * @param root the prefix root to be truncated
+   * @return the path of fullPath relative to root, or null if
+   * fullPath is shallower than root
+   */
+  private Path relPathToRoot(Path fullPath, Path root) {
+    // use the Path API rather than raw substring manipulation
+    // so that this does not break sometime later
+    Path justRoot = new Path(Path.SEPARATOR);
+    if (fullPath.depth() == root.depth()) {
+      return justRoot;
+    }
+    else if (fullPath.depth() > root.depth()) {
+      Path retPath = new Path(fullPath.getName());
+ Path parent = fullPath.getParent();
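+      // walk up the ancestry, prepending each component until root's depth is reached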
+      for (int i = 0; i < (fullPath.depth() - root.depth() - 1); i++) {
+        retPath = new Path(parent.getName(), retPath);
+        parent = parent.getParent();
+      }
+      return new Path(justRoot, retPath);
+    }
+    return null;
+  }
+
+  /**
+   * this method writes all the valid top level directories
+   * into the srcWriter for indexing. This method is a little
+   * tricky. example -
+   * for an input with parent path /home/user/ and sources
+   * as /home/user/source/dir1, /home/user/source/dir2 - this
+   * will output <source, dir, dir1, dir2> (dir means that source is a dir
+   * with dir1 and dir2 as children) and <source/dir1, file, null>
+   * and <source/dir2, file, null>
+   * @param srcWriter the sequence file writer to write the
+   * directories to
+   * @param paths the source paths provided by the user. They
+   * are glob free and have full path (not relative paths)
+   * @param parentPath the parent path that you want the archives
+   * to be relative to. example - /home/user/dir1 can be archived with
+   * parent as /home or /home/user.
+   * @throws IOException
+   */
   private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
-      List<Path> paths) throws IOException {
-    //these are qualified paths
+      List<Path> paths, Path parentPath) throws IOException {
+    //add all the directories
     List<Path> justDirs = new ArrayList<Path>();
     for (Path p: paths) {
       if (!p.getFileSystem(getConf()).isFile(p)) {
@@ -255,17 +286,23 @@ public class HadoopArchives implements Tool {
         justDirs.add(new Path(p.getParent().toUri().getPath()));
       }
     }
-
-    //get the largest depth path
-    // this is tricky
-    TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
+    /* find all the common parents of paths that are valid archive
+     * paths. This is done so that we do not add a common path
+     * twice, and so that we only add the valid children of a path
+     * that were specified by the user.
+     */
+    TreeMap<String, HashSet<String>> allpaths = new TreeMap<String,
+                                                HashSet<String>>();
+    /* the largest depth of paths. the max number of times
+     * we need to iterate
+     */
     Path deepest = largestDepth(paths);
Path root = new Path(Path.SEPARATOR);
-    for (int i = 0; i < deepest.depth(); i++) {
+ for (int i = parentPath.depth(); i < deepest.depth(); i++) {
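+      // depths above parentPath lie outside the archive, so start at its depth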
       List<Path> parents = new ArrayList<Path>();
       for (Path p: justDirs) {
         if (p.compareTo(root) == 0){
-          //don nothing
+          //do nothing
         }
         else {
           Path parent = p.getParent();
@@ -285,34 +322,40 @@ public class HadoopArchives implements Tool {
       }
       Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
       for (Map.Entry<String, HashSet<String>> entry : keyVals) {
-        HashSet<String> children = entry.getValue();
-        String toWrite = entry.getKey() + " dir ";
-        StringBuffer sbuff = new StringBuffer();
-        sbuff.append(toWrite);
-        for (String child: children) {
-          sbuff.append(child + " ");
+ Path relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
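+        // relPathToRoot returns null for entries shallower than the parent; skip those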
+        if (relPath != null) {
+          String toWrite = relPath + " dir ";
+          HashSet<String> children = entry.getValue();
+          StringBuffer sbuff = new StringBuffer();
+          sbuff.append(toWrite);
+          for (String child: children) {
+            sbuff.append(child + " ");
+          }
+          toWrite = sbuff.toString();
+          srcWriter.append(new LongWritable(0L), new Text(toWrite));
         }
-        toWrite = sbuff.toString();
-        srcWriter.append(new LongWritable(0L), new Text(toWrite));
       }
     }
 
   /**archive the given source paths into
    * the dest
+   * @param parentPath the parent path of all the source paths
    * @param srcPaths the src paths to be archived
    * @param dest the dest dir that will contain the archive
    */
-  public void archive(List<Path> srcPaths, String archiveName, Path dest)
-  throws IOException {
+  void archive(Path parentPath, List<Path> srcPaths,
+      String archiveName, Path dest) throws IOException {
     checkPaths(conf, srcPaths);
     int numFiles = 0;
     long totalSize = 0;
+    FileSystem fs = parentPath.getFileSystem(conf);
conf.set(DST_HAR_LABEL, archiveName);
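+    // record the qualified parent path so the mappers can rebuild absolute source paths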
+    conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
     Path outputPath = new Path(dest, archiveName);
     FileOutputFormat.setOutputPath(conf, outputPath);
     FileSystem outFs = outputPath.getFileSystem(conf);
     if (outFs.exists(outputPath) || outFs.isFile(dest)) {
-      throw new IOException("Invalid Output.");
+      throw new IOException("Invalid Output: " + outputPath);
     }
     conf.set(DST_DIR_LABEL, outputPath.toString());
     final String randomId = DistCp.getRandomId();
@@ -331,7 +374,7 @@ public class HadoopArchives implements Tool {
     // create single list of files and dirs
     try {
       // write the top level dirs in first
-      writeTopLevelDirs(srcWriter, srcPaths);
+      writeTopLevelDirs(srcWriter, srcPaths, parentPath);
       srcWriter.sync();
       // these are the input paths passed
       // from the command line
@@ -339,14 +382,13 @@ public class HadoopArchives implements Tool {
       // and then write them to the input file
       // one at a time
       for (Path src: srcPaths) {
-        FileSystem fs = src.getFileSystem(conf);
         ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
         recursivels(fs, src, allFiles);
         for (FileStatus stat: allFiles) {
           String toWrite = "";
           long len = stat.isDir()? 0:stat.getLen();
           if (stat.isDir()) {
-            toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
+            toWrite = "" + relPathToRoot(stat.getPath(), parentPath) + " dir ";
             //get the children
             FileStatus[] list = fs.listStatus(stat.getPath());
             StringBuffer sbuff = new StringBuffer();
@@ -357,7 +399,7 @@ public class HadoopArchives implements Tool {
             toWrite = sbuff.toString();
           }
           else {
-            toWrite += fs.makeQualified(stat.getPath()) + " file ";
+            toWrite += relPathToRoot(stat.getPath(), parentPath) + " file ";
           }
           srcWriter.append(new LongWritable(len), new
               Text(toWrite));
@@ -403,6 +445,7 @@ public class HadoopArchives implements Tool {
     Path tmpOutputDir = null;
     Path tmpOutput = null;
     String partname = null;
+    Path rootPath = null;
     FSDataOutputStream partStream = null;
     FileSystem destFs = null;
     byte[] buffer;
@@ -425,6 +468,12 @@ public class HadoopArchives implements Tool {
       // directory
       partname = "part-" + partId;
tmpOutput = new Path(tmpOutputDir, partname);
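+      // recover the parent path that archive() stored in the job configuration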
+      rootPath = (conf.get(SRC_PARENT_LABEL, null) == null) ? null :
+          new Path(conf.get(SRC_PARENT_LABEL));
+      if (rootPath == null) {
+        throw new RuntimeException("Unable to read parent " +
+            "path for har from config");
+      }
       try {
         destFs = tmpOutput.getFileSystem(conf);
         //this was a stale copy
@@ -450,16 +499,7 @@ public class HadoopArchives implements Tool {
         fsin.close();
       }
     }
-
-    // the relative path of p. basically
-    // getting rid of schema. Parsing and doing
-    // string manipulation is not good - so
-    // just use the path api to do it.
-    private Path makeRelative(Path p) {
-      Path retPath = new Path(p.toUri().getPath());
-      return retPath;
-    }
-
+
     static class MapStat {
       private String pathname;
       private boolean isDir;
@@ -481,6 +521,20 @@ public class HadoopArchives implements Tool {
         }
       }
     }
+
+    /**
+     * resolve an archive-relative path against a parent path,
+     * getting rid of the / in the beginning of p
+     * @param p the path relative to the archive root
+     * @param parent the parent path to resolve against
+     * @return p resolved under parent
+     */
+    private Path realPath(Path p, Path parent) {
+      Path rootPath = new Path(Path.SEPARATOR);
+      if (rootPath.compareTo(p) == 0) {
+        return parent;
+      }
+ }
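+      // strip the leading '/' so the remainder resolves relative to parent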
+      return new Path(parent, new Path(p.toString().substring(1)));
+    }
+
     // read files from the split input
     // and write it onto the part files.
     // also output hash(name) and string
@@ -491,10 +545,10 @@ public class HadoopArchives implements Tool {
         Reporter reporter) throws IOException {
       String line = value.toString();
       MapStat mstat = new MapStat(line);
-      Path srcPath = new Path(mstat.pathname);
-      String towrite = null;
-      Path relPath = makeRelative(srcPath);
+      Path relPath = new Path(mstat.pathname);
       int hash = HarFileSystem.getHarHash(relPath);
+ String towrite = null;
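+      // map input gives archive-relative names; resolve back to the absolute source path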
+      Path srcPath = realPath(relPath, rootPath);
       long startPos = partStream.getPos();
       if (mstat.isDir) {
         towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
@@ -609,27 +663,26 @@ public class HadoopArchives implements Tool {
       outStream.close();
       indexStream.close();
       // try increasing the replication
-      fs.setReplication(index, (short) 10);
-      fs.setReplication(masterIndex, (short) 10);
+      fs.setReplication(index, (short) 5);
+      fs.setReplication(masterIndex, (short) 5);
     }
 
   }
 
   /** the main driver for creating the archives
-   *  it takes at least two command line parameters. The src and the
-   *  dest. It does an lsr on the source paths.
+   *  it takes at least three command line parameters: the parent path,
+   *  the src and the dest. It does an lsr on the source paths.
    *  The mapper creates archives and the reducer creates
    *  the archive index.
    */
   public int run(String[] args) throws Exception {
     try {
+      Path parentPath = null;
       List<Path> srcPaths = new ArrayList<Path>();
       Path destPath = null;
-      // check we were supposed to archive or
-      // unarchive
       String archiveName = null;
-      if (args.length < 4) {
+      if (args.length < 5) {
         System.out.println(usage);
         throw new IOException("Invalid usage.");
       }
@@ -642,17 +695,34 @@ public class HadoopArchives implements Tool {
         System.out.println(usage);
         throw new IOException("Invalid name for archives. " + archiveName);
       }
-      for (int i = 2; i < args.length; i++) {
+      int i = 2;
+      //check to see if the relative parent has been provided or not
+      //this is a required parameter.
+      if (! "-p".equals(args[i])) {
+        System.out.println(usage);
+        throw new IOException("Parent path not specified.");
+      }
+      parentPath = new Path(args[i+1]);
+      i += 2;
+      //read the rest of the paths
+      for (; i < args.length; i++) {
         if (i == (args.length - 1)) {
           destPath = new Path(args[i]);
         }
         else {
-          srcPaths.add(new Path(args[i]));
+          Path argPath = new Path(args[i]);
+          if (argPath.isAbsolute()) {
+            System.out.println(usage);
+            throw new IOException("source path " + argPath +
+                " is not relative to " + parentPath);
+          }
+          srcPaths.add(new Path(parentPath, argPath));
         }
       }
       if (srcPaths.size() == 0) {
-        System.out.println(usage);
-        throw new IOException("Invalid Usage: No input sources specified.");
+        // if the user does not specify paths for the sources,
+        // archive the whole parent directory.
+        srcPaths.add(parentPath);
       }
       // do a glob on the srcPaths and then pass it on
       List<Path> globPaths = new ArrayList<Path>();
@@ -663,7 +733,7 @@ public class HadoopArchives implements Tool {
           globPaths.add(fs.makeQualified(status.getPath()));
         }
       }
-      archive(globPaths, archiveName, destPath);
+      archive(parentPath, globPaths, archiveName, destPath);
     } catch(IOException ie) {
       System.err.println(ie.getLocalizedMessage());
       return -1;