
HDFS-4256 Backport concatenation of files into a single file to branch-1 (sanjay Radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1432185 13f79535-47bb-0310-9956-ffa450edef68
Sanjay Radia 12 years ago
parent
commit
0f3c58a52d

+ 4 - 0
CHANGES.txt

@@ -61,6 +61,10 @@ Release 1.2.0 - unreleased
     NetworkTopology with NodeGroup and use generic code for choosing datanode
     in Balancer.  (Junping Du via szetszwo)
 
+
+    HDFS-4256 Backport concatenation of files into a single file to branch-1
+    (sanjay Radia)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

+ 13 - 0
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -944,6 +944,19 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                                      DSQuotaExceededException.class);
     }
   }
+  
+  /**
+   * Move blocks from src to trg and delete src
+   * See {@link ClientProtocol#concat(String, String [])}. 
+   */
+  public void concat(String trg, String [] srcs) throws IOException {
+    checkOpen();
+    try {
+      namenode.concat(trg, srcs);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
+  }
 
 
   /**
    * Rename file or directory.

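For illustration (not part of this patch): because DFSClient#concat unwraps AccessControlException from the RemoteException, a caller without the required permissions sees that exception directly. A minimal sketch, assuming a reachable HDFS instance; the class name and the paths /data/all and /data/part-0 are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.security.AccessControlException;

    public class ConcatPermissionSketch {
      public static void main(String[] args) throws IOException {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        try {
          // fails if the caller lacks write access to the target or its sources
          dfs.concat(new Path("/data/all"), new Path[] { new Path("/data/part-0") });
        } catch (AccessControlException ace) {
          // unwrapped from the RemoteException by DFSClient#concat
          System.err.println("concat denied: " + ace.getMessage());
        }
      }
    }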
+ 18 - 0
src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -219,6 +219,24 @@ public class DistributedFileSystem extends FileSystem {
     return dfs.setReplication(getPathName(src), replication);
   }
 
+  /**
+   * THIS IS DFS only operations, it is not part of FileSystem
+   * move blocks from srcs to trg
+   * and delete srcs afterwards
+   * all blocks should be the same size
+   * @param trg existing file to append to
+   * @param psrcs list of files (same block size, same replication)
+   * @throws IOException
+   */
+  public void concat(Path trg, Path [] psrcs) throws IOException {
+    String [] srcs = new String [psrcs.length];
+    for(int i=0; i<psrcs.length; i++) {
+      srcs[i] = getPathName(psrcs[i]);
+    }
+    statistics.incrementWriteOps(1);
+    dfs.concat(getPathName(trg), srcs);
+  }
+
   /**
    * Rename files/dirs
    */

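A minimal usage sketch of the new DistributedFileSystem#concat call (not part of this patch). It assumes the files below already exist in the same directory, share the target's block size and replication, and that every block except possibly the last source's is full; the class name and paths are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ConcatUsageSketch {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        Path target = new Path("/data/all");        // existing, non-empty file
        Path[] srcs = { new Path("/data/part-0"),   // same directory as the target
                        new Path("/data/part-1"),
                        new Path("/data/part-2") };
        dfs.concat(target, srcs);                   // sources are deleted on success
      }
    }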
+ 12 - 0
src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -54,6 +54,8 @@ public interface ClientProtocol extends VersionedProtocol {
    *     multiple blocks within a single BlockTokenIdentifier 
    *     
    *     (bumped to 61 to bring in line with trunk)
+   * Added concat() - since this is only the addition of a method, it does
+   * not break compatibility and the version number does not need to change.
    */
   public static final long versionID = 61L;
   
@@ -266,6 +268,16 @@ public interface ClientProtocol extends VersionedProtocol {
    *                                any quota restriction
    */
   public boolean rename(String src, String dst) throws IOException;
+  
+  /**
+   * Moves blocks from srcs to trg and delete srcs
+   * 
+   * @param trg existing file
+   * @param srcs - list of existing files (same block size, same replication)
+   * @throws IOException if some arguments are invalid
+   */
+  public void concat(String trg, String[] srcs) 
+      throws IOException;
 
 
   /**
    * Delete the given file or directory from the file system.

+ 4 - 2
src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java

@@ -78,8 +78,10 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -32;
-  // Current version: 
+  public static final int LAYOUT_VERSION = -41;
+  // Previous versions:
   // -32: to handle editlog opcode conflicts with 0.20.203 during upgrades and
   // to disallow upgrade to release 0.21.
+  // Current version: 
+  // -41 added OP_CONCAT_DELETE
 }

+ 80 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -585,7 +585,68 @@ class FSDirectory implements FSConstants, Closeable {
       }
     }
   }
+
+  /**
+   * Concat - see {@link #unprotectedConcat(String, String[], long)}
+   */
+  public void concat(String target, String [] srcs) throws IOException{
+    synchronized(rootDir) {
+      // actual move
+      waitForReady();
+      long timestamp = FSNamesystem.now();
+      unprotectedConcat(target, srcs, timestamp);
+      // do the commit
+      fsImage.getEditLog().logConcat(target, srcs, timestamp);
+    }
+  }
+  
+
+  
+  /**
+   * Moves all the blocks from {@code srcs} in the order of {@code srcs} array
+   * and appends them to {@code target}.
+   * The {@code srcs} files are deleted.
+   * @param target file to move the blocks to
+   * @param srcs list of files to move the blocks from
+   * Must be public because it is also called from edit log loading.
+   * NOTE: quota is not verified since concat is restricted to a single
+   * directory; only the parent's namespace count is updated below.
+   */
+  public void unprotectedConcat(String target, String [] srcs, long timestamp) {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+    }
+    // do the move
+    
+    final INode[] trgINodes =  getExistingPathINodes(target);
+    INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
+    INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
     
     
+    INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+    int i = 0;
+    int totalBlocks = 0;
+    for(String src : srcs) {
+      INodeFile srcInode = getFileINode(src);
+      allSrcInodes[i++] = srcInode;
+      totalBlocks += srcInode.blocks.length;  
+    }
+    trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+    
+    // since we are in the same dir - we can use same parent to remove files
+    int count = 0;
+    for(INodeFile nodeToRemove: allSrcInodes) {
+      if(nodeToRemove == null) continue;
+      
+      nodeToRemove.blocks = null;
+      trgParent.removeChild(nodeToRemove);
+      count++;
+    }
+
+    trgInode.setModificationTime(timestamp);
+    trgParent.setModificationTime(timestamp);
+    // update quota on the parent directory ('count' files removed, 0 space)
+    unprotectedUpdateCount(trgINodes, trgINodes.length-1, -count, 0);
+  }
+
   /**
    * Delete the target directory and collect the blocks under it
    * 
@@ -913,6 +974,25 @@ class FSDirectory implements FSConstants, Closeable {
     }
   }
   
+  /**
+   * Updates quota without verification; it is the caller's responsibility
+   * to make sure the quota is not exceeded.
+   * @param inodes
+   * @param numOfINodes
+   * @param nsDelta
+   * @param dsDelta
+   */
+   void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
+                                      long nsDelta, long dsDelta) {
+    for(int i=0; i < numOfINodes; i++) {
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+        node.addSpaceConsumed(nsDelta, dsDelta);
+      }
+    }
+  }
+   
+  
   /** Return the name of the path represented by inodes at [0, pos] */
   private static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();

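To illustrate the bookkeeping done by unprotectedUpdateCount (a sketch, not part of this patch): after concatenating N sources, the parent directory loses N namespace entries while its consumed disk space stays the same, because the blocks only move between files of one directory. The class name and paths are illustrative and assumed to exist:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class ConcatQuotaSketch {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        Path dir = new Path("/data");
        Path[] srcs = { new Path(dir, "part-0"), new Path(dir, "part-1") };

        ContentSummary before = dfs.getContentSummary(dir);
        dfs.concat(new Path(dir, "all"), srcs);
        ContentSummary after = dfs.getContentSummary(dir);

        // namespace count drops by the number of deleted sources
        System.out.println(before.getFileCount() - after.getFileCount());        // 2
        // consumed disk space is unchanged
        System.out.println(before.getSpaceConsumed() == after.getSpaceConsumed()); // true
      }
    }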
+ 42 - 3
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -85,6 +85,7 @@ public class FSEditLog {
   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+  private static final byte OP_CONCAT_DELETE = 16; // concat files.
   private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
   private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
   private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
@@ -631,7 +632,8 @@ public class FSEditLog {
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
         numOpTimes = 0, numOpGetDelegationToken = 0,
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0,
-        numOpUpdateMasterKey = 0, numOpOther = 0;
+        numOpUpdateMasterKey = 0, numOpOther = 0,
+        numOpConcatDelete = 0;
     long highestGenStamp = -1;
     long startTime = FSNamesystem.now();
 
@@ -807,13 +809,34 @@ public class FSEditLog {
           short replication = adjustReplication(readShort(in));
           fsDir.unprotectedSetReplication(path, replication, null);
           break;
-        } 
+        }
+        case OP_CONCAT_DELETE: {
+          if (logVersion > -22) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpConcatDelete++;
+          int length = in.readInt();
+          if (length < 3) { // trg, srcs.., timestamp
+            throw new IOException("Incorrect data format. " 
+                                  + "ConcatDelete operation.");
+          }
+          String trg = FSImage.readString(in);
+          int srcSize = length - 1 - 1; //trg and timestamp
+          String [] srcs = new String [srcSize];
+          for(int i=0; i<srcSize;i++) {
+            srcs[i]= FSImage.readString(in);
+          }
+          timestamp = readLong(in);
+          fsDir.unprotectedConcat(trg, srcs, timestamp);
+          break;
+        }
         case OP_RENAME: {
           numOpRename++;
           int length = in.readInt();
           if (length != 3) {
             throw new IOException("Incorrect data format. " 
-                                  + "Mkdir operation.");
+                                  + "Rename operation.");
           }
           String s = FSImage.readString(in);
           String d = FSImage.readString(in);
@@ -1065,6 +1088,7 @@ public class FSEditLog {
           + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
           + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
           + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
+          + " numOpConcatDelete  = " + numOpConcatDelete
           + " numOpOther = " + numOpOther);
     }
 
@@ -1350,6 +1374,21 @@ public class FSEditLog {
     logEdit(OP_SET_OWNER, new UTF8(src), u, g);
   }
 
+  /**
+   * concat(trg,src..) log
+   */
+  void logConcat(String trg, String [] srcs, long timestamp) {
+    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+    UTF8 info[] = new UTF8[size];
+    int idx = 0;
+    info[idx++] = new UTF8(trg);
+    for(int i=0; i<srcs.length; i++) {
+      info[idx++] = new UTF8(srcs[i]);
+    }
+    info[idx] = FSEditLog.toLogLong(timestamp);
+    logEdit(OP_CONCAT_DELETE, new ArrayWritable(UTF8.class, info));
+  }
+  
   /** 
    * Add delete file record to edit log
    */

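A simplified sketch of the OP_CONCAT_DELETE record layout used by logConcat and the loader above (an assumption for illustration, not the real on-disk encoding, which goes through UTF8 writables, toLogLong and FSImage.readString): a record of "length" fields laid out as target, src_1..src_n, timestamp, so n = length - 2:

    import java.io.*;

    public class ConcatRecordSketch {
      static void write(DataOutput out, String trg, String[] srcs, long ts)
          throws IOException {
        out.writeInt(1 + srcs.length + 1);       // trg, srcs..., timestamp
        out.writeUTF(trg);
        for (String s : srcs) out.writeUTF(s);
        out.writeUTF(Long.toString(ts));         // timestamp is logged as a string
      }

      static void read(DataInput in) throws IOException {
        int length = in.readInt();
        if (length < 3) throw new IOException("Incorrect ConcatDelete record");
        String trg = in.readUTF();
        String[] srcs = new String[length - 2];  // minus target and timestamp
        for (int i = 0; i < srcs.length; i++) srcs[i] = in.readUTF();
        long ts = Long.parseLong(in.readUTF());
        System.out.println("concat " + srcs.length + " files into " + trg + " @ " + ts);
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), "/data/all",
              new String[] { "/data/part-0", "/data/part-1" },
              System.currentTimeMillis());
        read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
      }
    }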
+ 158 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1197,6 +1197,164 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     
     
     return inode.createLocatedBlocks(results);
   }
+  
+
+  /**
+   * Moves all the blocks from {@code srcs} in the order of {@code srcs} array
+   * and appends them to {@code target}.
+   * The {@code srcs} files are deleted.
+   * To avoid rollbacks we verify the validity of ALL of the arguments
+   * before we start the actual move.
+   * @param target file to move the blocks to
+   * @param srcs list of files to move the blocks from
+   * @throws IOException
+   */
+  void concat(String target, String [] srcs) 
+      throws IOException {
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
+          " to " + target);
+    }
+    
+    // verify args
+    if(target.isEmpty()) {
+      throw new IllegalArgumentException("Target file name is empty");
+    }
+    if(srcs == null || srcs.length == 0) {
+      throw new IllegalArgumentException("No sources given");
+    }
+    
+    // We require all files be in the same directory
+    String trgParent = 
+      target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+    for (String s : srcs) {
+      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+      if (!srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException(
+           "Sources and target are not in the same directory");
+      }
+    }
+
+    HdfsFileStatus resultingStat = null;
+    synchronized(this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot concat " + target, safeMode);
+      }
+      concatInternal(target, srcs);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(target);
+      }
+    }
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getLoginUser(),
+                    Server.getRemoteIp(),
+                    "concat", Arrays.toString(srcs), target, resultingStat);
+    }
+  }
+
+  /** See {@link #concat(String, String[])} */
+  private void concatInternal(String target, String [] srcs) 
+      throws IOException {
+
+    // write permission for the target
+    if (isPermissionEnabled) {
+      checkPathAccess(target, FsAction.WRITE);
+
+      // and srcs
+      for(String aSrc: srcs) {
+        checkPathAccess(aSrc, FsAction.READ); // read the file
+        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+      }
+    }
+
+    // to make sure no two files are the same
+    Set<INode> si = new HashSet<INode>();
+
+    // we put the following prerequisite for the operation
+    // replication and blocks sizes should be the same for ALL the blocks
+
+    // check the target
+    final INodeFile trgInode = dir.getFileINode(target);
+    if(trgInode.isUnderConstruction()) {
+      throw new IllegalArgumentException("concat: target file "
+          + target + " is under construction");
+    }
+    // per design target shouldn't be empty and all the blocks same size
+    if(trgInode.blocks.length  == 0) {
+      throw new IllegalArgumentException("concat: target file "
+          + target + " is empty");
+    }
+
+    long blockSize = trgInode.getPreferredBlockSize();
+
+    // check the end block to be full
+    final BlockInfo last =  trgInode.blocks[trgInode.blocks.length-1];
+    if(blockSize != last.getNumBytes()) {
+      throw new IllegalArgumentException("The last block in " + target
+          + " is not full; last block size = " + last.getNumBytes()
+          + " but file block size = " + blockSize);
+    }
+
+    si.add(trgInode);
+    short repl = trgInode.getReplication();
+
+    // now check the srcs
+    boolean endSrc = false; // final src file doesn't have to have full end block
+    for(int i=0; i<srcs.length; i++) {
+      String src = srcs[i];
+      if(i==srcs.length-1)
+        endSrc=true;
+
+      final INodeFile srcInode = dir.getFileINode(src);
+      if(src.isEmpty() 
+          || srcInode.isUnderConstruction()
+          || srcInode.blocks.length == 0) {
+        throw new IllegalArgumentException("concat: source file " + src
+            + " is invalid or empty or underConstruction");
+      }
+
+      // check replication and blocks size
+      if(repl != srcInode.getReplication()) {
+        throw new IllegalArgumentException("concat: the source file "
+            + src + " and the target file " + target
+            + " should have the same replication: source replication is "
+            + srcInode.getReplication()
+            + " but target replication is " + repl);
+      }
+
+      //boolean endBlock=false;
+      // verify that all the blocks are of the same length as target
+      // should be enough to check the end blocks
+      final BlockInfo[] srcBlocks = srcInode.getBlocks();
+      int idx = srcBlocks.length-1;
+      if(endSrc)
+        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
+        throw new IllegalArgumentException("concat: the source file "
+            + src + " and the target file " + target
+            + " should have the same block size: target block size is "
+            + blockSize + " but the size of source block " + idx + " is "
+            + srcBlocks[idx].getNumBytes());
+      }
+
+      si.add(srcInode);
+    }
+
+    // make sure no two files are the same
+    if(si.size() < srcs.length+1) { // trg + srcs
+      // it means at least two files are the same
+      throw new IllegalArgumentException(
+          "concat: at least two of the source files are the same");
+    }
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
+          Arrays.toString(srcs) + " to " + target);
+    }
+
+    dir.concat(target,srcs);
+  }
 
 
   /**
    * stores the modification and access time for this inode. 

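The NameNode-side checks above can be mirrored by a hypothetical client-side pre-check (a sketch, not part of this patch; the NameNode still re-validates everything): the target must be non-empty with a full last block, and every source must live in the target's directory and match its block size and replication:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ConcatPreCheckSketch {
      static void preCheck(FileSystem fs, Path target, Path[] srcs) throws IOException {
        FileStatus trg = fs.getFileStatus(target);
        if (trg.getLen() == 0) {
          throw new IllegalArgumentException("target " + target + " is empty");
        }
        if (trg.getLen() % trg.getBlockSize() != 0) {
          throw new IllegalArgumentException("last block of " + target + " is not full");
        }
        for (Path src : srcs) {
          FileStatus s = fs.getFileStatus(src);
          if (!src.getParent().equals(target.getParent())) {
            throw new IllegalArgumentException(src + " is not in the target's directory");
          }
          if (s.getBlockSize() != trg.getBlockSize()
              || s.getReplication() != trg.getReplication()) {
            throw new IllegalArgumentException(src + " differs in block size or replication");
          }
        }
      }
    }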
+ 10 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java

@@ -88,7 +88,7 @@ class INodeDirectoryWithQuota extends INodeDirectory {
    * @param dsQuota diskspace quota to be set
    *                                
    */
-  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+  void setQuota(long newNsQuota, long newDsQuota) {
     nsQuota = newNsQuota;
     dsQuota = newDsQuota;
   }
@@ -122,6 +122,15 @@ class INodeDirectoryWithQuota extends INodeDirectory {
     diskspace += dsDelta;
   }
   
+  /** Update the size of the tree
+   * 
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
+   */
+  void addSpaceConsumed(long nsDelta, long dsDelta) {
+    setSpaceConsumed(nsCount + nsDelta, diskspace + dsDelta);
+  }
+  
   /** 
    * Sets namespace and diskspace take by the directory rooted 
    * at this INode. This should be used carefully. It does not check 

+ 28 - 3
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -111,6 +111,26 @@ class INodeFile extends INode {
     return this.blocks;
   }
 
+  /**
+   * append array of blocks to this.blocks
+   */
+  void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+    int size = this.blocks.length;
+    
+    BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+    System.arraycopy(this.blocks, 0, newlist, 0, size);
+    
+    for(INodeFile in: inodes) {
+      System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+      size += in.blocks.length;
+    }
+    
+    // re-point every block in the combined list (including those copied from
+    // the source files) at this inode
+    for(BlockInfo bi: newlist) {
+      bi.setINode(this);
+    }
+    this.blocks = newlist;
+  }
+  
   /**
    * add a block to the block list
    */
@@ -136,9 +156,11 @@ class INodeFile extends INode {
 
 
   int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
-    for (BlockInfo blk : blocks) {
-      v.add(blk);
-      blk.setINode(null);
+    if (blocks != null && v != null) {
+      for (BlockInfo blk : blocks) {
+        v.add(blk);
+        blk.setINode(null);
+      }
     }
     blocks = null;
     return 1;
@@ -171,6 +193,9 @@ class INodeFile extends INode {
   
   
   long diskspaceConsumed(Block[] blkArr) {
     long size = 0;
+    if(blkArr == null) 
+      return 0;
+    
     for (Block blk : blkArr) {
       if (blk != null) {
         size += blk.getNumBytes();

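A standalone sketch of the merge pattern appendBlocks relies on (plain strings stand in for BlockInfo objects; not part of this patch): copy the target's entries first, then each source's entries in the order the sources were given:

    import java.util.Arrays;

    public class AppendArraysSketch {
      static String[] append(String[] target, String[][] sources) {
        int total = target.length;
        for (String[] src : sources) total += src.length;

        String[] newlist = new String[total];
        System.arraycopy(target, 0, newlist, 0, target.length);
        int size = target.length;
        for (String[] src : sources) {
          System.arraycopy(src, 0, newlist, size, src.length);
          size += src.length;
        }
        return newlist;
      }

      public static void main(String[] args) {
        String[] merged = append(new String[] { "t0", "t1" },
            new String[][] { { "a0" }, { "b0", "b1" } });
        System.out.println(Arrays.toString(merged)); // [t0, t1, a0, b0, b1]
      }
    }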
+ 7 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -830,6 +830,13 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   public long getPreferredBlockSize(String filename) throws IOException {
     return namesystem.getPreferredBlockSize(filename);
   }
+  
+  /** 
+   * {@inheritDoc}
+   */
+  public void concat(String trg, String[] src) throws IOException {
+    namesystem.concat(trg, src);
+  }
     
     
   /**
    */

+ 54 - 0
src/hdfs/org/apache/hadoop/hdfs/tools/HDFSConcat.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+
+public class HDFSConcat {
+  private final static String def_uri = "hdfs://localhost:9000";
+  /**
+   * @param args
+   */
+  public static void main(String... args) throws IOException {
+
+    if(args.length < 2) {
+      System.err.println("Usage HDFSConcat target srcs..");
+      System.exit(0);
+    }
+    
+    Configuration conf = new Configuration();
+    String uri = conf.get("fs.default.name", def_uri);
+    Path path = new Path(uri);
+    DistributedFileSystem dfs = 
+      (DistributedFileSystem)FileSystem.get(path.toUri(), conf);
+    
+    Path [] srcs = new Path[args.length-1];
+    for(int i=1; i<args.length; i++) {
+      srcs[i-1] = new Path(args[i]);
+    }
+    dfs.concat(new Path(args[0]), srcs);
+  }
+  
+}

+ 1 - 0
src/test/commit-tests

@@ -118,6 +118,7 @@
 **/TestGetBlocks.java
 **/TestHDFSServerPorts.java
 **/TestHDFSTrash.java
+**/TestHDFSConcat.java
 **/TestHeartbeatHandling.java
 **/TestHost2NodesMap.java
 **/TestInterDatanodeProtocol.java

+ 26 - 13
src/test/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -34,6 +34,7 @@ import java.util.Random;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -136,27 +137,39 @@ public class DFSTestUtil {
   
   
   public static void createFile(FileSystem fs, Path fileName, long fileLen, 
       short replFactor, long seed) throws IOException {
+    createFile(fs, fileName, 1024, fileLen, fs.getDefaultBlockSize(fileName),
+        replFactor, seed);
+  }
+  
+  public static void createFile(FileSystem fs, Path fileName, int bufferLen,
+      long fileLen, long blockSize, short replFactor, long seed)
+      throws IOException {
+    assert bufferLen > 0;
     if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " + 
                             fileName.getParent().toString());
     }
     FSDataOutputStream out = null;
     try {
-      out = fs.create(fileName, replFactor);
-      byte[] toWrite = new byte[1024];
-      Random rb = new Random(seed);
-      long bytesToWrite = fileLen;
-      while (bytesToWrite>0) {
-        rb.nextBytes(toWrite);
-        int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
-
-        out.write(toWrite, 0, bytesToWriteNext);
-        bytesToWrite -= bytesToWriteNext;
+      out = fs.create(fileName, true,
+        fs.getConf().getInt("io.file.buffer.size", 4096),replFactor, blockSize);
+      if (fileLen > 0) {
+        byte[] toWrite = new byte[bufferLen];
+        Random rb = new Random(seed);
+        long bytesToWrite = fileLen;
+        while (bytesToWrite>0) {
+          rb.nextBytes(toWrite);
+          int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
+              : (int) bytesToWrite;
+  
+          out.write(toWrite, 0, bytesToWriteNext);
+          bytesToWrite -= bytesToWriteNext;
+        }
       }
-      out.close();
-      out = null;
     } finally {
-      IOUtils.closeStream(out);
+      if (out != null) {
+        out.close();
+      }
     }
   }
   

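A usage sketch of the new createFile overload (not part of this patch; it assumes a test FileSystem such as the MiniDFSCluster one used in TestHDFSConcat, and the path is illustrative): create a source file whose length is an exact multiple of a custom block size, so all of its blocks are full and it is a valid concat source:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;

    public class CreateConcatSourceSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration()); // assumed test cluster
        long blockSize = 512;
        // fileLen is an exact multiple of blockSize, so every block is full
        DFSTestUtil.createFile(fs, new Path("/concat/part-0"),
            1024 /* bufferLen */, 3 * blockSize, blockSize, (short) 2, 1L /* seed */);
      }
    }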
+ 2 - 0
src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -270,6 +270,8 @@ public class TestDFSClientRetries extends TestCase {
     public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
 
     public boolean rename(String src, String dst) throws IOException { return false; }
+    
+    public void concat(String trg, String[] srcs)  throws IOException {}
 
 
     public boolean delete(String src) throws IOException { return false; }
 

+ 350 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java

@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHDFSConcat {
+  public static final Log LOG = LogFactory.getLog(TestHDFSConcat.class);
+
+  private static final short REPL_FACTOR = 2;
+  
+  private MiniDFSCluster cluster;
+  private NameNode nn;
+  private DistributedFileSystem dfs;
+
+  private static long blockSize = 512;
+
+  
+  private static Configuration conf;
+
+  static {
+    conf = new Configuration();
+    conf.setLong("dfs.block.size", blockSize);
+  }
+  
+  @Before
+  public void startUpCluster() throws IOException {
+    cluster = new MiniDFSCluster(conf, REPL_FACTOR, true, null);
+    assertNotNull("Failed Cluster Creation", cluster);
+    cluster.waitClusterUp();
+    dfs = (DistributedFileSystem) cluster.getFileSystem();
+    assertNotNull("Failed to get FileSystem", dfs);
+    nn = cluster.getNameNode();
+    assertNotNull("Failed to get NameNode", nn);
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if(dfs != null) {
+      dfs.close();
+    }
+    if(cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+    }
+  } 
+
+  /**
+   * Concatenates 10 files into one
+   * Verifies the final size, deletion of the file, number of blocks
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  @Test
+  public void testConcat() throws IOException, InterruptedException {
+    final int numFiles = 10;
+    long fileLen = blockSize*3;
+    HdfsFileStatus fStatus;
+    FSDataInputStream stm;
+    
+    String trg = new String("/trg");
+    Path trgPath = new Path(trg);
+    DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
+    fStatus  = nn.getFileInfo(trg);
+    long trgLen = fStatus.getLen();
+    long trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+       
+    Path [] files = new Path[numFiles];
+    byte [] [] bytes = new byte [numFiles][(int)fileLen];
+    LocatedBlocks [] lblocks = new LocatedBlocks[numFiles];
+    long [] lens = new long [numFiles];
+    
+    
+    int i = 0;
+    for(i=0; i<files.length; i++) {
+      files[i] = new Path("/file"+i);
+      Path path = files[i];
+      System.out.println("Creating file " + path);
+      DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1);
+    
+      fStatus = nn.getFileInfo(path.toUri().getPath());
+      lens[i] = fStatus.getLen();
+      assertEquals(trgLen, lens[i]); // file of the same length.
+      
+      lblocks[i] = nn.getBlockLocations(path.toUri().getPath(), 0, lens[i]);
+      
+      //read the file
+      stm = dfs.open(path);
+      stm.readFully(0, bytes[i]);
+      //bytes[i][10] = 10;
+      stm.close();
+    }
+    
+    // check permissions - try the operation with the "wrong" user
+    final UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
+        "theDoctor", new String[] { "tardis" });
+    DistributedFileSystem hdfs = 
+        (DistributedFileSystem)DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      hdfs.concat(trgPath, files);
+      fail("Permission exception expected");
+    } catch (IOException ie) {
+      System.out.println("Got expected exception for permissions:"
+          + ie.getLocalizedMessage());
+      // expected
+    }
+    
+    // check count update
+    ContentSummary cBefore = dfs.getContentSummary(trgPath.getParent());
+    
+    // now concatenate
+    dfs.concat(trgPath, files);
+    
+    // verify  count
+    ContentSummary cAfter = dfs.getContentSummary(trgPath.getParent());
+    assertEquals(cBefore.getFileCount(), cAfter.getFileCount()+files.length);
+    
+    // verify other stuff
+    long totalLen = trgLen;
+    long totalBlocks = trgBlocks;
+    for(i=0; i<files.length; i++) {
+      totalLen += lens[i];
+      totalBlocks += lblocks[i].locatedBlockCount();
+    }
+    System.out.println("total len=" + totalLen + "; totalBlocks=" + totalBlocks);
+    
+    
+    fStatus = nn.getFileInfo(trg);
+    trgLen  = fStatus.getLen(); // new length
+    
+    // read the resulting file
+    stm = dfs.open(trgPath);
+    byte[] byteFileConcat = new byte[(int)trgLen];
+    stm.readFully(0, byteFileConcat);
+    stm.close();
+    
+    trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+    
+    //verifications
+    // 1. number of blocks
+    assertEquals(trgBlocks, totalBlocks); 
+        
+    // 2. file lengths
+    assertEquals(trgLen, totalLen);
+    
+    // 3. removal of the src file
+    for(Path p: files) {
+      fStatus = nn.getFileInfo(p.toUri().getPath());
+      assertNull("File " + p + " still exists", fStatus); // file shouldn't exist
+      // try to create a file with the same name
+      DFSTestUtil.createFile(dfs, p, fileLen, REPL_FACTOR, 1); 
+    }
+  
+    // 4. content
+    checkFileContent(byteFileConcat, bytes);
+    
+    // add a small file (less then a block)
+    Path smallFile = new Path("/sfile");
+    int sFileLen = 10;
+    DFSTestUtil.createFile(dfs, smallFile, sFileLen, REPL_FACTOR, 1);
+    dfs.concat(trgPath, new Path [] {smallFile});
+    
+    fStatus = nn.getFileInfo(trg);
+    trgLen  = fStatus.getLen(); // new length
+    
+    // check number of blocks
+    trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
+    assertEquals(trgBlocks, totalBlocks+1);
+    
+    // and length
+    assertEquals(trgLen, totalLen+sFileLen);
+    
+  }
+
+  // compare content
+  private void checkFileContent(byte[] concat, byte[][] bytes ) {
+    int idx=0;
+    boolean mismatch = false;
+    
+    for(byte [] bb: bytes) {
+      for(byte b: bb) {
+        if(b != concat[idx++]) {
+          mismatch=true;
+          break;
+        }
+      }
+      if(mismatch)
+        break;
+    }
+    assertFalse("File content of concatenated file is different", mismatch);
+  }
+
+
+  // test case when final block is not of a full length
+  @Test
+  public void testConcatNotCompleteBlock() throws IOException {
+    long trgFileLen = blockSize*3;
+    long srcFileLen = blockSize*3+20; // block at the end - not full
+
+    
+    // create first file
+    String name1="/trg", name2="/src";
+    Path filePath1 = new Path(name1);
+    DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
+    
+    HdfsFileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
+    long fileLen = fStatus.getLen();
+    assertEquals(fileLen, trgFileLen);
+    
+    //read the file
+    FSDataInputStream stm = dfs.open(filePath1);
+    byte[] byteFile1 = new byte[(int)trgFileLen];
+    stm.readFully(0, byteFile1);
+    stm.close();
+    
+    LocatedBlocks lb1 = cluster.getNameNode().getBlockLocations(name1, 0, trgFileLen);
+    
+    Path filePath2 = new Path(name2);
+    DFSTestUtil.createFile(dfs, filePath2, srcFileLen, REPL_FACTOR, 1);
+    fStatus = cluster.getNameNode().getFileInfo(name2);
+    fileLen = fStatus.getLen();
+    assertEquals(srcFileLen, fileLen);
+    
+    // read the file
+    stm = dfs.open(filePath2);
+    byte[] byteFile2 = new byte[(int)srcFileLen];
+    stm.readFully(0, byteFile2);
+    stm.close();
+    
+    LocatedBlocks lb2 = cluster.getNameNode().getBlockLocations(name2, 0, srcFileLen);
+    
+    
+    System.out.println("trg len="+trgFileLen+"; src len="+srcFileLen);
+    
+    // move the blocks
+    dfs.concat(filePath1, new Path [] {filePath2});
+    
+    long totalLen = trgFileLen + srcFileLen;
+    fStatus = cluster.getNameNode().getFileInfo(name1);
+    fileLen = fStatus.getLen();
+    
+    // read the resulting file
+    stm = dfs.open(filePath1);
+    byte[] byteFileConcat = new byte[(int)fileLen];
+    stm.readFully(0, byteFileConcat);
+    stm.close();
+    
+    LocatedBlocks lbConcat = cluster.getNameNode().getBlockLocations(name1, 0, fileLen);
+    
+    //verifications
+    // 1. number of blocks
+    assertEquals(lbConcat.locatedBlockCount(), 
+        lb1.locatedBlockCount() + lb2.locatedBlockCount());
+    
+    // 2. file lengths
+    System.out.println("file1 len="+fileLen+"; total len="+totalLen);
+    assertEquals(fileLen, totalLen);
+    
+    // 3. removal of the src file
+    fStatus = cluster.getNameNode().getFileInfo(name2);
+    assertNull("File " + name2 + " still exists", fStatus); // file shouldn't exist
+  
+    // 4. content
+    checkFileContent(byteFileConcat, new byte [] [] {byteFile1, byteFile2});
+  }
+  
+  /**
+   * test illegal args cases
+   */
+  @Test
+  public void testIllegalArg() throws IOException {
+    long fileLen = blockSize*3;
+    
+    Path parentDir  = new Path ("/parentTrg");
+    assertTrue(dfs.mkdirs(parentDir));
+    Path trg = new Path(parentDir, "trg");
+    DFSTestUtil.createFile(dfs, trg, fileLen, REPL_FACTOR, 1);
+
+    // must be in the same dir
+    {
+      // create first file
+      Path dir1 = new Path ("/dir1");
+      assertTrue(dfs.mkdirs(dir1));
+      Path src = new Path(dir1, "src");
+      DFSTestUtil.createFile(dfs, src, fileLen, REPL_FACTOR, 1);
+      
+      try {
+        dfs.concat(trg, new Path [] {src});
+        fail("didn't fail for src and trg in different directories");
+      } catch (Exception e) {
+        // expected
+      }
+    }
+    // non existing file
+    try {
+      dfs.concat(trg, new Path [] {new Path("test1/a")}); // non existing file
+      fail("didn't fail with invalid arguments");
+    } catch (Exception e) {
+      //expected
+    }
+    // empty arg list
+    try {
+      dfs.concat(trg, new Path [] {}); // empty array
+      fail("didn't fail with invalid arguments");
+    } catch (Exception e) {
+      // expected
+    }
+ 
+  }
+}