@@ -18,14 +18,18 @@
 package org.apache.hadoop.tools;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
@@ -38,10 +42,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HarFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -53,9 +59,11 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -67,6 +75,7 @@ import org.apache.hadoop.util.ToolRunner;
  * Hadoop archives look at {@link HarFileSystem}.
  */
 public class HadoopArchives implements Tool {
+  public static final int VERSION = 3;
   private static final Log LOG = LogFactory.getLog(HadoopArchives.class);
 
   private static final String NAME = "har";
@@ -77,12 +86,19 @@ public class HadoopArchives implements Tool {
   static final String SRC_COUNT_LABEL = NAME + ".src.count";
   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
   static final String DST_HAR_LABEL = NAME + ".archive.name";
-  // size of each part file
-  // its fixed for now.
-  static final long partSize = 2 * 1024 * 1024 * 1024l;
+  static final String SRC_PARENT_LABEL = NAME + ".parent.path";
+  /** the size of the blocks that will be created when archiving **/
+  static final String HAR_BLOCKSIZE_LABEL = NAME + ".block.size";
+  /** the size of the part files that will be created when archiving **/
+  static final String HAR_PARTSIZE_LABEL = NAME + ".partfile.size";
+
+  /** size of each part file **/
+  long partSize = 2 * 1024 * 1024 * 1024L;
+  /** size of blocks in hadoop archives **/
+  long blockSize = 512 * 1024 * 1024L;
 
   private static final String usage = "archive"
-  + " -archiveName NAME <src>* <dest>" +
+  + " -archiveName NAME -p <parent path> <src>* <dest>" +
   "\n";
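
The part and block sizes are now per-job knobs rather than a compile-time constant. A minimal sketch of overriding them before running the tool; the sizes are hypothetical, the Configuration-taking constructor is assumed (it is not shown in this patch), and the labels resolve to "har.block.size" and "har.partfile.size" as defined above:

    Configuration conf = new Configuration();
    conf.setLong("har.block.size", 256 * 1024 * 1024L);     // HAR_BLOCKSIZE_LABEL
    conf.setLong("har.partfile.size", 1024 * 1024 * 1024L); // HAR_PARTSIZE_LABEL
    int ret = ToolRunner.run(conf, new HadoopArchives(conf), args); // assumed ctor
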
@@ -118,22 +134,68 @@ public class HadoopArchives implements Tool {
   /**
    * this assumes that there are two types of files file/dir
    * @param fs the input filesystem
-   * @param p the top level path
+   * @param fdir the filestatusdir of the path
    * @param out the list of paths output of recursive ls
    * @throws IOException
    */
-  private void recursivels(FileSystem fs, Path p, List<FileStatus> out)
+  private void recursivels(FileSystem fs, FileStatusDir fdir, List<FileStatusDir> out)
   throws IOException {
-    FileStatus fstatus = fs.getFileStatus(p);
-    if (!fstatus.isDir()) {
-      out.add(fstatus);
+    if (!fdir.getFileStatus().isDir()) {
+      out.add(fdir);
       return;
     }
     else {
-      out.add(fstatus);
-      FileStatus[] listStatus = fs.listStatus(p);
+      out.add(fdir);
+      FileStatus[] listStatus = fs.listStatus(fdir.getFileStatus().getPath());
+      fdir.setChildren(listStatus);
       for (FileStatus stat: listStatus) {
-        recursivels(fs, stat.getPath(), out);
+        FileStatusDir fstatDir = new FileStatusDir(stat, null);
+        recursivels(fs, fstatDir, out);
+      }
+    }
+  }
+
+  /** HarEntry is used in the {@link HArchivesMapper} as the input value. */
+  private static class HarEntry implements Writable {
+    String path;
+    String[] children;
+
+    HarEntry() {}
+
+    HarEntry(String path, String[] children) {
+      this.path = path;
+      this.children = children;
+    }
+
+    boolean isDir() {
+      return children != null;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      path = Text.readString(in);
+
+      if (in.readBoolean()) {
+        children = new String[in.readInt()];
+        for (int i = 0; i < children.length; i++) {
+          children[i] = Text.readString(in);
+        }
+      } else {
+        children = null;
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, path);
+
+      final boolean dir = isDir();
+      out.writeBoolean(dir);
+      if (dir) {
+        out.writeInt(children.length);
+        for (String c : children) {
+          Text.writeString(out, c);
+        }
       }
     }
   }
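
Because HarEntry now rides through a SequenceFile, it must honor the Writable round-trip contract: readFields() applied to write()'s output must reproduce the entry, including the children == null encoding of files. A quick sketch of that invariant using the existing org.apache.hadoop.io buffer helpers (values are illustrative):

    HarEntry entry = new HarEntry("/user", new String[] {"harry", "hadoop"});
    DataOutputBuffer dob = new DataOutputBuffer();
    entry.write(dob);                           // path, isDir flag, children
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(dob.getData(), dob.getLength());
    HarEntry copy = new HarEntry();
    copy.readFields(dib);                       // copy.isDir() == true, same children
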
@@ -142,8 +204,7 @@ public class HadoopArchives implements Tool {
    * Input format of a hadoop archive job responsible for
    * generating splits of the file list
    */
-
-  static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
+  static class HArchiveInputFormat implements InputFormat<LongWritable, HarEntry> {
 
     //generate input splits from the src file lists
     public InputSplit[] getSplits(JobConf jconf, int numSplits)
@@ -163,7 +224,7 @@ public class HadoopArchives implements Tool {
       FileStatus fstatus = fs.getFileStatus(src);
       ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
       LongWritable key = new LongWritable();
-      Text value = new Text();
+      final HarEntry value = new HarEntry();
       SequenceFile.Reader reader = null;
       // the remaining bytes in the file split
       long remaining = fstatus.getLen();
@@ -200,9 +261,10 @@ public class HadoopArchives implements Tool {
       return splits.toArray(new FileSplit[splits.size()]);
     }
 
-    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
+    @Override
+    public RecordReader<LongWritable, HarEntry> getRecordReader(InputSplit split,
         JobConf job, Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader<LongWritable, Text>(job,
+      return new SequenceFileRecordReader<LongWritable, HarEntry>(job,
           (FileSplit)split);
     }
   }
@@ -228,24 +290,53 @@ public class HadoopArchives implements Tool {
     return deepest;
   }
 
-  // this method is tricky. This method writes
-  // the top level directories in such a way so that
-  // the output only contains valid directoreis in archives.
-  // so for an input path specified by the user
-  // as /user/hadoop
-  // we need to index
-  // / as the root
-  // /user as a directory
-  // /user/hadoop as a directory
-  // so for multiple input paths it makes sure that it
-  // does the right thing.
-  // so if the user specifies the input directories as
-  // /user/harry and /user/hadoop
-  // we need to write / and user as its child
-  // and /user and harry and hadoop as its children
+  /**
+   * truncate the prefix root from the full path
+   * @param fullPath the full path
+   * @param root the prefix root to be truncated
+   * @return the relative path
+   */
+  private Path relPathToRoot(Path fullPath, Path root) {
+    // use the Path api rather than string manipulation,
+    // so that this does not break when paths change later
+    final Path justRoot = new Path(Path.SEPARATOR);
+    if (fullPath.depth() == root.depth()) {
+      return justRoot;
+    }
+    else if (fullPath.depth() > root.depth()) {
+      Path retPath = new Path(fullPath.getName());
+      Path parent = fullPath.getParent();
+      for (int i = 0; i < (fullPath.depth() - root.depth() - 1); i++) {
+        retPath = new Path(parent.getName(), retPath);
+        parent = parent.getParent();
+      }
+      return new Path(justRoot, retPath);
+    }
+    return null;
+  }
+
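
A worked example of relPathToRoot with hypothetical paths, tracing the branch above:

    // fullPath /home/user/dir1 (depth 3), root /home (depth 1):
    Path rel = relPathToRoot(new Path("/home/user/dir1"), new Path("/home"));
    // the loop runs once, prepending "user" to "dir1" -> returns /user/dir1;
    // equal depths return just "/", and a fullPath outside root returns null
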
+  /**
+   * this method writes all the valid top level directories
+   * into the srcWriter for indexing. This method is a little
+   * tricky. example:
+   * for an input with parent path /home/user/ and sources
+   * as /home/user/source/dir1, /home/user/source/dir2 - this
+   * will output <source, dir, dir1, dir2> (dir means that source is a dir
+   * with dir1 and dir2 as children) and <source/dir1, file, null>
+   * and <source/dir2, file, null>
+   * @param srcWriter the sequence file writer to write the
+   * directories to
+   * @param paths the source paths provided by the user. They
+   * are glob free and have full path (not relative paths)
+   * @param parentPath the parent path that you want the archives
+   * to be relative to. example - /home/user/dir1 can be archived with
+   * parent as /home or /home/user.
+   * @throws IOException
+   */
   private void writeTopLevelDirs(SequenceFile.Writer srcWriter,
-    List<Path> paths) throws IOException {
-    //these are qualified paths
+      List<Path> paths, Path parentPath) throws IOException {
+    //add all the directories
     List<Path> justDirs = new ArrayList<Path>();
     for (Path p: paths) {
       if (!p.getFileSystem(getConf()).isFile(p)) {
@@ -255,17 +346,23 @@ public class HadoopArchives implements Tool {
         justDirs.add(new Path(p.getParent().toUri().getPath()));
       }
     }
-
-    //get the largest depth path
-    // this is tricky
-    TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
+    /* find all the common parents of paths that are valid archive
+     * paths. The below is done so that we do not add a common path
+     * twice, and also so that we only add the valid children of a
+     * path that are specified by the user.
+     */
+    TreeMap<String, HashSet<String>> allpaths = new TreeMap<String,
+                                                    HashSet<String>>();
+    /* the largest depth of paths. the max number of times
+     * we need to iterate
+     */
     Path deepest = largestDepth(paths);
     Path root = new Path(Path.SEPARATOR);
-    for (int i = 0; i < deepest.depth(); i++) {
+    for (int i = parentPath.depth(); i < deepest.depth(); i++) {
       List<Path> parents = new ArrayList<Path>();
       for (Path p: justDirs) {
         if (p.compareTo(root) == 0){
-          //don nothing
+          //do nothing
         }
         else {
           Path parent = p.getParent();
@@ -287,53 +384,118 @@ public class HadoopArchives implements Tool {
       }
     }
     Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
     for (Map.Entry<String, HashSet<String>> entry : keyVals) {
-      HashSet<String> children = entry.getValue();
-      String toWrite = entry.getKey() + " dir ";
-      StringBuffer sbuff = new StringBuffer();
-      sbuff.append(toWrite);
-      for (String child: children) {
-        sbuff.append(child + " ");
+      final Path relPath = relPathToRoot(new Path(entry.getKey()), parentPath);
+      if (relPath != null) {
+        final String[] children = new String[entry.getValue().size()];
+        int i = 0;
+        for (String child: entry.getValue()) {
+          children[i++] = child;
+        }
+        append(srcWriter, 0L, relPath.toString(), children);
       }
-      toWrite = sbuff.toString();
-      srcWriter.append(new LongWritable(0L), new Text(toWrite));
+    }
+  }
+
+  private void append(SequenceFile.Writer srcWriter, long len,
+      String path, String[] children) throws IOException {
+    srcWriter.append(new LongWritable(len), new HarEntry(path, children));
+  }
+
+  /**
+   * A static class that keeps
+   * track of the status of a path
+   * and its children if the path is a dir
+   */
+  static class FileStatusDir {
+    private FileStatus fstatus;
+    private FileStatus[] children = null;
+
+    /**
+     * constructor for filestatusdir
+     * @param fstatus the filestatus object that maps to filestatusdir
+     * @param children the children list if fs is a directory
+     */
+    FileStatusDir(FileStatus fstatus, FileStatus[] children) {
+      this.fstatus = fstatus;
+      this.children = children;
+    }
+
+    /**
+     * set children of this object
+     * @param listStatus the list of children
+     */
+    public void setChildren(FileStatus[] listStatus) {
+      this.children = listStatus;
+    }
+
+    /**
+     * the filestatus of this object
+     * @return the filestatus of this object
+     */
+    FileStatus getFileStatus() {
+      return this.fstatus;
+    }
+
+    /**
+     * the children list of this object, null if not a directory
+     * @return the children list
+     */
+    FileStatus[] getChildren() {
+      return this.children;
     }
   }
 
   /**archive the given source paths into
    * the dest
+   * @param parentPath the parent path of all the source paths
    * @param srcPaths the src paths to be archived
    * @param dest the dest dir that will contain the archive
    */
-  public void archive(List<Path> srcPaths, String archiveName, Path dest)
-  throws IOException {
+  void archive(Path parentPath, List<Path> srcPaths,
+      String archiveName, Path dest) throws IOException {
     checkPaths(conf, srcPaths);
     int numFiles = 0;
     long totalSize = 0;
+    FileSystem fs = parentPath.getFileSystem(conf);
+    this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
+    this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
+    conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
+    conf.setLong(HAR_PARTSIZE_LABEL, partSize);
     conf.set(DST_HAR_LABEL, archiveName);
+    conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
     Path outputPath = new Path(dest, archiveName);
     FileOutputFormat.setOutputPath(conf, outputPath);
     FileSystem outFs = outputPath.getFileSystem(conf);
     if (outFs.exists(outputPath) || outFs.isFile(dest)) {
-      throw new IOException("Invalid Output.");
+      throw new IOException("Invalid Output: " + outputPath);
     }
     conf.set(DST_DIR_LABEL, outputPath.toString());
-    final String randomId = DistCp.getRandomId();
-    Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
-        NAME + "_" + randomId);
+    JobClient jClient = new JobClient(conf);
+    Path stagingArea;
+    try {
+      stagingArea = JobSubmissionFiles.getStagingDir(jClient, conf);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    Path jobDirectory = new Path(stagingArea,
+        NAME + "_" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE), 36));
+    FsPermission mapredSysPerms =
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jobDirectory.getFileSystem(conf), jobDirectory,
+        mapredSysPerms);
     conf.set(JOB_DIR_LABEL, jobDirectory.toString());
     //get a tmp directory for input splits
     FileSystem jobfs = jobDirectory.getFileSystem(conf);
-    jobfs.mkdirs(jobDirectory);
     Path srcFiles = new Path(jobDirectory, "_har_src_files");
     conf.set(SRC_LIST_LABEL, srcFiles.toString());
     SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
-        srcFiles, LongWritable.class, Text.class,
+        srcFiles, LongWritable.class, HarEntry.class,
         SequenceFile.CompressionType.NONE);
     // get the list of files
     // create single list of files and dirs
     try {
       // write the top level dirs in first
-      writeTopLevelDirs(srcWriter, srcPaths);
+      writeTopLevelDirs(srcWriter, srcPaths, parentPath);
       srcWriter.sync();
       // these are the input paths passed
       // from the command line
@@ -341,28 +503,27 @@ public class HadoopArchives implements Tool {
       // and then write them to the input file
       // one at a time
       for (Path src: srcPaths) {
-        FileSystem fs = src.getFileSystem(conf);
-        ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
-        recursivels(fs, src, allFiles);
-        for (FileStatus stat: allFiles) {
-          String toWrite = "";
+        ArrayList<FileStatusDir> allFiles = new ArrayList<FileStatusDir>();
+        FileStatus fstatus = fs.getFileStatus(src);
+        FileStatusDir fdir = new FileStatusDir(fstatus, null);
+        recursivels(fs, fdir, allFiles);
+        for (FileStatusDir statDir: allFiles) {
+          FileStatus stat = statDir.getFileStatus();
           long len = stat.isDir()? 0:stat.getLen();
+          final Path path = relPathToRoot(stat.getPath(), parentPath);
+          final String[] children;
           if (stat.isDir()) {
-            toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
             //get the children
-            FileStatus[] list = fs.listStatus(stat.getPath());
-            StringBuffer sbuff = new StringBuffer();
-            sbuff.append(toWrite);
-            for (FileStatus stats: list) {
-              sbuff.append(stats.getPath().getName() + " ");
+            FileStatus[] list = statDir.getChildren();
+            children = new String[list.length];
+            for (int i = 0; i < list.length; i++) {
+              children[i] = list[i].getPath().getName();
             }
-            toWrite = sbuff.toString();
           }
           else {
-            toWrite += fs.makeQualified(stat.getPath()) + " file ";
+            children = null;
           }
-          srcWriter.append(new LongWritable(len), new
-              Text(toWrite));
+          append(srcWriter, len, path.toString(), children);
           srcWriter.sync();
           numFiles++;
           totalSize += len;
@@ -399,23 +560,26 @@ public class HadoopArchives implements Tool {
   }
 
   static class HArchivesMapper
-      implements Mapper<LongWritable, Text, IntWritable, Text> {
+      implements Mapper<LongWritable, HarEntry, IntWritable, Text> {
     private JobConf conf = null;
     int partId = -1 ;
     Path tmpOutputDir = null;
     Path tmpOutput = null;
     String partname = null;
+    Path rootPath = null;
     FSDataOutputStream partStream = null;
     FileSystem destFs = null;
     byte[] buffer;
     int buf_size = 128 * 1024;
-
+    long blockSize = 512 * 1024 * 1024L;
+
     // configure the mapper and create
     // the part file.
     // use map reduce framework to write into
     // tmp files.
     public void configure(JobConf conf) {
       this.conf = conf;
+
       // this is tightly tied to map reduce
       // since it does not expose an api
       // to get the partition
@@ -423,19 +587,27 @@ public class HadoopArchives implements Tool {
       // create a file name using the partition
       // we need to write to this directory
       tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
+      blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
       // get the output path and write to the tmp
       // directory
       partname = "part-" + partId;
       tmpOutput = new Path(tmpOutputDir, partname);
+      rootPath = (conf.get(SRC_PARENT_LABEL, null) == null) ? null :
+                  new Path(conf.get(SRC_PARENT_LABEL));
+      if (rootPath == null) {
+        throw new RuntimeException("Unable to read parent " +
+            "path for har from config");
+      }
       try {
         destFs = tmpOutput.getFileSystem(conf);
         //this was a stale copy
         if (destFs.exists(tmpOutput)) {
           destFs.delete(tmpOutput, false);
-        }
-        partStream = destFs.create(tmpOutput);
+        }
+        partStream = destFs.create(tmpOutput, false, conf.getInt("io.file.buffer.size", 4096),
+            destFs.getDefaultReplication(), blockSize);
       } catch(IOException ie) {
-        throw new RuntimeException("Unable to open output file " + tmpOutput);
+        throw new RuntimeException("Unable to open output file " + tmpOutput, ie);
       }
       buffer = new byte[buf_size];
     }
@@ -453,71 +625,70 @@ public class HadoopArchives implements Tool {
       }
     }
 
-    // the relative path of p. basically
-    // getting rid of schema. Parsing and doing
-    // string manipulation is not good - so
-    // just use the path api to do it.
-    private Path makeRelative(Path p) {
-      Path retPath = new Path(p.toUri().getPath());
-      return retPath;
-    }
-
-    static class MapStat {
-      private String pathname;
-      private boolean isDir;
-      private List<String> children;
-      public MapStat(String line) {
-        String[] splits = line.split(" ");
-        pathname = splits[0];
-        if ("dir".equals(splits[1])) {
-          isDir = true;
-        }
-        else {
-          isDir = false;
-        }
-        if (isDir) {
-          children = new ArrayList<String>();
-          for (int i = 2; i < splits.length; i++) {
-            children.add(splits[i]);
-          }
-        }
+    /**
+     * get rid of the / in the beginning of the path
+     * and resolve it against the given parent
+     * @param p the path, relative to the archive root
+     * @param parent the parent path to resolve against
+     * @return p resolved under parent, without the leading /
+     */
+    private Path realPath(Path p, Path parent) {
+      Path rootPath = new Path(Path.SEPARATOR);
+      if (rootPath.compareTo(p) == 0) {
+        return parent;
       }
+      return new Path(parent, new Path(p.toString().substring(1)));
     }
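
In the mapper, realPath undoes what relPathToRoot did on the driver side: it maps an archive-relative name back to a full path under the configured parent. Two hypothetical cases:

    realPath(new Path("/dir1/file1"), new Path("/home/user"))
    //   -> /home/user/dir1/file1 (leading / stripped, resolved against parent)
    realPath(new Path("/"), new Path("/home/user"))
    //   -> /home/user (the archive root maps back to the parent itself)
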
+
+    private static String encodeName(String s)
+        throws UnsupportedEncodingException {
+      return URLEncoder.encode(s, "UTF-8");
+    }
+
+    private static String encodeProperties(FileStatus fStatus)
+        throws UnsupportedEncodingException {
+      String propStr = encodeName(
+          fStatus.getModificationTime() + " "
+          + fStatus.getPermission().toShort() + " "
+          + encodeName(fStatus.getOwner()) + " "
+          + encodeName(fStatus.getGroup()));
+      return propStr;
+    }
 
    // read files from the split input
    // and write it onto the part files.
    // also output hash(name) and string
    // for reducer to create index
    // and masterindex files.
-    public void map(LongWritable key, Text value,
+    public void map(LongWritable key, HarEntry value,
        OutputCollector<IntWritable, Text> out,
        Reporter reporter) throws IOException {
-      String line = value.toString();
-      MapStat mstat = new MapStat(line);
-      Path srcPath = new Path(mstat.pathname);
-      String towrite = null;
-      Path relPath = makeRelative(srcPath);
+      Path relPath = new Path(value.path);
      int hash = HarFileSystem.getHarHash(relPath);
+      String towrite = null;
+      Path srcPath = realPath(relPath, rootPath);
      long startPos = partStream.getPos();
-      if (mstat.isDir) {
-        towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
+      FileSystem srcFs = srcPath.getFileSystem(conf);
+      FileStatus srcStatus = srcFs.getFileStatus(srcPath);
+      String propStr = encodeProperties(srcStatus);
+      if (value.isDir()) {
+        towrite = encodeName(relPath.toString())
+                  + " dir " + propStr + " 0 0 ";
        StringBuffer sbuff = new StringBuffer();
        sbuff.append(towrite);
-        for (String child: mstat.children) {
-          sbuff.append(child + " ");
+        for (String child: value.children) {
+          sbuff.append(encodeName(child) + " ");
        }
        towrite = sbuff.toString();
        //reading directories is also progress
        reporter.progress();
      }
      else {
-        FileSystem srcFs = srcPath.getFileSystem(conf);
-        FileStatus srcStatus = srcFs.getFileStatus(srcPath);
        FSDataInputStream input = srcFs.open(srcStatus.getPath());
        reporter.setStatus("Copying file " + srcStatus.getPath() +
            " to archive.");
        copyData(srcStatus.getPath(), input, partStream, reporter);
-        towrite = relPath.toString() + " file " + partname + " " + startPos
-            + " " + srcStatus.getLen() + " ";
+        towrite = encodeName(relPath.toString())
+                  + " file " + partname + " " + startPos
+                  + " " + srcStatus.getLen() + " " + propStr + " ";
      }
      out.collect(new IntWritable(hash), new Text(towrite));
    }
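
Each collected value becomes one line of the eventual _index file. With hypothetical values, a file entry now looks like

    %2Fdir1%2Ffile1 file part-0 0 1024 1239287918000+420+hadoop+hadoop

that is: encoded name, type, part file, start offset, length, and the encoded properties. Because the fields are space-delimited, the old unencoded format could not survive names containing spaces; the URL-encoding avoids that.
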
@@ -563,7 +734,7 @@ public class HadoopArchives implements Tool {
      }
      indexStream = fs.create(index);
      outStream = fs.create(masterIndex);
-      String version = HarFileSystem.VERSION + " \n";
+      String version = VERSION + " \n";
      outStream.write(version.getBytes());
 
    } catch(IOException e) {
@@ -611,27 +782,26 @@ public class HadoopArchives implements Tool {
      outStream.close();
      indexStream.close();
      // try increasing the replication
-      fs.setReplication(index, (short) 10);
-      fs.setReplication(masterIndex, (short) 10);
+      fs.setReplication(index, (short) 5);
+      fs.setReplication(masterIndex, (short) 5);
    }
 
  }
 
  /** the main driver for creating the archives
-   * it takes at least two command line parameters. The src and the
-   * dest. It does an lsr on the source paths.
+   * it takes at least three command line parameters. The parent path,
+   * the src and the dest. It does an lsr on the source paths.
   * The mapper created archuves and the reducer creates
   * the archive index.
   */
 
  public int run(String[] args) throws Exception {
    try {
+      Path parentPath = null;
      List<Path> srcPaths = new ArrayList<Path>();
      Path destPath = null;
-      // check we were supposed to archive or
-      // unarchive
      String archiveName = null;
-      if (args.length < 4) {
+      if (args.length < 5) {
        System.out.println(usage);
        throw new IOException("Invalid usage.");
      }
@@ -644,28 +814,52 @@ public class HadoopArchives implements Tool {
        System.out.println(usage);
        throw new IOException("Invalid name for archives. " + archiveName);
      }
-      for (int i = 2; i < args.length; i++) {
+      int i = 2;
+      //check to see if relative parent has been provided or not
+      //this is a required parameter.
+      if (! "-p".equals(args[i])) {
+        System.out.println(usage);
+        throw new IOException("Parent path not specified.");
+      }
+      parentPath = new Path(args[i + 1]);
+      i += 2;
+      //read the rest of the paths
+      for (; i < args.length; i++) {
        if (i == (args.length - 1)) {
          destPath = new Path(args[i]);
        }
        else {
-          srcPaths.add(new Path(args[i]));
+          Path argPath = new Path(args[i]);
+          if (argPath.isAbsolute()) {
+            System.out.println(usage);
+            throw new IOException("source path " + argPath +
+                " is not relative to " + parentPath);
+          }
+          srcPaths.add(new Path(parentPath, argPath));
        }
      }
      if (srcPaths.size() == 0) {
-        System.out.println(usage);
-        throw new IOException("Invalid Usage: No input sources specified.");
+        // assuming if the user does not specify path for sources
+        // the whole parent directory needs to be archived.
+        srcPaths.add(parentPath);
      }
      // do a glob on the srcPaths and then pass it on
      List<Path> globPaths = new ArrayList<Path>();
      for (Path p: srcPaths) {
        FileSystem fs = p.getFileSystem(getConf());
        FileStatus[] statuses = fs.globStatus(p);
-        for (FileStatus status: statuses) {
-          globPaths.add(fs.makeQualified(status.getPath()));
+        if (statuses != null) {
+          for (FileStatus status: statuses) {
+            globPaths.add(fs.makeQualified(status.getPath()));
+          }
        }
      }
-      archive(globPaths, archiveName, destPath);
+      if (globPaths.isEmpty()) {
+        throw new IOException("The resolved paths set is empty."
+            + " Please check whether the srcPaths exist, where srcPaths = "
+            + srcPaths);
+      }
+      archive(parentPath, globPaths, archiveName, destPath);
    } catch(IOException ie) {
      System.err.println(ie.getLocalizedMessage());
      return -1;
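
With the parsing above, -p is mandatory and the sources are given relative to it. A hypothetical invocation:

    hadoop archive -archiveName foo.har -p /user/hadoop dir1 dir2 /user/outputdir

archives /user/hadoop/dir1 and /user/hadoop/dir2 into /user/outputdir/foo.har; leaving the sources out archives all of /user/hadoop, per the srcPaths.size() == 0 branch above.
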
@@ -683,8 +877,13 @@ public class HadoopArchives implements Tool {
      ret = ToolRunner.run(harchives, args);
    } catch(Exception e) {
      LOG.debug("Exception in archives ", e);
-      System.err.println("Exception in archives");
-      System.err.println(e.getLocalizedMessage());
+      System.err.println(e.getClass().getSimpleName() + " in archives");
+      final String s = e.getLocalizedMessage();
+      if (s != null) {
+        System.err.println(s);
+      } else {
+        e.printStackTrace(System.err);
+      }
      System.exit(1);
    }
    System.exit(ret);