WritableUtils.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.io;
  19. import java.io.*;
  20. import org.apache.hadoop.mapred.JobConf;
  21. import org.apache.hadoop.util.ReflectionUtils;
  22. import java.util.zip.GZIPInputStream;
  23. import java.util.zip.GZIPOutputStream;
  24. public final class WritableUtils {
  25. public static byte[] readCompressedByteArray(DataInput in) throws IOException {
  26. int length = in.readInt();
  27. if (length == -1) return null;
  28. byte[] buffer = new byte[length];
  29. in.readFully(buffer); // could/should use readFully(buffer,0,length)?
  30. GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
  31. byte[] outbuf = new byte[length];
  32. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  33. int len;
  34. while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
  35. bos.write(outbuf, 0, len);
  36. }
  37. byte[] decompressed = bos.toByteArray();
  38. bos.close();
  39. gzi.close();
  40. return decompressed;
  41. }
  42. public static void skipCompressedByteArray(DataInput in) throws IOException {
  43. int length = in.readInt();
  44. if (length != -1) in.skipBytes(length);
  45. }
  46. public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
  47. if (bytes != null) {
  48. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  49. GZIPOutputStream gzout = new GZIPOutputStream(bos);
  50. gzout.write(bytes, 0, bytes.length);
  51. gzout.close();
  52. byte[] buffer = bos.toByteArray();
  53. int len = buffer.length;
  54. out.writeInt(len);
  55. out.write(buffer, 0, len);
  56. /* debug only! Once we have confidence, can lose this. */
  57. return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
  58. } else {
  59. out.writeInt(-1);
  60. return -1;
  61. }
  62. }
  63. /* Ugly utility, maybe someone else can do this better */
  64. public static String readCompressedString(DataInput in) throws IOException {
  65. byte[] bytes = readCompressedByteArray(in);
  66. if (bytes == null) return null;
  67. return new String(bytes, "UTF-8");
  68. }
  69. public static int writeCompressedString(DataOutput out, String s) throws IOException {
  70. return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
  71. }
  72. /*
  73. *
  74. * Write a String as a Network Int n, followed by n Bytes
  75. * Alternative to 16 bit read/writeUTF.
  76. * Encoding standard is... ?
  77. *
  78. */
  79. public static void writeString(DataOutput out, String s) throws IOException {
  80. if (s != null) {
  81. byte[] buffer = s.getBytes("UTF-8");
  82. int len = buffer.length;
  83. out.writeInt(len);
  84. out.write(buffer, 0, len);
  85. } else {
  86. out.writeInt(-1);
  87. }
  88. }
  89. /*
  90. * Read a String as a Network Int n, followed by n Bytes
  91. * Alternative to 16 bit read/writeUTF.
  92. * Encoding standard is... ?
  93. *
  94. */
  95. public static String readString(DataInput in) throws IOException{
  96. int length = in.readInt();
  97. if (length == -1) return null;
  98. byte[] buffer = new byte[length];
  99. in.readFully(buffer); // could/should use readFully(buffer,0,length)?
  100. return new String(buffer,"UTF-8");
  101. }
  102. /*
  103. * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
  104. * Could be generalised using introspection.
  105. *
  106. */
  107. public static void writeStringArray(DataOutput out, String[] s) throws IOException{
  108. out.writeInt(s.length);
  109. for(int i = 0; i < s.length; i++) {
  110. writeString(out, s[i]);
  111. }
  112. }
  113. /*
  114. * Write a String array as a Nework Int N, followed by Int N Byte Array of
  115. * compressed Strings. Handles also null arrays and null values.
  116. * Could be generalised using introspection.
  117. *
  118. */
  119. public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
  120. if (s == null) {
  121. out.writeInt(-1);
  122. return;
  123. }
  124. out.writeInt(s.length);
  125. for(int i = 0; i < s.length; i++) {
  126. writeCompressedString(out, s[i]);
  127. }
  128. }
  129. /*
  130. * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
  131. * Could be generalised using introspection. Actually this bit couldn't...
  132. *
  133. */
  134. public static String[] readStringArray(DataInput in) throws IOException {
  135. int len = in.readInt();
  136. if (len == -1) return null;
  137. String[] s = new String[len];
  138. for(int i = 0; i < len; i++) {
  139. s[i] = readString(in);
  140. }
  141. return s;
  142. }
  143. /*
  144. * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
  145. * Could be generalised using introspection. Handles null arrays and null values.
  146. *
  147. */
  148. public static String[] readCompressedStringArray(DataInput in) throws IOException {
  149. int len = in.readInt();
  150. if (len == -1) return null;
  151. String[] s = new String[len];
  152. for(int i = 0; i < len; i++) {
  153. s[i] = readCompressedString(in);
  154. }
  155. return s;
  156. }
  157. /*
  158. *
  159. * Test Utility Method Display Byte Array.
  160. *
  161. */
  162. public static void displayByteArray(byte[] record){
  163. int i;
  164. for(i=0;i < record.length -1; i++){
  165. if (i % 16 == 0) { System.out.println(); }
  166. System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
  167. System.out.print(Integer.toHexString(record[i] & 0x0F));
  168. System.out.print(",");
  169. }
  170. System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
  171. System.out.print(Integer.toHexString(record[i] & 0x0F));
  172. System.out.println();
  173. }
  174. /**
  175. * A pair of input/output buffers that we use to clone writables.
  176. */
  177. private static class CopyInCopyOutBuffer {
  178. DataOutputBuffer outBuffer = new DataOutputBuffer();
  179. DataInputBuffer inBuffer = new DataInputBuffer();
  180. /**
  181. * Move the data from the output buffer to the input buffer.
  182. */
  183. void moveData() {
  184. inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
  185. }
  186. }
  187. /**
  188. * Allocate a buffer for each thread that tries to clone objects.
  189. */
  190. private static ThreadLocal cloneBuffers = new ThreadLocal() {
  191. protected synchronized Object initialValue() {
  192. return new CopyInCopyOutBuffer();
  193. }
  194. };
  195. /**
  196. * Make a copy of a writable object using serialization to a buffer.
  197. * @param orig The object to copy
  198. * @return The copied object
  199. */
  200. public static Writable clone(Writable orig, JobConf conf) {
  201. try {
  202. Writable newInst = (Writable)ReflectionUtils.newInstance(orig.getClass(),
  203. conf);
  204. CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
  205. buffer.outBuffer.reset();
  206. orig.write(buffer.outBuffer);
  207. buffer.moveData();
  208. newInst.readFields(buffer.inBuffer);
  209. return newInst;
  210. } catch (IOException e) {
  211. throw new RuntimeException("Error writing/reading clone buffer", e);
  212. }
  213. }
  214. /**
  215. * Serializes an integer to a binary stream with zero-compressed encoding.
  216. * For -120 <= i <= 127, only one byte is used with the actual value.
  217. * For other values of i, the first byte value indicates whether the
  218. * integer is positive or negative, and the number of bytes that follow.
  219. * If the first byte value v is between -121 and -124, the following integer
  220. * is positive, with number of bytes that follow are -(v+120).
  221. * If the first byte value v is between -125 and -128, the following integer
  222. * is negative, with number of bytes that follow are -(v+124). Bytes are
  223. * stored in the high-non-zero-byte-first order.
  224. *
  225. * @param stream Binary output stream
  226. * @param i Integer to be serialized
  227. * @throws java.io.IOException
  228. */
  229. public static void writeVInt(DataOutput stream, int i) throws IOException {
  230. writeVLong(stream, i);
  231. }
  232. /**
  233. * Serializes a long to a binary stream with zero-compressed encoding.
  234. * For -112 <= i <= 127, only one byte is used with the actual value.
  235. * For other values of i, the first byte value indicates whether the
  236. * long is positive or negative, and the number of bytes that follow.
  237. * If the first byte value v is between -113 and -120, the following long
  238. * is positive, with number of bytes that follow are -(v+112).
  239. * If the first byte value v is between -121 and -128, the following long
  240. * is negative, with number of bytes that follow are -(v+120). Bytes are
  241. * stored in the high-non-zero-byte-first order.
  242. *
  243. * @param stream Binary output stream
  244. * @param i Long to be serialized
  245. * @throws java.io.IOException
  246. */
  247. public static void writeVLong(DataOutput stream, long i) throws IOException {
  248. if (i >= -112 && i <= 127) {
  249. stream.writeByte((byte)i);
  250. return;
  251. }
  252. int len = -112;
  253. if (i < 0) {
  254. i ^= -1L; // take one's complement'
  255. len = -120;
  256. }
  257. long tmp = i;
  258. while (tmp != 0) {
  259. tmp = tmp >> 8;
  260. len--;
  261. }
  262. stream.writeByte((byte)len);
  263. len = (len < -120) ? -(len + 120) : -(len + 112);
  264. for (int idx = len; idx != 0; idx--) {
  265. int shiftbits = (idx - 1) * 8;
  266. long mask = 0xFFL << shiftbits;
  267. stream.writeByte((byte)((i & mask) >> shiftbits));
  268. }
  269. }
  270. /**
  271. * Reads a zero-compressed encoded long from input stream and returns it.
  272. * @param stream Binary input stream
  273. * @throws java.io.IOException
  274. * @return deserialized long from stream.
  275. */
  276. public static long readVLong(DataInput stream) throws IOException {
  277. int len = stream.readByte();
  278. if (len >= -112) {
  279. return len;
  280. }
  281. boolean isNegative = (len < -120);
  282. len = isNegative ? -(len + 120) : -(len + 112);
  283. long i = 0;
  284. for (int idx = 0; idx < len; idx++) {
  285. byte b = stream.readByte();
  286. i = i << 8;
  287. i = i | (b & 0xFF);
  288. }
  289. return (isNegative ? (i ^ -1L) : i);
  290. }
  291. /**
  292. * Reads a zero-compressed encoded integer from input stream and returns it.
  293. * @param stream Binary input stream
  294. * @throws java.io.IOException
  295. * @return deserialized integer from stream.
  296. */
  297. public static int readVInt(DataInput stream) throws IOException {
  298. return (int) readVLong(stream);
  299. }
  300. /**
  301. * Get the encoded length if an integer is stored in a variable-length format
  302. * @return the encoded length
  303. */
  304. public static int getVIntSize(long i) {
  305. if (i >= -112 && i <= 127) {
  306. return 1;
  307. }
  308. int len = -112;
  309. if (i < 0) {
  310. i ^= -1L; // take one's complement'
  311. len = -120;
  312. }
  313. long tmp = i;
  314. while (tmp != 0) {
  315. tmp = tmp >> 8;
  316. len--;
  317. }
  318. len = (len < -120) ? -(len + 120) : -(len + 112);
  319. return len+1;
  320. }
  321. /**
  322. * Read an Enum value from DataInput, Enums are read and written
  323. * using String values.
  324. * @param <T> Enum type
  325. * @param in DataInput to read from
  326. * @param enumType Class type of Enum
  327. * @return Enum represented by String read from DataInput
  328. * @throws IOException
  329. */
  330. public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
  331. throws IOException{
  332. return T.valueOf(enumType, Text.readString(in));
  333. }
  334. /**
  335. * writes String value of enum to DataOutput.
  336. * @param out Dataoutput stream
  337. * @param enumVal enum value
  338. * @throws IOException
  339. */
  340. public static void writeEnum(DataOutput out, Enum enumVal)
  341. throws IOException{
  342. Text.writeString(out, enumVal.name());
  343. }
  344. }