SetReplication.java 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.fs.shell;
  19. import java.io.IOException;
  20. import java.util.LinkedList;
  21. import java.util.List;
  22. import org.apache.hadoop.classification.InterfaceAudience;
  23. import org.apache.hadoop.classification.InterfaceStability;
  24. import org.apache.hadoop.fs.BlockLocation;
  25. import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
  26. /**
  27. * Modifies the replication factor
  28. */
  29. @InterfaceAudience.Private
  30. @InterfaceStability.Unstable
  31. class SetReplication extends FsCommand {
  32. public static void registerCommands(CommandFactory factory) {
  33. factory.addClass(SetReplication.class, "-setrep");
  34. }
  35. public static final String NAME = "setrep";
  36. public static final String USAGE = "[-R] [-w] <rep> <path/file> ...";
  37. public static final String DESCRIPTION =
  38. "Set the replication level of a file.\n" +
  39. "The -R flag requests a recursive change of replication level\n" +
  40. "for an entire tree.";
  41. protected short newRep = 0;
  42. protected List<PathData> waitList = new LinkedList<PathData>();
  43. protected boolean waitOpt = false;
  44. @Override
  45. protected void processOptions(LinkedList<String> args) throws IOException {
  46. CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "R", "w");
  47. cf.parse(args);
  48. waitOpt = cf.getOpt("w");
  49. setRecursive(cf.getOpt("R"));
  50. try {
  51. newRep = Short.parseShort(args.removeFirst());
  52. } catch (NumberFormatException nfe) {
  53. displayWarning("Illegal replication, a positive integer expected");
  54. throw nfe;
  55. }
  56. if (newRep < 1) {
  57. throw new IllegalArgumentException("replication must be >= 1");
  58. }
  59. }
  60. @Override
  61. protected void processArguments(LinkedList<PathData> args)
  62. throws IOException {
  63. super.processArguments(args);
  64. if (waitOpt) waitForReplication();
  65. }
  66. @Override
  67. protected void processPath(PathData item) throws IOException {
  68. if (item.stat.isSymlink()) {
  69. throw new PathIOException(item.toString(), "Symlinks unsupported");
  70. }
  71. if (item.stat.isFile()) {
  72. if (!item.fs.setReplication(item.path, newRep)) {
  73. throw new IOException("Could not set replication for: " + item);
  74. }
  75. out.println("Replication " + newRep + " set: " + item);
  76. if (waitOpt) waitList.add(item);
  77. }
  78. }
  79. /**
  80. * Wait for all files in waitList to have replication number equal to rep.
  81. */
  82. private void waitForReplication() throws IOException {
  83. for (PathData item : waitList) {
  84. out.print("Waiting for " + item + " ...");
  85. out.flush();
  86. boolean printedWarning = false;
  87. boolean done = false;
  88. while (!done) {
  89. item.refreshStatus();
  90. BlockLocation[] locations =
  91. item.fs.getFileBlockLocations(item.stat, 0, item.stat.getLen());
  92. int i = 0;
  93. for(; i < locations.length; i++) {
  94. int currentRep = locations[i].getHosts().length;
  95. if (currentRep != newRep) {
  96. if (!printedWarning && currentRep > newRep) {
  97. out.println("\nWARNING: the waiting time may be long for "
  98. + "DECREASING the number of replications.");
  99. printedWarning = true;
  100. }
  101. break;
  102. }
  103. }
  104. done = i == locations.length;
  105. if (done) break;
  106. out.print(".");
  107. out.flush();
  108. try {Thread.sleep(10000);} catch (InterruptedException e) {}
  109. }
  110. out.println(" done");
  111. }
  112. }
  113. }