JobSplit.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.split;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
/**
 * This class groups the fundamental classes associated with
 * reading/writing splits. The split information is divided into
 * two parts based on the consumer of the information. The two
 * parts are the split meta information and the raw split
 * information. The first part is consumed by the JobTracker to
 * create the tasks' locality data structures. The second part is
 * used by the maps at runtime to know what to do!
 * These pieces of information are written to two separate files.
 * The meta-information file is slurped by the JobTracker during
 * job initialization. A map task gets the meta information during
 * the launch and it reads the raw split bytes directly from the
 * file.
 */
public class JobSplit {
  static final int META_SPLIT_VERSION = 1;
  static final byte[] META_SPLIT_FILE_HEADER;
  static {
    try {
      META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
    } catch (UnsupportedEncodingException u) {
      throw new RuntimeException(u);
    }
  }

  public static final TaskSplitMetaInfo EMPTY_TASK_SPLIT =
      new TaskSplitMetaInfo();
  /**
   * This represents the meta information about the task split.
   * The main fields are
   *    - start offset in actual split
   *    - data length that will be processed in this split
   *    - hosts on which this split is local
   */
  public static class SplitMetaInfo implements Writable {
    private long startOffset;
    private long inputDataLength;
    private String[] locations;

    public SplitMetaInfo() {}

    public SplitMetaInfo(String[] locations, long startOffset,
        long inputDataLength) {
      this.locations = locations;
      this.startOffset = startOffset;
      this.inputDataLength = inputDataLength;
    }

    public SplitMetaInfo(InputSplit split, long startOffset)
        throws IOException {
      try {
        this.locations = split.getLocations();
        this.inputDataLength = split.getLength();
        this.startOffset = startOffset;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String[] getLocations() {
      return locations;
    }

    public long getStartOffset() {
      return startOffset;
    }

    public long getInputDataLength() {
      return inputDataLength;
    }

    public void setInputDataLocations(String[] locations) {
      this.locations = locations;
    }

    public void setInputDataLength(long length) {
      this.inputDataLength = length;
    }

    public void readFields(DataInput in) throws IOException {
      int len = WritableUtils.readVInt(in);
      locations = new String[len];
      for (int i = 0; i < locations.length; i++) {
        locations[i] = Text.readString(in);
      }
      startOffset = WritableUtils.readVLong(in);
      inputDataLength = WritableUtils.readVLong(in);
    }

    public void write(DataOutput out) throws IOException {
      WritableUtils.writeVInt(out, locations.length);
      for (int i = 0; i < locations.length; i++) {
        Text.writeString(out, locations[i]);
      }
      WritableUtils.writeVLong(out, startOffset);
      WritableUtils.writeVLong(out, inputDataLength);
    }

    @Override
    public String toString() {
      StringBuffer buf = new StringBuffer();
      buf.append("data-size : " + inputDataLength + "\n");
      buf.append("start-offset : " + startOffset + "\n");
      buf.append("locations : " + "\n");
      for (String loc : locations) {
        buf.append("  " + loc + "\n");
      }
      return buf.toString();
    }
  }
  /**
   * This represents the meta information about the task split that the
   * JobTracker creates
   */
  public static class TaskSplitMetaInfo {
    private TaskSplitIndex splitIndex;
    private long inputDataLength;
    private String[] locations;

    public TaskSplitMetaInfo() {
      this.splitIndex = new TaskSplitIndex();
      this.locations = new String[0];
    }

    public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations,
        long inputDataLength) {
      this.splitIndex = splitIndex;
      this.locations = locations;
      this.inputDataLength = inputDataLength;
    }

    public TaskSplitMetaInfo(InputSplit split, long startOffset)
        throws InterruptedException, IOException {
      this(new TaskSplitIndex("", startOffset), split.getLocations(),
          split.getLength());
    }

    public TaskSplitMetaInfo(String[] locations, long startOffset,
        long inputDataLength) {
      this(new TaskSplitIndex("", startOffset), locations, inputDataLength);
    }

    public TaskSplitIndex getSplitIndex() {
      return splitIndex;
    }

    public String getSplitLocation() {
      return splitIndex.getSplitLocation();
    }

    public long getInputDataLength() {
      return inputDataLength;
    }

    public String[] getLocations() {
      return locations;
    }

    public long getStartOffset() {
      return splitIndex.getStartOffset();
    }
  }
  /**
   * This represents the meta information about the task split that the
   * task gets
   */
  public static class TaskSplitIndex {
    private String splitLocation;
    private long startOffset;

    public TaskSplitIndex() {
      this("", 0);
    }

    public TaskSplitIndex(String splitLocation, long startOffset) {
      this.splitLocation = splitLocation;
      this.startOffset = startOffset;
    }

    public long getStartOffset() {
      return startOffset;
    }

    public String getSplitLocation() {
      return splitLocation;
    }

    public void readFields(DataInput in) throws IOException {
      splitLocation = Text.readString(in);
      startOffset = WritableUtils.readVLong(in);
    }

    public void write(DataOutput out) throws IOException {
      Text.writeString(out, splitLocation);
      WritableUtils.writeVLong(out, startOffset);
    }
  }
}
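
A minimal round-trip sketch of how the pieces above serialize through a DataOutput/DataInput pair, i.e. how the meta information written at job-submission time can be read back later. The demo class name and the in-memory stream plumbing are illustrative assumptions, not part of JobSplit or the Hadoop split-file layout; everything else uses only the public constructors and methods shown in the listing.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.mapreduce.split.JobSplit;

/** Illustrative only: round trip of JobSplit.SplitMetaInfo and JobSplit.TaskSplitIndex. */
public class JobSplitRoundTripDemo {
  public static void main(String[] args) throws IOException {
    // Meta information for one split: locations, start offset, data length.
    JobSplit.SplitMetaInfo meta = new JobSplit.SplitMetaInfo(
        new String[] {"host1", "host2"}, 0L /* startOffset */, 1024L /* inputDataLength */);

    // Serialize via Writable.write(DataOutput), here into an in-memory buffer.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      meta.write(out);
    }

    // Deserialize via readFields(DataInput) into a fresh instance.
    JobSplit.SplitMetaInfo copy = new JobSplit.SplitMetaInfo();
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      copy.readFields(in);
    }
    System.out.println(copy);  // prints data-size, start-offset and locations

    // TaskSplitIndex follows the same pattern: a location string plus an offset.
    JobSplit.TaskSplitIndex index =
        new JobSplit.TaskSplitIndex("file:///tmp/job.split", 0L);
    ByteArrayOutputStream indexBuffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(indexBuffer)) {
      index.write(out);
    }
  }
}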