
HDFS-570. Get last block length from a data-node when opening a file being written to. Contributed by Tsz Wo (Nicholas), SZE.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@819232 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko, 15 years ago
Parent commit: cc07fe635f

+ 3 - 0
CHANGES.txt

@@ -43,6 +43,9 @@ Append branch (unreleased changes)
    HDFS-627. Support replica update in data-node.
    (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
 
+   HDFS-570. Get last block length from a data-node when opening a file
+   being written to. (Tsz Wo (Nicholas), SZE via shv)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

+ 83 - 11
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1611,6 +1611,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private BlockReader blockReader = null;
     private boolean verifyChecksum;
     private LocatedBlocks locatedBlocks = null;
+    private long lastBlockBeingWrittenLength = 0;
     private DatanodeInfo currentNode = null;
     private Block currentBlock = null;
     private long pos = 0;
@@ -1643,6 +1644,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
      */
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("newInfo = " + newInfo);
+      }
       if (newInfo == null) {
         throw new IOException("Cannot open filename " + src);
       }
@@ -1657,11 +1661,39 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         }
       }
       this.locatedBlocks = newInfo;
+      this.lastBlockBeingWrittenLength = 
+          locatedBlocks.isLastBlockComplete()? 0:
+              readBlockLength(locatedBlocks.getLastLocatedBlock()); 
       this.currentNode = null;
     }
+
+    /** Read the block length from one of the datanodes. */
+    private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+      if (locatedblock == null || locatedblock.getLocations().length == 0) {
+        return 0;
+      }
+      for(DatanodeInfo datanode : locatedblock.getLocations()) {
+        try {
+          final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
+              datanode, conf);
+          final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+          if (n >= 0) {
+            return n;
+          }
+        }
+        catch(IOException ioe) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Faild to getReplicaVisibleLength from datanode "
+                + datanode + " for block " + locatedblock.getBlock(), ioe);
+          }
+        }
+      }
+      throw new IOException("Cannot obtain block length for " + locatedblock);
+    }
     
     public synchronized long getFileLength() {
-      return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+      return locatedBlocks == null? 0:
+          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
     }
 
     /**
@@ -1697,17 +1729,36 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private synchronized LocatedBlock getBlockAt(long offset,
         boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
-      // search cached blocks first
-      int targetBlockIdx = locatedBlocks.findBlock(offset);
-      if (targetBlockIdx < 0) { // block is not cached
-        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-        // fetch more blocks
-        LocatedBlocks newBlocks;
-        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-        assert (newBlocks != null) : "Could not find target position " + offset;
-        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+
+      final LocatedBlock blk;
+
+      //check offset
+      if (offset < 0 || offset >= getFileLength()) {
+        throw new IOException("offset < 0 || offset > getFileLength(), offset="
+            + offset
+            + ", updatePosition=" + updatePosition
+            + ", locatedBlocks=" + locatedBlocks);
+      }
+      else if (offset >= locatedBlocks.getFileLength()) {
+        // offset to the portion of the last block,
+        // which is not known to the name-node yet;
+        // getting the last block 
+        blk = locatedBlocks.getLastLocatedBlock();
+      }
+      else {
+        // search cached blocks first
+        int targetBlockIdx = locatedBlocks.findBlock(offset);
+        if (targetBlockIdx < 0) { // block is not cached
+          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+          // fetch more blocks
+          LocatedBlocks newBlocks;
+          newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+          assert (newBlocks != null) : "Could not find target position " + offset;
+          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+        }
+        blk = locatedBlocks.get(targetBlockIdx);
       }
-      LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
+
       // update current position
       if (updatePosition) {
         this.pos = offset;
@@ -1744,6 +1795,27 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private synchronized List<LocatedBlock> getBlockRange(long offset, 
                                                           long length) 
                                                         throws IOException {
+      final List<LocatedBlock> blocks;
+      if (locatedBlocks.isLastBlockComplete()) {
+        blocks = getFinalizedBlockRange(offset, length);
+      }
+      else {
+        if (length + offset > locatedBlocks.getFileLength()) {
+          length = locatedBlocks.getFileLength() - offset;
+        }
+        blocks = getFinalizedBlockRange(offset, length);
+        blocks.add(locatedBlocks.getLastLocatedBlock());
+      }
+      return blocks;
+    }
+
+    /**
+     * Get blocks in the specified range.
+     * Includes only the complete blocks.
+     * Fetch them from the namenode if not cached.
+     */
+    private synchronized List<LocatedBlock> getFinalizedBlockRange(
+        long offset, long length) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
       // search cached blocks first
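
The client-side core of the change is the fallback loop in readBlockLength(): while a file is being written, the name-node does not yet know the last block's length, so the client asks each datanode holding a replica and accepts the first non-negative answer. A minimal, self-contained sketch of that pattern; ReplicaLengthSource is a hypothetical stand-in for a ClientDatanodeProtocol proxy:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class VisibleLengthSketch {
  /** Hypothetical stand-in for a per-datanode RPC proxy. */
  interface ReplicaLengthSource {
    long getReplicaVisibleLength() throws IOException;
  }

  /** Try each replica holder in turn; the first non-negative answer wins. */
  static long readBlockLength(List<ReplicaLengthSource> replicas)
      throws IOException {
    for (ReplicaLengthSource r : replicas) {
      try {
        final long n = r.getReplicaVisibleLength();
        if (n >= 0) {
          return n;
        }
      } catch (IOException ioe) {
        // this datanode is unreachable or lacks the replica; try the next one
      }
    }
    throw new IOException("Cannot obtain block length from any replica");
  }

  public static void main(String[] args) throws IOException {
    // one dead datanode, then one that has 1000 visible bytes
    final List<ReplicaLengthSource> replicas = Arrays.asList(
        () -> { throw new IOException("connection refused"); },
        () -> 1000L);
    System.out.println("visible length = " + readBlockLength(replicas)); // 1000
  }
}

With that length cached in lastBlockBeingWrittenLength, getFileLength() can report locatedBlocks.getFileLength() plus the bytes the name-node has not yet accounted for, and getBlockAt() can route reads beyond the name-node-known length to the last located block.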

+ 5 - 2
src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -29,9 +29,9 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 4: never return null and always return a newly generated access token
+   * 5: add getReplicaVisibleLength(..)
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
@@ -45,4 +45,7 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
   @Deprecated // not used anymore - should be removed
   LocatedBlock recoverBlock(Block block, boolean keepLength,
       DatanodeInfo[] targets) throws IOException;
+
+  /** Return the visible length of a replica. */
+  long getReplicaVisibleLength(Block b) throws IOException;
 }
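
Both versionID bumps in this commit (here and in ClientProtocol below) follow the usual Hadoop RPC convention: any interface change increments the constant so that mismatched client/server pairs fail fast at connect time rather than on a missing method. A hedged sketch of the check a caller could make; the proxy parameter is illustrative, and in practice the RPC layer performs this comparison itself:

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

public class VersionCheckSketch {
  static void checkVersion(ClientDatanodeProtocol proxy) throws IOException {
    // getProtocolVersion(..) is inherited from VersionedProtocol
    final long serverVersion = proxy.getProtocolVersion(
        ClientDatanodeProtocol.class.getName(),
        ClientDatanodeProtocol.versionID);
    if (serverVersion != ClientDatanodeProtocol.versionID) {
      throw new IOException("ClientDatanodeProtocol version mismatch: client="
          + ClientDatanodeProtocol.versionID + ", server=" + serverVersion);
    }
  }
}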

+ 2 - 4
src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -44,11 +44,9 @@ public interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 49: added two new methods to support pipeline recovery and append
-   *     updateBlockForPipeline(Block, String) and
-   *     updatePipeline(String, Block, Block, DatanodeID[])
+   * 50: change LocatedBlocks to include last block information.
    */
-  public static final long versionID = 49L;
+  public static final long versionID = 50L;
   
   ///////////////////////////////////////
   // File contents

+ 16 - 0
src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java

@@ -145,4 +145,20 @@ public class LocatedBlock implements Writable {
       locs[i].readFields(in);
     }
   }
+
+  /** Read LocatedBlock from in. */
+  public static LocatedBlock read(DataInput in) throws IOException {
+    final LocatedBlock lb = new LocatedBlock();
+    lb.readFields(in);
+    return lb;
+  }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + "{" + b
+        + "; corrupt=" + corrupt
+        + "; offset=" + offset
+        + "; locs=" + java.util.Arrays.asList(locs)
+        + "}";
+  }
 }
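
The new static read(DataInput) is the construct-then-populate idiom Writable types use so callers can deserialize in a single expression. A self-contained round-trip sketch with a simplified payload (Pair stands in for LocatedBlock):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
  static class Pair {
    long offset;
    boolean corrupt;

    void write(DataOutput out) throws IOException {
      out.writeLong(offset);
      out.writeBoolean(corrupt);
    }

    void readFields(DataInput in) throws IOException {
      offset = in.readLong();
      corrupt = in.readBoolean();
    }

    /** Mirrors LocatedBlock.read(DataInput): allocate, populate, return. */
    static Pair read(DataInput in) throws IOException {
      final Pair p = new Pair();
      p.readFields(in);
      return p;
    }
  }

  public static void main(String[] args) throws IOException {
    final Pair original = new Pair();
    original.offset = 4096L;

    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    final Pair copy = Pair.read(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.offset); // 4096
  }
}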

+ 49 - 2
src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java

@@ -36,6 +36,8 @@ public class LocatedBlocks implements Writable {
   private long fileLength;
   private List<LocatedBlock> blocks; // array of blocks with prioritized locations
   private boolean underConstruction;
+  private LocatedBlock lastLocatedBlock = null;
+  private boolean isLastBlockComplete = false;
 
   LocatedBlocks() {
     fileLength = 0;
@@ -43,11 +45,15 @@ public class LocatedBlocks implements Writable {
     underConstruction = false;
   }
   
-  public LocatedBlocks(long flength, List<LocatedBlock> blks, boolean isUnderConstuction) {
-
+  /** public Constructor */
+  public LocatedBlocks(long flength, boolean isUnderConstuction,
+      List<LocatedBlock> blks, 
+      LocatedBlock lastBlock, boolean isLastBlockCompleted) {
     fileLength = flength;
     blocks = blks;
     underConstruction = isUnderConstuction;
+    this.lastLocatedBlock = lastBlock;
+    this.isLastBlockComplete = isLastBlockCompleted;
   }
   
   /**
@@ -57,6 +63,16 @@ public class LocatedBlocks implements Writable {
     return blocks;
   }
   
+  /** Get the last located block. */
+  public LocatedBlock getLastLocatedBlock() {
+    return lastLocatedBlock;
+  }
+  
+  /** Is the last block completed? */
+  public boolean isLastBlockComplete() {
+    return isLastBlockComplete;
+  }
+
   /**
    * Get located block.
    */
@@ -161,6 +177,15 @@ public class LocatedBlocks implements Writable {
   public void write(DataOutput out) throws IOException {
     out.writeLong(this.fileLength);
     out.writeBoolean(underConstruction);
+
+    //write the last located block
+    final boolean isNull = lastLocatedBlock == null;
+    out.writeBoolean(isNull);
+    if (!isNull) {
+      lastLocatedBlock.write(out);
+    }
+    out.writeBoolean(isLastBlockComplete);
+
     // write located blocks
     int nrBlocks = locatedBlockCount();
     out.writeInt(nrBlocks);
@@ -175,6 +200,14 @@ public class LocatedBlocks implements Writable {
   public void readFields(DataInput in) throws IOException {
     this.fileLength = in.readLong();
     underConstruction = in.readBoolean();
+
+    //read the last located block
+    final boolean isNull = in.readBoolean();
+    if (!isNull) {
+      lastLocatedBlock = LocatedBlock.read(in);
+    }
+    isLastBlockComplete = in.readBoolean();
+
     // read located blocks
     int nrBlocks = in.readInt();
     this.blocks = new ArrayList<LocatedBlock>(nrBlocks);
@@ -184,4 +217,18 @@ public class LocatedBlocks implements Writable {
       this.blocks.add(blk);
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("{")
+     .append("\n  fileLength=").append(fileLength)
+     .append("\n  underConstruction=").append(underConstruction)
+     .append("\n  blocks=").append(blocks)
+     .append("\n  lastLocatedBlock=").append(lastLocatedBlock)
+     .append("\n  isLastBlockComplete=").append(isLastBlockComplete)
+     .append("}");
+    return b.toString();
+  }
 }
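
The two new fields change the LocatedBlocks wire format, which is the compatibility reason behind the ClientProtocol bump to 50. A nullable field is preceded by a presence flag, and readFields must consume fields in exactly the order write produced them, or the stream desynchronizes. A minimal sketch of the pattern with the payload simplified to a long:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableFieldSketch {
  static void writeNullable(Long value, DataOutput out) throws IOException {
    final boolean isNull = value == null;
    out.writeBoolean(isNull);     // presence flag always written first
    if (!isNull) {
      out.writeLong(value);       // payload only when present
    }
  }

  static Long readNullable(DataInput in) throws IOException {
    // must mirror writeNullable field-for-field
    return in.readBoolean() ? null : in.readLong();
  }

  public static void main(String[] args) throws IOException {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(bytes);
    writeNullable(null, out);
    writeNullable(8192L, out);

    final DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(readNullable(in)); // null
    System.out.println(readNullable(in)); // 8192
  }
}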

+ 15 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1725,4 +1725,19 @@ public class DataNode extends Configured
     LOG.info(who + " calls recoverBlock(block=" + block
         + ", targets=[" + msg + "])");
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getReplicaVisibleLength(final Block block) throws IOException {
+    final Replica replica = data.getReplica(block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+    return replica.getVisibleLength();
+  }
 }
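
getReplicaVisibleLength() enforces a staleness invariant: a replica whose generation stamp trails the requested block's was left behind by a pipeline recovery, so its length must not be served. A sketch of just that guard, with primitives standing in for Block and Replica:

import java.io.IOException;

public class GenStampGuardSketch {
  static long visibleLength(long replicaGenStamp, long requestedGenStamp,
      long visibleBytes) throws IOException {
    if (replicaGenStamp < requestedGenStamp) {
      throw new IOException("stale replica: generation stamp "
          + replicaGenStamp + " < requested " + requestedGenStamp);
    }
    return visibleBytes; // bytes the datanode guarantees a reader may see
  }
}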

+ 13 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java

@@ -266,4 +266,17 @@ class BlockInfoUnderConstruction extends BlockInfo {
     // Sufficient to rely on super's implementation
     return (this == obj) || super.equals(obj);
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("{")
+     .append("\n  blockUCState=").append(blockUCState)
+     .append("\n  replicas=").append(replicas)
+     .append("\n  primaryNodeIndex=").append(primaryNodeIndex)
+     .append("\n  lastRecoveryTime=").append(lastRecoveryTime)
+     .append("}");
+    return b.toString();
+  }
 }

+ 31 - 35
src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java

@@ -22,7 +22,6 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -346,43 +344,12 @@ public class BlockManager {
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
+      return Collections.<LocatedBlock>emptyList();
 
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
-      int numCorruptReplicas = corruptReplicas
-          .numCorruptReplicas(blocks[curBlk]);
-      if (numCorruptNodes != numCorruptReplicas) {
-        FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
-            + blocks[curBlk] + "blockMap has " + numCorruptNodes
-            + " but corrupt replicas map has " + numCorruptReplicas);
-      }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes :
-                          (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for (Iterator<DatanodeDescriptor> it = 
-             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = 
-            corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
-        }
-      }
-      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
-          blockCorrupt);
-      if (namesystem.isAccessTokenEnabled) {
-        b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
-      }
-      results.add(b);
+      results.add(getBlockLocation(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -391,6 +358,35 @@ public class BlockManager {
     return results;
   }
 
+  /** @return a LocatedBlock for the given block */
+  LocatedBlock getBlockLocation(final Block blk, final long pos
+      ) throws IOException {
+    // get block locations
+    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+    if (numCorruptNodes != numCorruptReplicas) {
+      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+          + blk + "blockMap has " + numCorruptNodes
+          + " but corrupt replicas map has " + numCorruptReplicas);
+    }
+
+    final int numNodes = blocksMap.numNodes(blk);
+    final boolean isCorrupt = numCorruptNodes == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    if (numMachines > 0) {
+      int j = 0;
+      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+          it.hasNext();) {
+        final DatanodeDescriptor d = it.next();
+        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+          machines[j++] = d;
+      }
+    }
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);    
+  }
+
   /**
    * Check whether the replication parameter is within the range
    * determined by system configuration.
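
The refactoring extracts the per-block location assembly into getBlockLocation() without changing the policy, which is worth spelling out: if every replica of a block is corrupt, all of them are returned (a corrupt copy is better than none); otherwise only the healthy replicas are. A sketch with simplified types:

import java.util.ArrayList;
import java.util.List;

public class ReplicaFilterSketch {
  /** nodes.get(i) is corrupt iff corrupt.get(i) is true. */
  static List<String> chooseMachines(List<String> nodes, List<Boolean> corrupt) {
    final boolean allCorrupt = !corrupt.contains(Boolean.FALSE);
    final List<String> machines = new ArrayList<String>();
    for (int i = 0; i < nodes.size(); i++) {
      if (allCorrupt || !corrupt.get(i)) {
        machines.add(nodes.get(i));
      }
    }
    return machines;
  }
}

Two small observations: the condition isCorrupt || (!isCorrupt && !replicaCorrupt), carried over from the old code, simplifies to isCorrupt || !replicaCorrupt; and returning Collections.<LocatedBlock>emptyList() instead of null spares every caller a null check.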

+ 42 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -713,14 +713,51 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     if (doAccessTime && isAccessTimeSupported()) {
       dir.setTimes(src, inode, -1, now(), false);
     }
-    final Block[] blocks = inode.getBlocks();
+    final BlockInfo[] blocks = inode.getBlocks();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+    }
     if (blocks == null) {
       return null;
     }
-    final List<LocatedBlock> results = blocks.length == 0?
-        new ArrayList<LocatedBlock>(0):
-        blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
-    return inode.createLocatedBlocks(results);
+
+    if (blocks.length == 0) {
+      return new LocatedBlocks(0, inode.isUnderConstruction(),
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      final long n = inode.computeFileSize(false);
+      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+          blocks, offset, length, Integer.MAX_VALUE);
+      final BlockInfo last = inode.getLastBlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("last = " + last);
+      }
+
+      if (!last.isComplete()) {
+        final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)last;
+        final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("locations = " + java.util.Arrays.asList(locations));
+        }
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            createLocatedBlock(uc, locations, n, false), false);
+      }
+      else {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+      }
+    }
+  }
+
+  /** Create a LocatedBlock. */
+  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+      final long offset, final boolean corrupt) throws IOException {
+    final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+    if (isAccessTokenEnabled) {
+      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    }
+    return lb;
   }
 
   /**
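
End to end, the length bookkeeping now works like this: the name-node reports computeFileSize(false), excluding any under-construction tail, ships that tail separately with isLastBlockComplete=false, and the client tops the total up with the visible length it fetched from a datanode. A worked sketch with illustrative numbers:

public class LengthBookkeepingSketch {
  public static void main(String[] args) {
    final long finalizedBytes = 2 * 8192; // computeFileSize(false) on the name-node
    final long visibleTail = 1000;        // getReplicaVisibleLength() on a datanode

    final long namenodeView = finalizedBytes;           // LocatedBlocks.getFileLength()
    final long clientView = namenodeView + visibleTail; // DFSClient's getFileLength()

    System.out.println(namenodeView + " known + " + visibleTail
        + " visible = " + clientView);                  // 16384 + 1000 = 17384
  }
}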

+ 0 - 8
src/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -25,8 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
@@ -423,10 +421,4 @@ abstract class INode implements Comparable<byte[]>, FSInodeInfo {
     }
     return null;
   }
-  
-  
-  LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
-    return new LocatedBlocks(computeContentSummary().getLength(), blocks,
-        isUnderConstruction());
-  }
 }

+ 18 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -122,16 +122,29 @@ class INodeFile extends INode {
 
   /** {@inheritDoc} */
   long[] computeContentSummary(long[] summary) {
-    long bytes = 0;
-    for(Block blk : blocks) {
-      bytes += blk.getNumBytes();
-    }
-    summary[0] += bytes;
+    summary[0] += computeFileSize(true);
     summary[1]++;
     summary[3] += diskspaceConsumed();
     return summary;
   }
 
+  /** Compute file size. An under-construction last block, if any,
+   * is counted only when includesBlockInfoUnderConstruction is true.
+   */
+  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    final int last = blocks.length - 1;
+    //check if the last block is BlockInfoUnderConstruction
+    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+                 && !includesBlockInfoUnderConstruction?
+                     0: blocks[last].getNumBytes();
+    for(int i = 0; i < last; i++) {
+      bytes += blocks[i].getNumBytes();
+    }
+    return bytes;
+  }
   
 
   @Override
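
The includesBlockInfoUnderConstruction flag lets the two call sites disagree deliberately: computeContentSummary() passes true so quota accounting counts every byte, while getBlockLocations() passes false so the advertised length never exceeds what the name-node can vouch for. A sketch of the semantics on a few edge cases, using a plain long[] in place of the block array:

public class ComputeFileSizeSketch {
  static long computeFileSize(long[] blockBytes, boolean lastUnderConstruction,
      boolean includeUnderConstruction) {
    if (blockBytes == null || blockBytes.length == 0) {
      return 0;
    }
    final int last = blockBytes.length - 1;
    // the last block counts unless it is under construction and excluded
    long bytes = lastUnderConstruction && !includeUnderConstruction
        ? 0 : blockBytes[last];
    for (int i = 0; i < last; i++) {
      bytes += blockBytes[i];
    }
    return bytes;
  }

  public static void main(String[] args) {
    System.out.println(computeFileSize(new long[0], false, false));    // 0: empty file
    System.out.println(computeFileSize(new long[]{512}, true, false)); // 0: only a UC tail
    System.out.println(computeFileSize(new long[]{512}, true, true));  // 512: counted for quota
  }
}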

+ 93 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test reading from hdfs while a file is being written. */
+public class TestReadWhileWriting {
+  {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private static final String DIR = "/"
+      + TestReadWhileWriting.class.getSimpleName() + "/";
+  private static final int BLOCK_SIZE = 8192;
+
+  /** Test reading while writing. */
+  @Test
+  public void testReadWhileWriting() throws Exception {
+    Configuration conf = new Configuration();
+    // create cluster
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      cluster.waitActive();
+      final FileSystem fs = cluster.getFileSystem();
+
+      // write to a file but not closing it.
+      final Path p = new Path(DIR, "file1");
+      final FSDataOutputStream out = fs.create(p, true,
+          fs.getConf().getInt("io.file.buffer.size", 4096),
+          (short)3, BLOCK_SIZE);
+      final int size = BLOCK_SIZE/3;
+      final byte[] buffer = AppendTestUtil.randomBytes(0, size);
+      out.write(buffer, 0, size);
+      out.flush();
+      out.sync();
+
+      // able to read?
+      Assert.assertTrue(read(fs, p, size));
+
+      out.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /** able to read? */
+  private static boolean read(FileSystem fs, Path p, int expectedsize
+      ) throws Exception {
+    //try at most 3 minutes
+    for(int i = 0; i < 360; i++) {
+      final FSDataInputStream in = fs.open(p);
+      try {
+        final int available = in.available();
+        System.out.println(i + ") in.available()=" + available);
+        Assert.assertTrue(available >= 0);
+        Assert.assertTrue(available <= expectedsize);
+        if (available == expectedsize) {
+          return true;
+        }
+      } finally {
+        in.close();
+      }
+      Thread.sleep(500);
+    }
+    return false;
+  }
+}
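
A design note on the test: read() opens the file afresh on each probe because DFSInputStream fetches block locations, including the under-construction tail's visible length, in openInfo() at open time; re-using one stream would keep observing a stale length. The 360 iterations at 500 ms bound the wait at the advertised 3 minutes.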