Browse Source

HADOOP-1654. Add IOUtils. Contributed by Enis Soztutar.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@569012 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
09b42bbde6

+ 3 - 0
CHANGES.txt

@@ -88,6 +88,9 @@ Trunk (unreleased changes)
     HADOOP-1744.  Remove many uses of the deprecated UTF8 class from
     the HDFS namenode.  (Christophe Taton via cutting)
 
+    HADOOP-1654.  Add IOUtils class, containing generic io-related
+    utility methods.   (Enis Soztutar via cutting)
+
 
 Release 0.14.0 - 2007-08-17
 

+ 20 - 11
src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java

@@ -22,7 +22,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.retry.*;
@@ -133,6 +132,7 @@ class DNBlockUpgradeInfo {
   boolean offlineUpgrade;
   
   /** Returns string that has block id and the associated file */
+  @Override
   public String toString() {
     return block + " (filename: " +
            ( (crcInfo == null || crcInfo.fileName == null) ? 
@@ -186,7 +186,7 @@ class BlockCrcUpgradeUtils {
         DFSClient.BlockReader reader = DFSClient.BlockReader.newBlockReader
                     (dnSock, crcFile, blk.getBlockId(), offset, len, 
                      (int)Math.min(len, 4096));
-        FileUtil.readFully(reader, buf, bufOffset, (int)len);
+        IOUtils.readFully(reader, buf, bufOffset, (int)len);
         return;
       } catch (IOException ioe) {
         LOG.warn("Could not read " + blk + " from " + dn.getName() + " : " +
@@ -243,7 +243,7 @@ class BlockCrcUpgradeUtils {
       }
       in.readFully(buf, bufOffset, len);
     } finally {
-      FileUtil.closeStream(in);
+      IOUtils.closeStream(in);
     }
   }
   
@@ -353,12 +353,12 @@ class BlockCrcUpgradeUtils {
         in = new FileInputStream( metaFile );
         in.skip(7); //should be skipFully().
         byte[] storedChecksum = new byte[ crcBuf.length ];
-        FileUtil.readFully(in, storedChecksum, 0, storedChecksum.length);
+        IOUtils.readFully(in, storedChecksum, 0, storedChecksum.length);
         if ( !Arrays.equals(crcBuf, storedChecksum) ) {
           throw new IOException("CRC does not match");
         }
       } finally {
-        FileUtil.closeStream(in);
+        IOUtils.closeStream(in);
       }
       return;
     }
@@ -390,7 +390,7 @@ class BlockCrcUpgradeUtils {
                               metaFile);
       }
     } finally {
-      FileUtil.closeStream(out);
+      IOUtils.closeStream(out);
       if ( tmpBlockFile != null ) {
         tmpBlockFile.delete();
       }
@@ -475,7 +475,7 @@ class BlockCrcUpgradeUtils {
          * But we are not optimizing for this case.
          */
         if ( toRead > 0 ) {
-          FileUtil.readFully(in, blockBuf, 0, toRead);
+          IOUtils.readFully(in, blockBuf, 0, toRead);
         }
 
         if ( (toRead == 0 && bytesAfter.length > 0) || toRead >= verifyLen ) {
@@ -533,7 +533,7 @@ class BlockCrcUpgradeUtils {
       assert newCrcBuf.length == newCrcOffset : "something is wrong"; 
       return newCrcBuf;
     } finally {
-      FileUtil.closeStream(in);
+      IOUtils.closeStream(in);
     }
   }
 
@@ -811,7 +811,7 @@ class BlockCrcUpgradeUtils {
       
       while (totalRead < blockLen) {
         int toRead = Math.min((int)(blockLen - totalRead), bytesPerChecksum);
-        FileUtil.readFully(in, dataBuf, 0, toRead );
+        IOUtils.readFully(in, dataBuf, 0, toRead );
         
         checksum.update(dataBuf, 0, toRead);
         crcBufPos += checksum.writeValue(crcBuf, crcBufPos, true);
@@ -820,7 +820,7 @@ class BlockCrcUpgradeUtils {
         totalRead += toRead;
       }
     } finally {
-      FileUtil.closeStream(in);
+      IOUtils.closeStream(in);
     }
     
     writeCrcData(blockInfo, bytesPerChecksum, crcBuf);
@@ -902,7 +902,7 @@ class BlockCrcUpgradeUtils {
         }
       } while (false);
     } finally {
-      FileUtil.closeSocket( dnSock );
+      IOUtils.closeSocket( dnSock );
     }
     
     throw new IOException("Error while fetching CRC from replica on " +
@@ -1139,6 +1139,7 @@ class BlockCrcUpgradeUtils {
       this.errors = errors;
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
       datanodeId.readFields(in);
@@ -1147,6 +1148,7 @@ class BlockCrcUpgradeUtils {
       errors = in.readInt();
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
       super.write(out);
       datanodeId.write(out);
@@ -1170,11 +1172,13 @@ class BlockCrcUpgradeUtils {
       block = blk;
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
       block.readFields(in);
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
       super.write(out);
       block.write(out);
@@ -1194,11 +1198,13 @@ class BlockCrcUpgradeUtils {
       crcInfo = info;
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
       crcInfo.readFields(in);
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
       super.write(out);
       crcInfo.write(out);
@@ -1305,6 +1311,7 @@ class BlockCrcUpgradeObjectDatanode extends UpgradeObjectDatanode {
       (short) Math.floor(blocksUpgraded*100.0/blocksToUpgrade);
   }
 
+  @Override
   public UpgradeCommand completeUpgrade() throws IOException {
     // return latest stats command.
     assert getUpgradeStatus() == 100;
@@ -1375,6 +1382,7 @@ class BlockCrcUpgradeObjectDatanode extends UpgradeObjectDatanode {
     }
   }
   
+  @Override
   void doUpgrade() throws IOException {
     doUpgradeInternal();
   }
@@ -1661,6 +1669,7 @@ class BlockCrcUpgradeObjectNamenode extends UpgradeObjectNamenode {
     return (short) Math.floor(avgDatanodeCompletionPct * 0.9);
   }
 
+  @Override
   public UpgradeCommand startUpgrade() throws IOException {
     
     assert monitorThread == null;

+ 26 - 3
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -79,6 +79,7 @@ class DFSClient implements FSConstants {
       clients.add(client);
     }
 
+    @Override
     public synchronized void run() {
       for (DFSClient client : clients) {
         if (client.running) {
@@ -591,6 +592,7 @@ class DFSClient implements FSConstants {
      * because it first reads the data to user buffer and then checks
      * the checksum.
      */
+    @Override
     public synchronized int read(byte[] buf, int off, int len) 
                                  throws IOException {
       
@@ -610,6 +612,7 @@ class DFSClient implements FSConstants {
       return super.read(buf, off, len);
     }
 
+    @Override
     public synchronized long skip(long n) throws IOException {
       /* How can we make sure we don't throw a ChecksumException, at least
        * in majority of the cases?. This one throws. */  
@@ -629,11 +632,13 @@ class DFSClient implements FSConstants {
       return nSkipped;
     }
 
+    @Override
     public int read() throws IOException {
       throw new IOException("read() is not expected to be invoked. " +
                             "Use read(buf, off, len) instead.");
     }
     
+    @Override
     public boolean seekToNewSource(long targetPos) throws IOException {
       /* Checksum errors are handled outside the BlockReader. 
        * DFSInputStream does not always call 'seekToNewSource'. In the 
@@ -642,15 +647,18 @@ class DFSClient implements FSConstants {
       return false;
     }
     
+    @Override
     public void seek(long pos) throws IOException {
       throw new IOException("Seek() is not supported in BlockInputChecker");
     }
 
+    @Override
     protected long getChunkPosition(long pos) {
       throw new RuntimeException("getChunkPosition() is not supported, " +
                                  "since seek is not required");
     }
     
+    @Override
     protected synchronized int readChunk(long pos, byte[] buf, int offset, 
                                          int len, byte[] checksumBuf) 
                                          throws IOException {
@@ -690,11 +698,11 @@ class DFSClient implements FSConstants {
 
       if ( chunkLen > 0 ) {
         // len should be >= chunkLen
-        FileUtil.readFully(in, buf, offset, chunkLen);
+        IOUtils.readFully(in, buf, offset, chunkLen);
       }
       
       if ( checksumSize > 0 ) {
-        FileUtil.readFully(in, checksumBuf, 0, checksumSize);
+        IOUtils.readFully(in, checksumBuf, 0, checksumSize);
       }
 
       lastChunkOffset = chunkOffset;
@@ -773,6 +781,7 @@ class DFSClient implements FSConstants {
                               startOffset, firstChunkOffset );
     }
 
+    @Override
     public synchronized void close() throws IOException {
       startOffset = -1;
       checksum = null;
@@ -1005,6 +1014,7 @@ class DFSClient implements FSConstants {
     /**
      * Close it down!
      */
+    @Override
     public synchronized void close() throws IOException {
       checkOpen();
       if (closed) {
@@ -1024,6 +1034,7 @@ class DFSClient implements FSConstants {
       closed = true;
     }
 
+    @Override
     public synchronized int read() throws IOException {
       int ret = read( oneByteBuf, 0, 1 );
       return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
@@ -1062,6 +1073,7 @@ class DFSClient implements FSConstants {
     /**
      * Read the entire buffer.
      */
+    @Override
     public synchronized int read(byte buf[], int off, int len) throws IOException {
       checkOpen();
       if (closed) {
@@ -1197,6 +1209,7 @@ class DFSClient implements FSConstants {
      * 
      * @return actual number of bytes read
      */
+    @Override
     public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
       // sanity checks
@@ -1230,6 +1243,7 @@ class DFSClient implements FSConstants {
       return realLen;
     }
      
+    @Override
     public long skip(long n) throws IOException {
       if ( n > 0 ) {
         long curPos = getPos();
@@ -1246,6 +1260,7 @@ class DFSClient implements FSConstants {
     /**
      * Seek to a new arbitrary location
      */
+    @Override
     public synchronized void seek(long targetPos) throws IOException {
       if (targetPos > getFileLength()) {
         throw new IOException("Cannot seek after EOF");
@@ -1276,6 +1291,7 @@ class DFSClient implements FSConstants {
      * a node other than the current node is found, then returns true. 
      * If another node could not be found, then returns false.
      */
+    @Override
     public synchronized boolean seekToNewSource(long targetPos) throws IOException {
       boolean markedDead = deadNodes.containsKey(currentNode);
       addToDeadNodes(currentNode);
@@ -1296,12 +1312,14 @@ class DFSClient implements FSConstants {
         
     /**
      */
+    @Override
     public synchronized long getPos() throws IOException {
       return pos;
     }
 
     /**
      */
+    @Override
     public synchronized int available() throws IOException {
       if (closed) {
         throw new IOException("Stream closed");
@@ -1312,11 +1330,14 @@ class DFSClient implements FSConstants {
     /**
      * We definitely don't support marks
      */
+    @Override
     public boolean markSupported() {
       return false;
     }
+    @Override
     public void mark(int readLimit) {
     }
+    @Override
     public void reset() throws IOException {
       throw new IOException("Mark/reset not supported");
     }
@@ -1537,6 +1558,7 @@ class DFSClient implements FSConstants {
     }
 
     // @see FSOutputSummer#writeChunk()
+    @Override
     protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
                                                           throws IOException {
       checkOpen();
@@ -1599,7 +1621,7 @@ class DFSClient implements FSConstants {
           while ( bytesLeft >= 0 ) {
             int len = (int) Math.min( bytesLeft, bytesPerChecksum );
             if ( len > 0 ) {
-              FileUtil.readFully( in, buf, 0, len + checksumSize);
+              IOUtils.readFully( in, buf, 0, len + checksumSize);
             }
 
             blockStream.writeInt( len );
@@ -1680,6 +1702,7 @@ class DFSClient implements FSConstants {
      * Closes this output stream and releases any system 
      * resources associated with this stream.
      */
+    @Override
     public synchronized void close() throws IOException {
       checkOpen();
       if (closed) {

+ 13 - 11
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.dfs;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -32,8 +33,6 @@ import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.dfs.BlockCommand;
 import org.apache.hadoop.dfs.DatanodeProtocol;
-import org.apache.hadoop.fs.FileUtil;
-
 import java.io.*;
 import java.net.*;
 import java.util.*;
@@ -449,6 +448,7 @@ public class DataNode implements FSConstants, Runnable {
     Count(int init) { value = init; }
     synchronized void incr() { value++; }
     synchronized void decr() { value--; }
+    @Override
     public String toString() { return Integer.toString(value); }
     public int getValue() { return value; }
   }
@@ -1063,7 +1063,7 @@ public class DataNode implements FSConstants, Runnable {
         }
 
         byte [] buf = new byte[(int)fileSize];
-        FileUtil.readFully(checksumIn, buf, 0, buf.length);
+        IOUtils.readFully(checksumIn, buf, 0, buf.length);
         
         out = new DataOutputStream(s.getOutputStream());
         
@@ -1074,7 +1074,7 @@ public class DataNode implements FSConstants, Runnable {
         //last DATA_CHUNK
         out.writeInt(0);
       } finally {
-        FileUtil.closeStream(checksumIn);
+        IOUtils.closeStream(checksumIn);
       }
     }
   }
@@ -1171,7 +1171,7 @@ public class DataNode implements FSConstants, Runnable {
         blockInFile.seek(offset);
         if (checksumSkip > 0) {
           //Should we use seek() for checksum file as well?
-          FileUtil.skipFully(checksumIn, checksumSkip);
+          IOUtils.skipFully(checksumIn, checksumSkip);
         }
       }
       
@@ -1215,7 +1215,7 @@ public class DataNode implements FSConstants, Runnable {
               LOG.warn( " Could not read checksum for data at offset " +
                         offset + " for block " + block + " got : " + 
                         StringUtils.stringifyException(e) );
-              FileUtil.closeStream( checksumIn );
+              IOUtils.closeStream( checksumIn );
               checksumIn = null;
               if ( corruptChecksumOk ) {
                 // Just fill the array with zeros.
@@ -1238,10 +1238,10 @@ public class DataNode implements FSConstants, Runnable {
         offset += len;
       }
     } finally {
-      FileUtil.closeStream( blockInFile );
-      FileUtil.closeStream( checksumIn );
-      FileUtil.closeStream( blockIn );
-      FileUtil.closeStream( out );
+      IOUtils.closeStream( blockInFile );
+      IOUtils.closeStream( checksumIn );
+      IOUtils.closeStream( blockIn );
+      IOUtils.closeStream( out );
     }
     
     return totalRead;
@@ -1285,7 +1285,7 @@ public class DataNode implements FSConstants, Runnable {
                   targets[0].getName() + " got " + 
                   StringUtils.stringifyException( ie ) );
       } finally {
-        FileUtil.closeSocket(sock);
+        IOUtils.closeSocket(sock);
         xmitsInProgress--;
       }
     }
@@ -1393,6 +1393,7 @@ public class DataNode implements FSConstants, Runnable {
     return null;
   }
 
+  @Override
   public String toString() {
     return "DataNode{" +
       "data=" + data +
@@ -1465,6 +1466,7 @@ public class DataNode implements FSConstants, Runnable {
 
     // read & log any error messages from the running script
     Thread errThread = new Thread() {
+        @Override
         public void start() {
           try {
             String errLine = errR.readLine();

+ 6 - 86
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -22,9 +22,8 @@ import java.io.*;
 import java.util.Enumeration;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
-import java.net.Socket;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -137,12 +136,8 @@ public class FileUtil {
       }
     } else if (srcFS.isFile(src)) {
       InputStream in = srcFS.open(src);
-      try {
-        OutputStream out = dstFS.create(dst, overwrite);
-        copyContent(in, out, conf);
-      } finally {
-        in.close();
-      }
+      OutputStream out = dstFS.create(dst, overwrite);
+      IOUtils.copyBytes(in, out, conf, true);
     } else {
       throw new IOException(src.toString() + ": No such file or directory");
     }
@@ -172,7 +167,7 @@ public class FileUtil {
         if (srcFS.isFile(contents[i])) {
           InputStream in = srcFS.open(contents[i]);
           try {
-            copyContent(in, out, conf, false);
+            IOUtils.copyBytes(in, out, conf, false);
             if (addString!=null)
               out.write(addString.getBytes("UTF-8"));
                 
@@ -211,11 +206,7 @@ public class FileUtil {
       }
     } else if (src.isFile()) {
       InputStream in = new FileInputStream(src);
-      try {
-        copyContent(in, dstFS.create(dst), conf);
-      } finally {
-        in.close();
-      } 
+      IOUtils.copyBytes(in, dstFS.create(dst), conf);
     }
     if (deleteSource) {
       return FileUtil.fullyDelete(src);
@@ -239,11 +230,7 @@ public class FileUtil {
       }
     } else if (srcFS.isFile(src)) {
       InputStream in = srcFS.open(src);
-      try {
-        copyContent(in, new FileOutputStream(dst), conf);
-      } finally {
-        in.close();
-      } 
+      IOUtils.copyBytes(in, new FileOutputStream(dst), conf);
     }
     if (deleteSource) {
       return srcFS.delete(src);
@@ -252,27 +239,6 @@ public class FileUtil {
     }
   }
 
-  private static void copyContent(InputStream in, OutputStream out,
-                                  Configuration conf) throws IOException {
-    copyContent(in, out, conf, true);
-  }
-
-  
-  private static void copyContent(InputStream in, OutputStream out,
-                                  Configuration conf, boolean close) throws IOException {
-    byte buf[] = new byte[conf.getInt("io.file.buffer.size", 4096)];
-    try {
-      int bytesRead = in.read(buf);
-      while (bytesRead >= 0) {
-        out.write(buf, 0, bytesRead);
-        bytesRead = in.read(buf);
-      }
-    } finally {
-      if (close)
-        out.close();
-    }
-  }
-
   private static Path checkDest(String srcName, FileSystem dstFS, Path dst)
     throws IOException {
     if (dstFS.exists(dst)) {
@@ -520,50 +486,4 @@ public class FileUtil {
     }
     return tmp;
   }
-  
-  //XXX These functions should be in IO Utils rather than FileUtil
-  // Reads len bytes in a loop.
-  public static void readFully( InputStream in, byte buf[],
-                                int off, int len ) throws IOException {
-    int toRead = len;
-    while ( toRead > 0 ) {
-      int ret = in.read( buf, off, toRead );
-      if ( ret < 0 ) {
-        throw new IOException( "Premeture EOF from inputStream");
-      }
-      toRead -= ret;
-      off += ret;
-    }
-  }
-  
-  public static void skipFully( InputStream in, long len ) throws IOException {
-    long toSkip = len;
-    while ( toSkip > 0 ) {
-      long ret = in.skip( toSkip );
-      if ( ret < 0 ) {
-        throw new IOException( "Premeture EOF from inputStream");
-      }
-      toSkip -= ret;
-    }
-  }
-  
-  public static void closeSocket( Socket sock ) {
-    // avoids try { close() } dance
-    if ( sock != null ) {
-      try {
-       sock.close();
-      } catch ( IOException ignored ) {
-      }
-    }
-  }
-
-  public static void closeStream(Closeable closeable ) {
-    // avoids try { close() } dance
-    if ( closeable != null ) {
-      try {
-        closeable.close();
-      } catch ( IOException ignored ) {
-      }
-    }
-  }
 }

+ 8 - 25
src/java/org/apache/hadoop/fs/FsShell.java

@@ -17,13 +17,15 @@
  */
 package org.apache.hadoop.fs;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.Tool;
@@ -67,22 +69,7 @@ public class FsShell extends Configured implements Tool {
     }
   }
 
-  /**
-   * Copies from one stream to another.
-   */
-  private void copyBytes(InputStream in, OutputStream out) throws IOException {
-    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
-    byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];
-    int bytesRead = in.read(buf);
-    while (bytesRead >= 0) {
-      out.write(buf, 0, bytesRead);
-      if ((ps != null) && ps.checkError()) {
-        throw new IOException("Unable to write to output stream.");
-      }
-      bytesRead = in.read(buf);
-    }
-  }
-      
+  
   /**
    * Copies from stdin to the indicated file.
    */
@@ -95,8 +82,9 @@ public class FsShell extends Configured implements Tool {
     }
     FSDataOutputStream out = fs.create(dst); 
     try {
-      copyBytes(System.in, out);
-    } finally {
+      IOUtils.copyBytes(System.in, out, getConf(), false);
+    } 
+    finally {
       out.close();
     }
   }
@@ -109,12 +97,7 @@ public class FsShell extends Configured implements Tool {
       throw new IOException("Source must be a file.");
     }
     FSDataInputStream in = fs.open(src);
-    try {
-      copyBytes(in, System.out);
-    } finally {
-      in.close();
-    }
-
+    IOUtils.copyBytes(in, System.out, getConf(), true);
   }
 
   /**

+ 150 - 0
src/java/org/apache/hadoop/io/IOUtils.java

@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An utility class for I/O related functionality. 
+ */
+public class IOUtils {
+
+  /**
+   * Copies from one stream to another.
+   * @param in InputStrem to read from
+   * @param out OutputStream to write to
+   * @param buffSize the size of the buffer 
+   * @param close whether or not close the InputStream and 
+   * OutputStream at the end. The streams are closed in the finally clause.  
+   */
+  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
+    throws IOException {
+
+    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+    byte buf[] = new byte[buffSize];
+    try {
+      int bytesRead = in.read(buf);
+      while (bytesRead >= 0) {
+        out.write(buf, 0, bytesRead);
+        if ((ps != null) && ps.checkError()) {
+          throw new IOException("Unable to write to output stream.");
+        }
+        bytesRead = in.read(buf);
+      }
+    } finally {
+      if(close) {
+        out.close();
+        in.close();
+      }
+    }
+  }
+  
+  /**
+   * Copies from one stream to another. <strong>closes the input and output streams 
+   * at the end</strong>.
+   * @param in InputStrem to read from
+   * @param out OutputStream to write to
+   * @param conf the Configuration object 
+   */
+  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
+    throws IOException {
+    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
+  }
+  
+  /**
+   * Copies from one stream to another.
+   * @param in InputStrem to read from
+   * @param out OutputStream to write to
+   * @param conf the Configuration object
+   * @param close whether or not close the InputStream and 
+   * OutputStream at the end. The streams are closed in the finally clause.
+   */
+  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
+    throws IOException {
+    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
+  }
+  
+  /** Reads len bytes in a loop.
+   * @param in The InputStream to read from
+   * @param buf The buffer to fill
+   * @param off offset from the buffer
+   * @param len the length of bytes to read
+   * @throws IOException if it could not read requested number of bytes 
+   * for any reason (including EOF)
+   */
+  public static void readFully( InputStream in, byte buf[],
+      int off, int len ) throws IOException {
+    int toRead = len;
+    while ( toRead > 0 ) {
+      int ret = in.read( buf, off, toRead );
+      if ( ret < 0 ) {
+        throw new IOException( "Premeture EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+  
+  /** Similar to readFully(). Skips bytes in a loop.
+   * @param in The InputStream to skip bytes from
+   * @param len number of bytes to skip.
+   * @throws IOException if it could not skip requested number of bytes 
+   * for any reason (including EOF)
+   */
+  public static void skipFully( InputStream in, long len ) throws IOException {
+    while ( len > 0 ) {
+      long ret = in.skip( len );
+      if ( ret < 0 ) {
+        throw new IOException( "Premeture EOF from inputStream");
+      }
+      len -= ret;
+    }
+  }
+  
+  /**
+   * Closes the stream ignoring {@link IOException} 
+   * @param stream the Stream to close
+   */
+  public static void closeStream( java.io.Closeable stream ) {
+    // avoids try { close() } dance
+    if ( stream != null ) {
+      try {
+        stream.close();
+      } catch ( IOException ignored ) {
+      }
+    }
+  }
+  
+  /**
+   * Closes the socket ignoring {@link IOException} 
+   * @param sock the Socket to close
+   */
+  public static void closeSocket( Socket sock ) {
+    // avoids try { close() } dance
+    if ( sock != null ) {
+      try {
+       sock.close();
+      } catch ( IOException ignored ) {
+      }
+    }
+  }
+}

+ 2 - 2
src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream;
 import org.apache.commons.logging.Log;
@@ -95,7 +95,7 @@ public class TestDataTransferProtocol extends TestCase {
         assertEquals("checking byte[" + i + "]", recvBuf[i], retBuf[i]);
       }
     } finally {
-      FileUtil.closeSocket(sock);
+      IOUtils.closeSocket(sock);
     }
   }