MapOutputLocation.java 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. /**
  2. * Copyright 2005 The Apache Software Foundation
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.apache.hadoop.mapred;
  17. import java.io.IOException;
  18. import java.io.*;
  19. import java.net.URL;
  20. import org.apache.hadoop.fs.Path;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.io.*;
  23. /** The location of a map output file, as passed to a reduce task via the
  24. * {@link InterTrackerProtocol}. */
  25. class MapOutputLocation implements Writable {
  26. static { // register a ctor
  27. WritableFactories.setFactory
  28. (MapOutputLocation.class,
  29. new WritableFactory() {
  30. public Writable newInstance() { return new MapOutputLocation(); }
  31. });
  32. }
  33. private String mapTaskId;
  34. private int mapId;
  35. private String host;
  36. private int port;
  37. /** RPC constructor **/
  38. public MapOutputLocation() {
  39. }
  40. /** Construct a location. */
  41. public MapOutputLocation(String mapTaskId, int mapId,
  42. String host, int port) {
  43. this.mapTaskId = mapTaskId;
  44. this.mapId = mapId;
  45. this.host = host;
  46. this.port = port;
  47. }
  48. /** The map task id. */
  49. public String getMapTaskId() { return mapTaskId; }
  50. /**
  51. * Get the map's id number.
  52. * @return The numeric id for this map
  53. */
  54. public int getMapId() {
  55. return mapId;
  56. }
  57. /** The host the task completed on. */
  58. public String getHost() { return host; }
  59. /** The port listening for {@link MapOutputProtocol} connections. */
  60. public int getPort() { return port; }
  61. public void write(DataOutput out) throws IOException {
  62. UTF8.writeString(out, mapTaskId);
  63. out.writeInt(mapId);
  64. UTF8.writeString(out, host);
  65. out.writeInt(port);
  66. }
  67. public void readFields(DataInput in) throws IOException {
  68. this.mapTaskId = UTF8.readString(in);
  69. this.mapId = in.readInt();
  70. this.host = UTF8.readString(in);
  71. this.port = in.readInt();
  72. }
  73. public String toString() {
  74. return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" +
  75. mapTaskId;
  76. }
  77. /**
  78. * An interface for callbacks when an method makes some progress.
  79. * @author Owen O'Malley
  80. */
  81. public static interface Pingable {
  82. void ping();
  83. }
  84. /**
  85. * Get the map output into a local file from the remote server.
  86. * We use the file system so that we generate checksum files on the data.
  87. * @param fileSys the filesystem to write the file to
  88. * @param localFilename the filename to write the data into
  89. * @param reduce the reduce id to get for
  90. * @param pingee a status object that wants to know when we make progress
  91. * @throws IOException when something goes wrong
  92. */
  93. public long getFile(FileSystem fileSys,
  94. Path localFilename,
  95. int reduce,
  96. Pingable pingee) throws IOException {
  97. URL path = new URL(toString() + "&reduce=" + reduce);
  98. InputStream input = path.openConnection().getInputStream();
  99. OutputStream output = fileSys.create(localFilename);
  100. long totalBytes = 0;
  101. try {
  102. byte[] buffer = new byte[64 * 1024];
  103. int len = input.read(buffer);
  104. while (len > 0) {
  105. totalBytes += len;
  106. output.write(buffer, 0 ,len);
  107. if (pingee != null) {
  108. pingee.ping();
  109. }
  110. len = input.read(buffer);
  111. }
  112. } finally {
  113. input.close();
  114. output.close();
  115. }
  116. return totalBytes;
  117. }
  118. }