BlockForensics.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. import java.io.BufferedReader;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.io.InputStreamReader;
  23. import java.lang.Runtime;
  24. import java.net.URL;
  25. import java.net.URLConnection;
  26. import java.util.Arrays;
  27. import java.util.Iterator;
  28. import java.util.LinkedList;
  29. import java.util.List;
  30. import java.util.Random;
  31. import java.util.Set;
  32. import java.util.StringTokenizer;
  33. import java.util.TreeSet;
  34. import javax.xml.parsers.DocumentBuilder;
  35. import javax.xml.parsers.DocumentBuilderFactory;
  36. import javax.xml.parsers.ParserConfigurationException;
  37. import org.w3c.dom.Document;
  38. import org.w3c.dom.NodeList;
  39. import org.xml.sax.SAXException;
  /**
   * This class repeatedly queries a namenode looking for corrupt replicas. If
   * any previously unseen ones are found, the provided Hadoop job is launched
   * over them and its output is printed to stdout.
   *
   * The syntax is:
   *
   * java BlockForensics http://[namenode]:[port]/corrupt_replicas_xml.jsp
   * [sleep time between namenode queries for corrupt blocks
   * (in milliseconds)] [mapred jar location] [hdfs input path]
   *
   * All arguments are required, and the HADOOP_HOME environment variable
   * must be set.
   */
  public class BlockForensics {

    /** Pause between namenode polls used when the given value is invalid, in milliseconds. */
    private static final int DEFAULT_SLEEP_TIME_MS = 30000;

    /**
     * Joins the string forms of the elements of {@code l}, separated by {@code sep}.
     *
     * @param l   the elements to join; each is rendered as by {@code String.valueOf},
     *            so {@code null} elements become {@code "null"}
     * @param sep the separator placed between consecutive elements
     * @return the joined string, or {@code ""} when {@code l} is empty
     */
    public static String join(List<?> l, String sep) {
      StringBuilder sb = new StringBuilder();
      Iterator<?> it = l.iterator();
      while (it.hasNext()) {
        sb.append(it.next());
        if (it.hasNext()) {
          sb.append(sep);
        }
      }
      return sb.toString();
    }

    /**
     * Runs {@code bin/hadoop} with the given arguments and copies its combined
     * stdout/stderr to this process's stdout, line by line.
     *
     * <p>When {@code HADOOP_HOME} is set, the executable is taken from
     * {@code $HADOOP_HOME/bin/hadoop} and the command runs with {@code HADOOP_HOME}
     * as its working directory. Otherwise {@code bin/hadoop} is used as given.
     * The method returns only after the child process has exited. The child's exit
     * status is not inspected; callers rely on this, e.g. when deleting a
     * directory that may not exist.
     *
     * @param args arguments passed to {@code bin/hadoop}
     * @throws IOException if the process cannot be started or its output cannot be read
     */
    public static void runHadoopCmd(String ... args)
        throws IOException {
      String hadoopHome = System.getenv("HADOOP_HOME");
      List<String> command = new LinkedList<String>();
      if (hadoopHome != null) {
        // Absolute path: how a relative executable is resolved against
        // ProcessBuilder.directory() differs between platforms.
        command.add(new File(new File(hadoopHome, "bin"), "hadoop").getAbsolutePath());
      } else {
        command.add("bin/hadoop");
      }
      command.addAll(Arrays.asList(args));
      ProcessBuilder pb = new ProcessBuilder(command);
      if (hadoopHome != null) {
        pb.directory(new File(hadoopHome));
      }
      pb.redirectErrorStream(true);
      Process p = pb.start();
      // Nothing is ever written to the child; close its stdin so the pipe is not leaked
      // and the child sees EOF instead of blocking on a read.
      p.getOutputStream().close();
      BufferedReader br = new BufferedReader(
          new InputStreamReader(p.getInputStream()));
      try {
        String line;
        while ((line = br.readLine()) != null) {
          System.out.println(line);
        }
      } finally {
        br.close();
      }
      try {
        p.waitFor();
      } catch (InterruptedException e) {
        // Keep the IOException-only signature; preserve the interrupt for the caller.
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Polls the namenode forever. Whenever it reports corrupt block ids not seen in
     * an earlier poll, runs the forensics job over them and prints the result.
     *
     * @param args {@code [corrupt_replicas_xml.jsp URL] [sleep time in ms]
     *             [mapred jar location] [hdfs input path]}
     * @throws SAXException if the namenode response is not well-formed XML
     * @throws ParserConfigurationException if no XML parser can be configured
     * @throws InterruptedException if interrupted while sleeping between polls
     * @throws IOException if the namenode cannot be queried or a Hadoop command
     *         cannot be started
     */
    public static void main(String[] args)
        throws SAXException, ParserConfigurationException,
               InterruptedException, IOException {
      if (System.getenv("HADOOP_HOME") == null) {
        System.err.println("The environmental variable HADOOP_HOME is undefined");
        System.exit(1);
      }
      if (args.length < 4) {
        System.out.println("Usage: java BlockForensics [http://namenode:port/"
            + "corrupt_replicas_xml.jsp] [sleep time between "
            + "requests (in milliseconds)] [mapred jar location] "
            + "[hdfs input path]");
        return;
      }
      int sleepTime = parseSleepTime(args[1]);
      URL corruptReplicasUrl = new URL(args[0]);
      // Loop-invariant: one parser and one RNG are reused for every poll.
      DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
      Random random = new Random();
      // Every block id ever reported; only ids not yet in here trigger a search.
      Set<Long> blockIds = new TreeSet<Long>();
      while (true) {
        List<Long> searchBlockIds = new LinkedList<Long>();
        for (Long blockId : fetchCorruptBlockIds(builder, corruptReplicasUrl)) {
          if (blockIds.add(blockId)) {
            searchBlockIds.add(blockId);
          }
        }
        if (!searchBlockIds.isEmpty()) {
          runForensics(searchBlockIds, args[2], args[3], random);
          int sleepSecs = (int) (sleepTime / 1000.);
          System.out.print("Sleeping for " + sleepSecs
              + " second" + (sleepSecs == 1 ? "" : "s") + ".");
        }
        System.out.print(".");
        Thread.sleep(sleepTime);
      }
    }

    /**
     * Parses the sleep time in milliseconds.
     *
     * <p>Surrounding whitespace is tolerated. Falls back to
     * {@link #DEFAULT_SLEEP_TIME_MS}, with a notice on stdout, when the value is not
     * an integer or is negative. A negative value would make {@code Thread.sleep} throw.
     */
    private static int parseSleepTime(String value) {
      try {
        int parsed = Integer.parseInt(value.trim());
        if (parsed >= 0) {
          return parsed;
        }
      } catch (NumberFormatException e) {
        // Invalid input is reported below and replaced by the default.
      }
      System.out.println("The sleep time entered is invalid, "
          + "using default value: " + DEFAULT_SLEEP_TIME_MS + "ms");
      return DEFAULT_SLEEP_TIME_MS;
    }

    /**
     * Downloads the XML page at {@code url} and returns the value of every
     * {@code block_id} element, in document order.
     *
     * <p>Empty elements are skipped. A non-numeric value raises
     * {@link NumberFormatException}. The connection stream is always closed.
     */
    private static List<Long> fetchCorruptBlockIds(DocumentBuilder builder, URL url)
        throws IOException, SAXException {
      InputStream xml = url.openStream();
      Document doc;
      try {
        doc = builder.parse(xml);
      } finally {
        xml.close();
      }
      NodeList corruptReplicaNodes = doc.getElementsByTagName("block_id");
      List<Long> ids = new LinkedList<Long>();
      for (int i = 0; i < corruptReplicaNodes.getLength(); i++) {
        String text = corruptReplicaNodes.item(i).getTextContent().trim();
        if (text.length() > 0) {
          ids.add(Long.valueOf(text));
        }
      }
      return ids;
    }

    /**
     * Runs the forensics job for the given block ids.
     *
     * <p>Steps: create a randomly named temporary HDFS output directory, run the
     * job jar into it, print the job's part files, then delete the directory.
     */
    private static void runForensics(List<Long> searchBlockIds, String jarLocation,
        String inputPath, Random random) throws IOException {
      String blockIdsStr = join(searchBlockIds, ",");
      System.out.println("\nSearching for: " + blockIdsStr);
      String tmpDir = "/tmp-block-forensics-" + random.nextInt(Integer.MAX_VALUE);
      System.out.println("Using temporary dir: " + tmpDir);
      // Remove any leftover directory that happens to have the same name.
      runHadoopCmd("fs", "-rmr", tmpDir);
      // Job arguments: input dir, output dir, comma-delimited block ids.
      runHadoopCmd("jar", jarLocation, inputPath, tmpDir, blockIdsStr);
      runHadoopCmd("fs", "-cat", tmpDir + "/part*");
      runHadoopCmd("fs", "-rmr", tmpDir);
    }
  }