@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A MapReduce program to recursively change file properties
+ * such as owner, group and permission.
+ */
+public class DistCh extends DistTool {
+  static final String NAME = "distch";
+  static final String JOB_DIR_LABEL = NAME + ".job.dir";
+  static final String OP_LIST_LABEL = NAME + ".op.list";
+  static final String OP_COUNT_LABEL = NAME + ".op.count";
+
+  static final String USAGE = "java " + DistCh.class.getName()
+      + " [OPTIONS] <path:owner:group:permission>+ "
+
+      + "\n\nThe values of owner, group and permission can be empty."
+      + "\nPermission is an octal number."
+
+      + "\n\nOPTIONS:"
+      + "\n-f <urilist_uri> Use list at <urilist_uri> as src list"
+      + "\n-i               Ignore failures"
+      + "\n-log <logdir>    Write logs to <logdir>"
+      ;
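+
+  // Argument examples (illustrative): "/user/foo:foo:bar:700" sets owner,
+  // group and permission together, while "/user/foo::bar:" changes only the
+  // group, since empty fields are skipped; at least one property is required.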
+
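+  // Tuning constants: each map task handles about OP_PER_MAP operations, the
+  // number of maps is capped at MAX_MAPS_PER_NODE per cluster node, and a
+  // sync marker is written to the operation list every SYNC_FILE_MAX records
+  // so that ChangeInputFormat can cut splits at record boundaries.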
+  private static final long OP_PER_MAP = 1000;
+  private static final int MAX_MAPS_PER_NODE = 20;
+  private static final int SYNC_FILE_MAX = 10;
+
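+  /** Map task counters for succeeded and failed file operations. */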
+  static enum Counter { SUCCEED, FAIL }
+
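+  /** Command-line options, pairing the CLI switch with its JobConf key. */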
+  static enum Option {
+    IGNORE_FAILURES("-i", NAME + ".ignore.failures");
+
+    final String cmd, propertyname;
+
+    private Option(String cmd, String propertyname) {
+      this.cmd = cmd;
+      this.propertyname = propertyname;
+    }
+  }
+
+  DistCh(Configuration conf) {
+    super(createJobConf(conf));
+  }
+
+  private static JobConf createJobConf(Configuration conf) {
+    JobConf jobconf = new JobConf(conf, DistCh.class);
+    jobconf.setJobName(NAME);
+    jobconf.setMapSpeculativeExecution(false);
+
+    jobconf.setInputFormat(ChangeInputFormat.class);
+    jobconf.setOutputKeyClass(Text.class);
+    jobconf.setOutputValueClass(Text.class);
+
+    jobconf.setMapperClass(ChangeFilesMapper.class);
+    jobconf.setNumReduceTasks(0);
+    return jobconf;
+  }
+
+  /** File operations. */
+  static class FileOperation implements Writable {
+    private Path src;
+    private String owner;
+    private String group;
+    private FsPermission permission;
+
+    FileOperation() {}
+
+    FileOperation(Path src, FileOperation that) {
+      this.src = src;
+      this.owner = that.owner;
+      this.group = that.group;
+      this.permission = that.permission;
+      checkState();
+    }
+
+    /**
+     * path:owner:group:permission
+     * e.g.
+     * /user/foo:foo:bar:700
+     */
+    FileOperation(String line) {
+      try {
+        String[] t = line.split(":", 4);
+        for(int i = 0; i < t.length; i++) {
+          if ("".equals(t[i])) {
+            t[i] = null;
+          }
+        }
+
+        src = new Path(t[0]);
+        owner = t[1];
+        group = t[2];
+        permission = t[3] == null? null:
+            new FsPermission(Short.parseShort(t[3], 8));
+
+        checkState();
+      }
+      catch(Exception e) {
+        throw (IllegalArgumentException)new IllegalArgumentException(
+            "line=" + line).initCause(e);
+      }
+    }
+
+    private void checkState() throws IllegalStateException {
+      if (owner == null && group == null && permission == null) {
+        throw new IllegalStateException(
+            "owner == null && group == null && permission == null");
+      }
+    }
+
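+    /**
+     * Umask for regular files: the execute bits of a requested permission
+     * are ignored when comparing it with a file's current permission.
+     */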
+    static final FsPermission FILE_UMASK
+        = FsPermission.createImmutable((short)0111);
+
+    private boolean isDifferent(FileStatus original) {
+      if (owner != null && !owner.equals(original.getOwner())) {
+        return true;
+      }
+      if (group != null && !group.equals(original.getGroup())) {
+        return true;
+      }
+      if (permission != null) {
+        FsPermission orig = original.getPermission();
+        return original.isDir()? !permission.equals(orig):
+            !permission.applyUMask(FILE_UMASK).equals(orig);
+      }
+      return false;
+    }
+
+    void run(Configuration conf) throws IOException {
+      FileSystem fs = src.getFileSystem(conf);
+      if (permission != null) {
+        fs.setPermission(src, permission);
+      }
+      if (owner != null || group != null) {
+        fs.setOwner(src, owner, group);
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void readFields(DataInput in) throws IOException {
+      this.src = new Path(Text.readString(in));
+      owner = DistTool.readString(in);
+      group = DistTool.readString(in);
+      permission = in.readBoolean()? FsPermission.read(in): null;
+    }
+
+    /** {@inheritDoc} */
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, src.toString());
+      DistTool.writeString(out, owner);
+      DistTool.writeString(out, group);
+
+      boolean b = permission != null;
+      out.writeBoolean(b);
+      if (b) {permission.write(out);}
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return src + ":" + owner + ":" + group + ":" + permission;
+    }
+  }
+
+  /** Responsible for generating splits of the src file list. */
+  static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
+    /** Do nothing. */
+    public void validateInput(JobConf job) {}
+
+    /**
+     * Produce splits such that each is no greater than the quotient of the
+     * total size and the number of splits requested.
+     * @param job The handle to the JobConf object
+     * @param numSplits Number of splits requested
+     */
+    public InputSplit[] getSplits(JobConf job, int numSplits
+        ) throws IOException {
+      final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
+      String srclist = job.get(OP_LIST_LABEL, "");
+      if (srcCount < 0 || "".equals(srclist)) {
+        throw new RuntimeException("Invalid metadata: #files(" + srcCount +
+            ") listuri(" + srclist + ")");
+      }
+      final int targetcount = srcCount / numSplits;
+      Path srcs = new Path(srclist);
+      FileSystem fs = srcs.getFileSystem(job);
+
+      List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+
+      Text key = new Text();
+      FileOperation value = new FileOperation();
+      SequenceFile.Reader in = null;
+      long prev = 0L;
+      int count = 0; //number of records since the last split was cut
+      try {
+        for(in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value); ) {
+          long curr = in.getPosition();
+          long delta = curr - prev;
+          if (++count > targetcount) {
+            count = 0;
+            splits.add(new FileSplit(srcs, prev, delta, (String[])null));
+            prev = curr;
+          }
+        }
+      }
+      finally {
+        if (in != null) {
+          in.close();
+        }
+      }
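+      // Whatever remains after the last cut becomes the final split.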
+      long remaining = fs.getFileStatus(srcs).getLen() - prev;
+      if (remaining != 0) {
+        splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
+      }
+      LOG.info("numSplits=" + numSplits + ", splits.size()=" + splits.size());
+      return splits.toArray(new FileSplit[splits.size()]);
+    }
+
+    /** {@inheritDoc} */
+    public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader<Text, FileOperation>(job,
+          (FileSplit)split);
+    }
+  }
+
+  /** The mapper for changing files. */
+  static class ChangeFilesMapper
+      implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
+    private JobConf jobconf;
+    private boolean ignoreFailures;
+
+    private int failcount = 0;
+    private int succeedcount = 0;
+
+    private String getCountString() {
+      return "Succeeded: " + succeedcount + " Failed: " + failcount;
+    }
+
+    /** {@inheritDoc} */
+    public void configure(JobConf job) {
+      this.jobconf = job;
+      ignoreFailures = job.getBoolean(Option.IGNORE_FAILURES.propertyname, false);
+    }
+
+    /** Run a FileOperation. */
+    public void map(Text key, FileOperation value,
+        OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
+        ) throws IOException {
+      try {
+        value.run(jobconf);
+        ++succeedcount;
+        reporter.incrCounter(Counter.SUCCEED, 1);
+      } catch (IOException e) {
+        ++failcount;
+        reporter.incrCounter(Counter.FAIL, 1);
+
+        String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
+        out.collect(null, new Text(s));
+        LOG.info(s);
+      } finally {
+        reporter.setStatus(getCountString());
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      if (failcount == 0 || ignoreFailures) {
+        return;
+      }
+      throw new IOException(getCountString());
+    }
+  }
+
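+  /** Sanity check the source paths via DistTool.checkSource. */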
+  private static void check(Configuration conf, List<FileOperation> ops
+      ) throws InvalidInputException {
+    List<Path> srcs = new ArrayList<Path>();
+    for(FileOperation op : ops) {
+      srcs.add(op.src);
+    }
+    DistTool.checkSource(conf, srcs);
+  }
+
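+  /** Parse each line of the given file into a FileOperation. */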
+  private static List<FileOperation> fetchList(Configuration conf, Path inputfile
+      ) throws IOException {
+    List<FileOperation> result = new ArrayList<FileOperation>();
+    for(String line : readFile(conf, inputfile)) {
+      result.add(new FileOperation(line));
+    }
+    return result;
+  }
+
+  /** This is the main driver for recursively changing file properties. */
+  public int run(String[] args) throws Exception {
+    List<FileOperation> ops = new ArrayList<FileOperation>();
+    Path logpath = null;
+    boolean isIgnoreFailures = false;
+
+    try {
+      for (int idx = 0; idx < args.length; idx++) {
+        if ("-f".equals(args[idx])) {
+          if (++idx == args.length) {
+            System.out.println("urilist_uri not specified");
+            System.out.println(USAGE);
+            return -1;
+          }
+          ops.addAll(fetchList(jobconf, new Path(args[idx])));
+        } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
+          isIgnoreFailures = true;
+        } else if ("-log".equals(args[idx])) {
+          if (++idx == args.length) {
+            System.out.println("logdir not specified");
+            System.out.println(USAGE);
+            return -1;
+          }
+          logpath = new Path(args[idx]);
+        } else if ('-' == args[idx].codePointAt(0)) {
+          System.out.println("Invalid switch " + args[idx]);
+          System.out.println(USAGE);
+          ToolRunner.printGenericCommandUsage(System.out);
+          return -1;
+        } else {
+          ops.add(new FileOperation(args[idx]));
+        }
+      }
+      // mandatory command-line parameters
+      if (ops.isEmpty()) {
+        throw new IllegalStateException("Operation is empty");
+      }
+      LOG.info("ops=" + ops);
+      LOG.info("isIgnoreFailures=" + isIgnoreFailures);
+      jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
+      check(jobconf, ops);
+
+      try {
+        if (setup(ops, logpath)) {
+          JobClient.runJob(jobconf);
+        }
+      } finally {
+        try {
+          if (logpath == null) {
+            //delete log directory
+            final Path logdir = FileOutputFormat.getOutputPath(jobconf);
+            if (logdir != null) {
+              logdir.getFileSystem(jobconf).delete(logdir, true);
+            }
+          }
+        }
+        finally {
+          //delete job directory
+          final String jobdir = jobconf.get(JOB_DIR_LABEL);
+          if (jobdir != null) {
+            final Path jobpath = new Path(jobdir);
+            jobpath.getFileSystem(jobconf).delete(jobpath, true);
+          }
+        }
+      }
+    } catch(DuplicationException e) {
+      LOG.error("Input error:", e);
+      return DuplicationException.ERROR_CODE;
+    } catch(Exception e) {
+      LOG.error(NAME + " failed: ", e);
+      System.out.println(USAGE);
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+    return 0;
+  }
+
+  /** Calculate how many maps to run. */
+  private static int getMapCount(int srcCount, int numNodes) {
+    int numMaps = (int)(srcCount / OP_PER_MAP);
+    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
+    return Math.max(numMaps, 1);
+  }
+
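+  /**
+   * Walk each source tree and record one FileOperation per path whose
+   * owner, group or permission actually differs, then size the job from
+   * the resulting operation count.
+   * @return false if there is nothing to change.
+   */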
+  private boolean setup(List<FileOperation> ops, Path log) throws IOException {
+    final String randomId = getRandomId();
+    JobClient jClient = new JobClient(jobconf);
+    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    LOG.info(JOB_DIR_LABEL + "=" + jobdir);
+
+    if (log == null) {
+      log = new Path(jobdir, "_logs");
+    }
+    FileOutputFormat.setOutputPath(jobconf, log);
+    LOG.info("log=" + log);
+
+    //create operation list
+    FileSystem fs = jobdir.getFileSystem(jobconf);
+    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
+    jobconf.set(OP_LIST_LABEL, opList.toString());
+    int opCount = 0, synCount = 0;
+    SequenceFile.Writer opWriter = null;
+    try {
+      opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
+          FileOperation.class, SequenceFile.CompressionType.NONE);
+      for(FileOperation op : ops) {
+        FileStatus srcstat = fs.getFileStatus(op.src);
+        if (srcstat.isDir() && op.isDifferent(srcstat)) {
+          ++opCount;
+          opWriter.append(new Text(op.src.toString()), op);
+        }
+
+        Stack<Path> pathstack = new Stack<Path>();
+        for(pathstack.push(op.src); !pathstack.empty(); ) {
+          for(FileStatus stat : fs.listStatus(pathstack.pop())) {
+            if (stat.isDir()) {
+              pathstack.push(stat.getPath());
+            }
+
+            if (op.isDifferent(stat)) {
+              ++opCount;
+              if (++synCount > SYNC_FILE_MAX) {
+                opWriter.sync();
+                synCount = 0;
+              }
+              Path f = stat.getPath();
+              opWriter.append(new Text(f.toString()), new FileOperation(f, op));
+            }
+          }
+        }
+      }
+    } finally {
+      if (opWriter != null) {
+        opWriter.close();
+      }
+    }
+
+    checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
+    jobconf.setInt(OP_COUNT_LABEL, opCount);
+    LOG.info(OP_COUNT_LABEL + "=" + opCount);
+    jobconf.setNumMapTasks(getMapCount(opCount,
+        new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
+    return opCount != 0;
+  }
+
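+  /** Sort the operation list and reject input containing duplicate paths. */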
+  private static void checkDuplication(FileSystem fs, Path file, Path sorted,
+      Configuration conf) throws IOException {
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+        new Text.Comparator(), Text.class, FileOperation.class, conf);
+    sorter.sort(file, sorted);
+    SequenceFile.Reader in = null;
+    try {
+      in = new SequenceFile.Reader(fs, sorted, conf);
+      FileOperation curop = new FileOperation();
+      Text prevsrc = null, cursrc = new Text();
+      for(; in.next(cursrc, curop); ) {
+        if (prevsrc != null && cursrc.equals(prevsrc)) {
+          throw new DuplicationException(
+              "Invalid input, there are duplicated files in the sources: "
+              + prevsrc + ", " + cursrc);
+        }
+        prevsrc = cursrc;
+        cursrc = new Text();
+        curop = new FileOperation();
+      }
+    }
+    finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
+  }
+}