|
@@ -23,14 +23,17 @@ import java.io.IOException;
|
|
|
import java.io.PrintStream;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Scanner;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.amazonaws.services.s3.model.MultipartUpload;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import org.slf4j.Logger;
|
|
@@ -44,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.RemoteIterator;
|
|
|
+import org.apache.hadoop.fs.s3a.MultipartUtils;
|
|
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
|
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
|
|
import org.apache.hadoop.fs.s3a.S3AUtils;
|
|
@@ -55,6 +59,7 @@ import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
|
|
+import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
|
|
|
import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
|
|
|
|
|
|
/**
|
|
@@ -79,6 +84,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
"\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" +
|
|
|
"\t" + Import.NAME + " - " + Import.PURPOSE + "\n" +
|
|
|
"\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
|
|
|
+ "\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n" +
|
|
|
"\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
|
|
|
"\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
|
|
|
"\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n";
|
|
@@ -100,10 +106,14 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
private final CommandFormat commandFormat;
|
|
|
|
|
|
public static final String META_FLAG = "meta";
|
|
|
+
|
|
|
+ // These are common to prune, upload subcommands.
|
|
|
public static final String DAYS_FLAG = "days";
|
|
|
public static final String HOURS_FLAG = "hours";
|
|
|
public static final String MINUTES_FLAG = "minutes";
|
|
|
public static final String SECONDS_FLAG = "seconds";
|
|
|
+ public static final String AGE_OPTIONS_USAGE = "[-days <days>] "
|
|
|
+ + "[-hours <hours>] [-minutes <minutes>] [-seconds <seconds>]";
|
|
|
|
|
|
public static final String REGION_FLAG = "region";
|
|
|
public static final String READ_FLAG = "read";
|
|
@@ -177,6 +187,36 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
"config, or S3 bucket");
|
|
|
}
|
|
|
|
|
|
+ private long getDeltaComponent(TimeUnit unit, String arg) {
|
|
|
+ String raw = getCommandFormat().getOptValue(arg);
|
|
|
+ if (raw == null || raw.isEmpty()) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ Long parsed = Long.parseLong(raw);
|
|
|
+ return unit.toMillis(parsed);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Convert all age options supplied to total milliseconds of time.
|
|
|
+ * @return Sum of all age options, or zero if none were given.
|
|
|
+ */
|
|
|
+ long ageOptionsToMsec() {
|
|
|
+ long cliDelta = 0;
|
|
|
+ cliDelta += getDeltaComponent(TimeUnit.DAYS, DAYS_FLAG);
|
|
|
+ cliDelta += getDeltaComponent(TimeUnit.HOURS, HOURS_FLAG);
|
|
|
+ cliDelta += getDeltaComponent(TimeUnit.MINUTES, MINUTES_FLAG);
|
|
|
+ cliDelta += getDeltaComponent(TimeUnit.SECONDS, SECONDS_FLAG);
|
|
|
+ return cliDelta;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void addAgeOptions() {
|
|
|
+ CommandFormat format = getCommandFormat();
|
|
|
+ format.addOptionWithValue(DAYS_FLAG);
|
|
|
+ format.addOptionWithValue(HOURS_FLAG);
|
|
|
+ format.addOptionWithValue(MINUTES_FLAG);
|
|
|
+ format.addOptionWithValue(SECONDS_FLAG);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Parse metadata store from command line option or HDFS configuration.
|
|
|
*
|
|
@@ -867,7 +907,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
"Common options:\n" +
|
|
|
" -" + META_FLAG + " URL - Metadata repository details " +
|
|
|
"(implementation-specific)\n" +
|
|
|
- "\n" +
|
|
|
+ "Age options. Any combination of these integer-valued options:\n" +
|
|
|
+ AGE_OPTIONS_USAGE + "\n" +
|
|
|
"Amazon DynamoDB-specific options:\n" +
|
|
|
" -" + REGION_FLAG + " REGION - Service region for connections\n" +
|
|
|
"\n" +
|
|
@@ -877,12 +918,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
|
|
|
Prune(Configuration conf) {
|
|
|
super(conf);
|
|
|
-
|
|
|
- CommandFormat format = getCommandFormat();
|
|
|
- format.addOptionWithValue(DAYS_FLAG);
|
|
|
- format.addOptionWithValue(HOURS_FLAG);
|
|
|
- format.addOptionWithValue(MINUTES_FLAG);
|
|
|
- format.addOptionWithValue(SECONDS_FLAG);
|
|
|
+ addAgeOptions();
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -901,15 +937,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
return USAGE;
|
|
|
}
|
|
|
|
|
|
- private long getDeltaComponent(TimeUnit unit, String arg) {
|
|
|
- String raw = getCommandFormat().getOptValue(arg);
|
|
|
- if (raw == null || raw.isEmpty()) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- Long parsed = Long.parseLong(raw);
|
|
|
- return unit.toMillis(parsed);
|
|
|
- }
|
|
|
-
|
|
|
public int run(String[] args, PrintStream out) throws
|
|
|
InterruptedException, IOException {
|
|
|
List<String> paths = parseArgs(args);
|
|
@@ -924,11 +951,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
Configuration conf = getConf();
|
|
|
long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0);
|
|
|
|
|
|
- long cliDelta = 0;
|
|
|
- cliDelta += getDeltaComponent(TimeUnit.DAYS, "days");
|
|
|
- cliDelta += getDeltaComponent(TimeUnit.HOURS, "hours");
|
|
|
- cliDelta += getDeltaComponent(TimeUnit.MINUTES, "minutes");
|
|
|
- cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds");
|
|
|
+ long cliDelta = ageOptionsToMsec();
|
|
|
|
|
|
if (confDelta <= 0 && cliDelta <= 0) {
|
|
|
errorln("You must specify a positive age for metadata to prune.");
|
|
@@ -1080,6 +1103,214 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Command to list / abort pending multipart uploads.
|
|
|
+ */
|
|
|
+ static class Uploads extends S3GuardTool {
|
|
|
+ public static final String NAME = "uploads";
|
|
|
+ public static final String ABORT = "abort";
|
|
|
+ public static final String LIST = "list";
|
|
|
+ public static final String EXPECT = "expect";
|
|
|
+ public static final String VERBOSE = "verbose";
|
|
|
+ public static final String FORCE = "force";
|
|
|
+
|
|
|
+ public static final String PURPOSE = "list or abort pending " +
|
|
|
+ "multipart uploads";
|
|
|
+ private static final String USAGE = NAME + " [OPTIONS] " +
|
|
|
+ "s3a://BUCKET[/path]\n"
|
|
|
+ + "\t" + PURPOSE + "\n\n"
|
|
|
+ + "Common options:\n"
|
|
|
+ + " (-" + LIST + " | -" + EXPECT +" <num-uploads> | -" + ABORT
|
|
|
+ + ") [-" + VERBOSE +"] "
|
|
|
+ + "[<age-options>] [-force]\n"
|
|
|
+ + "\t - Under given path, list or delete all uploads," +
|
|
|
+ " or only those \n"
|
|
|
+ + "older than specified by <age-options>\n"
|
|
|
+ + "<age-options> are any combination of the integer-valued options:\n"
|
|
|
+ + "\t" + AGE_OPTIONS_USAGE + "\n"
|
|
|
+ + "-" + EXPECT + " is similar to list, except no output is printed,\n"
|
|
|
+ + "\tbut the exit code will be an error if the provided number\n"
|
|
|
+ + "\tis different that the number of uploads found by the command.\n"
|
|
|
+ + "-" + FORCE + " option prevents the \"Are you sure\" prompt when\n"
|
|
|
+ + "\tusing -" + ABORT;
|
|
|
+
|
|
|
+ /** Constant used for output and parsed by tests. */
|
|
|
+ public static final String TOTAL = "Total";
|
|
|
+
|
|
|
+ /** Runs in one of three modes. */
|
|
|
+ private enum Mode { LIST, EXPECT, ABORT };
|
|
|
+ private Mode mode = null;
|
|
|
+
|
|
|
+ /** For Mode == EXPECT, expected listing size. */
|
|
|
+ private int expectedCount;
|
|
|
+
|
|
|
+ /** List/abort uploads older than this many milliseconds. */
|
|
|
+ private long ageMsec = 0;
|
|
|
+
|
|
|
+ /** Verbose output flag. */
|
|
|
+ private boolean verbose = false;
|
|
|
+
|
|
|
+ /** Whether to delete with out "are you sure" prompt. */
|
|
|
+ private boolean force = false;
|
|
|
+
|
|
|
+ /** Path prefix to use when searching multipart uploads. */
|
|
|
+ private String prefix;
|
|
|
+
|
|
|
+ Uploads(Configuration conf) {
|
|
|
+ super(conf, ABORT, LIST, VERBOSE, FORCE);
|
|
|
+ addAgeOptions();
|
|
|
+ getCommandFormat().addOptionWithValue(EXPECT);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ String getName() {
|
|
|
+ return NAME;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getUsage() {
|
|
|
+ return USAGE;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int run(String[] args, PrintStream out)
|
|
|
+ throws InterruptedException, IOException {
|
|
|
+ List<String> paths = parseArgs(args);
|
|
|
+ if (paths.isEmpty()) {
|
|
|
+ errorln(getUsage());
|
|
|
+ throw invalidArgs("No options specified");
|
|
|
+ }
|
|
|
+ processArgs(paths, out);
|
|
|
+ promptBeforeAbort(out);
|
|
|
+ processUploads(out);
|
|
|
+
|
|
|
+ out.flush();
|
|
|
+ return SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void promptBeforeAbort(PrintStream out) throws IOException {
|
|
|
+ if (mode != Mode.ABORT || force) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Scanner scanner = new Scanner(System.in, "UTF-8");
|
|
|
+ out.println("Are you sure you want to delete any pending " +
|
|
|
+ "uploads? (yes/no) >");
|
|
|
+ String response = scanner.nextLine();
|
|
|
+ if (!"yes".equalsIgnoreCase(response)) {
|
|
|
+ throw S3GuardTool.userAborted("User did not answer yes, quitting.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processUploads(PrintStream out) throws IOException {
|
|
|
+ MultipartUtils.UploadIterator uploads;
|
|
|
+ uploads = getFilesystem().listUploads(prefix);
|
|
|
+
|
|
|
+ int count = 0;
|
|
|
+ while (uploads.hasNext()) {
|
|
|
+ MultipartUpload upload = uploads.next();
|
|
|
+ if (!olderThan(upload, ageMsec)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ count++;
|
|
|
+ if (mode == Mode.ABORT || mode == Mode.LIST || verbose) {
|
|
|
+ println(out, "%s%s %s", mode == Mode.ABORT ? "Deleting: " : "",
|
|
|
+ upload.getKey(), upload.getUploadId());
|
|
|
+ }
|
|
|
+ if (mode == Mode.ABORT) {
|
|
|
+ getFilesystem().getWriteOperationHelper()
|
|
|
+ .abortMultipartUpload(upload.getKey(), upload.getUploadId(),
|
|
|
+ LOG_EVENT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (mode != Mode.EXPECT || verbose) {
|
|
|
+ println(out, "%s %d uploads %s.", TOTAL, count,
|
|
|
+ mode == Mode.ABORT ? "deleted" : "found");
|
|
|
+ }
|
|
|
+ if (mode == Mode.EXPECT) {
|
|
|
+ if (count != expectedCount) {
|
|
|
+ throw badState("Expected %d uploads, found %d", expectedCount, count);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if upload is at least as old as given age.
|
|
|
+ * @param u upload to check
|
|
|
+ * @param msec age in milliseconds
|
|
|
+ * @return true iff u was created at least age milliseconds ago.
|
|
|
+ */
|
|
|
+ private boolean olderThan(MultipartUpload u, long msec) {
|
|
|
+ Date ageDate = new Date(System.currentTimeMillis() - msec);
|
|
|
+ return ageDate.compareTo(u.getInitiated()) >= 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processArgs(List<String> args, PrintStream out)
|
|
|
+ throws IOException {
|
|
|
+ CommandFormat commands = getCommandFormat();
|
|
|
+ String err = "Can only specify one of -" + LIST + ", " +
|
|
|
+ " -" + ABORT + ", and " + EXPECT;
|
|
|
+
|
|
|
+ // Three mutually-exclusive options
|
|
|
+ if (commands.getOpt(LIST)) {
|
|
|
+ mode = Mode.LIST;
|
|
|
+ }
|
|
|
+ if (commands.getOpt(ABORT)) {
|
|
|
+ if (mode != null) {
|
|
|
+ throw invalidArgs(err);
|
|
|
+ }
|
|
|
+ mode = Mode.ABORT;
|
|
|
+ }
|
|
|
+
|
|
|
+ String expectVal = commands.getOptValue(EXPECT);
|
|
|
+ if (expectVal != null) {
|
|
|
+ if (mode != null) {
|
|
|
+ throw invalidArgs(err);
|
|
|
+ }
|
|
|
+ mode = Mode.EXPECT;
|
|
|
+ expectedCount = Integer.parseInt(expectVal);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Default to list
|
|
|
+ if (mode == null) {
|
|
|
+ vprintln(out, "No mode specified, defaulting to -" + LIST);
|
|
|
+ mode = Mode.LIST;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Other flags
|
|
|
+ if (commands.getOpt(VERBOSE)) {
|
|
|
+ verbose = true;
|
|
|
+ }
|
|
|
+ if (commands.getOpt(FORCE)) {
|
|
|
+ force = true;
|
|
|
+ }
|
|
|
+ ageMsec = ageOptionsToMsec();
|
|
|
+
|
|
|
+ String s3Path = args.get(0);
|
|
|
+ URI uri = S3GuardTool.toUri(s3Path);
|
|
|
+ prefix = uri.getPath();
|
|
|
+ if (prefix.length() > 0) {
|
|
|
+ prefix = prefix.substring(1);
|
|
|
+ }
|
|
|
+ vprintln(out, "Command: %s, age %d msec, path %s (prefix \"%s\")",
|
|
|
+ mode.name(), ageMsec, s3Path, prefix);
|
|
|
+
|
|
|
+ initS3AFileSystem(s3Path);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * If verbose flag is set, print a formatted string followed by a newline
|
|
|
+ * to the output stream.
|
|
|
+ * @param out destination
|
|
|
+ * @param format format string
|
|
|
+ * @param args optional arguments
|
|
|
+ */
|
|
|
+ private void vprintln(PrintStream out, String format, Object...
|
|
|
+ args) {
|
|
|
+ if (verbose) {
|
|
|
+ out.println(String.format(format, args));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static S3GuardTool command;
|
|
|
|
|
|
/**
|
|
@@ -1182,6 +1413,17 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
String.format(format, args));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Build the exception to raise on user-aborted action.
|
|
|
+ * @param format string format
|
|
|
+ * @param args optional arguments for the string
|
|
|
+ * @return a new exception to throw
|
|
|
+ */
|
|
|
+ protected static ExitUtil.ExitException userAborted(
|
|
|
+ String format, Object...args) {
|
|
|
+ return new ExitUtil.ExitException(ERROR, String.format(format, args));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Execute the command with the given arguments.
|
|
|
*
|
|
@@ -1224,6 +1466,9 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|
|
case SetCapacity.NAME:
|
|
|
command = new SetCapacity(conf);
|
|
|
break;
|
|
|
+ case Uploads.NAME:
|
|
|
+ command = new Uploads(conf);
|
|
|
+ break;
|
|
|
default:
|
|
|
printHelp();
|
|
|
throw new ExitUtil.ExitException(E_USAGE,
|