Browse Source

HDFS-8122. Erasure Coding: Support specifying ECSchema during creation of ECZone. Contributed by Vinayakumar B.

Zhe Zhang 10 năm trước cách đây
mục cha
commit
a32c4dc38a
16 tập tin đã thay đổi với 108 bổ sung49 xóa
  1. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  2. 33 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  3. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  4. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  5. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  6. 23 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
  7. 10 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  8. 9 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  9. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  10. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  11. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
  12. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
  13. 9 9
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
  14. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
  15. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
  16. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1333,7 +1333,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
                              Progressable progress,
                              int buffersize,
                              ChecksumOpt checksumOpt) throws IOException {
-    return create(src, permission, flag, createParent, replication, blockSize, 
+    return create(src, permission, flag, createParent, replication, blockSize,
         progress, buffersize, checksumOpt, null);
   }
 
@@ -3010,12 +3010,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return new EncryptionZoneIterator(namenode, traceSampler);
   }
 
-  public void createErasureCodingZone(String src)
+  public void createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     checkOpen();
     TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
     try {
-      namenode.createErasureCodingZone(src);
+      namenode.createErasureCodingZone(src, schema);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
           SafeModeException.class,

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
@@ -2273,4 +2274,36 @@ public class DistributedFileSystem extends FileSystem {
       throws IOException {
     return dfs.getInotifyEventStream(lastReadTxid);
   }
+
+  /**
+   * Create the erasurecoding zone
+   * 
+   * @param path Directory to create the ec zone
+   * @param schema ECSchema for the zone. If not specified default will be used.
+   * @throws IOException
+   */
+  public void createErasureCodingZone(final Path path, final ECSchema schema)
+      throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        dfs.createErasureCodingZone(getPathName(p), null);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+          myDfs.createErasureCodingZone(p, schema);
+          return null;
+        }
+        throw new UnsupportedOperationException(
+            "Cannot createErasureCodingZone through a symlink to a "
+                + "non-DistributedFileSystem: " + path + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
 }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -1364,11 +1364,11 @@ public interface ClientProtocol {
       long prevId) throws IOException;
 
   /**
-   * Create an erasure coding zone (currently with hardcoded schema)
-   * TODO: Configurable and pluggable schemas (HDFS-7337)
+   * Create an erasure coding zone with specified schema, if any, otherwise
+   * default
    */
   @Idempotent
-  public void createErasureCodingZone(String src)
+  public void createErasureCodingZone(String src, ECSchema schema)
       throws IOException;
 
   /**

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -1403,7 +1403,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, CreateErasureCodingZoneRequestProto req)
       throws ServiceException {
     try {
-      server.createErasureCodingZone(req.getSrc());
+      ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
+          .getSchema()) : null;
+      server.createErasureCodingZone(req.getSrc(), schema);
       return CreateErasureCodingZoneResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -1420,11 +1420,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
-  public void createErasureCodingZone(String src)
+  public void createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     final CreateErasureCodingZoneRequestProto.Builder builder =
         CreateErasureCodingZoneRequestProto.newBuilder();
     builder.setSrc(src);
+    if (schema != null) {
+      builder.setSchema(PBHelper.convertECSchema(schema));
+    }
     CreateErasureCodingZoneRequestProto req = builder.build();
     try {
       rpcProxy.createErasureCodingZone(null, req);

+ 23 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java

@@ -22,6 +22,9 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,7 +53,11 @@ public class ErasureCodingZoneManager {
     this.dir = dir;
   }
 
-  boolean getECPolicy(INodesInPath iip) {
+  boolean getECPolicy(INodesInPath iip) throws IOException {
+    return getECSchema(iip) != null;
+  }
+
+  ECSchema getECSchema(INodesInPath iip) throws IOException{
     assert dir.hasReadLock();
     Preconditions.checkNotNull(iip);
     List<INode> inodes = iip.getReadOnlyINodes();
@@ -64,21 +71,23 @@ public class ErasureCodingZoneManager {
       // EC
       // TODO: properly support symlinks in EC zones
       if (inode.isSymlink()) {
-        return false;
+        return null;
       }
       final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
           new ArrayList<XAttr>(0)
           : inode.getXAttrFeature().getXAttrs();
       for (XAttr xAttr : xAttrs) {
         if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
-          return true;
+          ECSchemaProto ecSchemaProto;
+          ecSchemaProto = ECSchemaProto.parseFrom(xAttr.getValue());
+          return PBHelper.convertECSchema(ecSchemaProto);
         }
       }
     }
-    return false;
+    return null;
   }
 
-  XAttr createErasureCodingZone(String src)
+  XAttr createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     assert dir.hasWriteLock();
     final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
@@ -97,8 +106,15 @@ public class ErasureCodingZoneManager {
       throw new IOException("Directory " + src + " is already in an " +
           "erasure coding zone.");
     }
-    final XAttr ecXAttr = XAttrHelper
-        .buildXAttr(XATTR_ERASURECODING_ZONE, null);
+    // TODO HDFS-7859 Need to persist the schema in xattr in efficient way
+    // As of now storing the protobuf format
+    if (schema == null) {
+      schema = ECSchemaManager.getSystemDefaultSchema();
+    }
+    ECSchemaProto schemaProto = PBHelper.convertECSchema(schema);
+    byte[] schemaBytes = schemaProto.toByteArray();
+    final XAttr ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
+        schemaBytes);
     final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
     xattrs.add(ecXAttr);
     FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,

+ 10 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -388,7 +389,7 @@ public class FSDirectory implements Closeable {
   void disableQuotaChecks() {
     skipQuotaCheck = true;
   }
-
+  
   /**
    * This is a wrapper for resolvePath(). If the path passed
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller
@@ -1225,20 +1226,24 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  XAttr createErasureCodingZone(String src)
+  XAttr createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     writeLock();
     try {
-      return ecZoneManager.createErasureCodingZone(src);
+      return ecZoneManager.createErasureCodingZone(src, schema);
     } finally {
       writeUnlock();
     }
   }
 
-  public boolean getECPolicy(INodesInPath iip) {
+  public boolean getECPolicy(INodesInPath iip) throws IOException {
+    return getECSchema(iip) != null;
+  }
+
+  ECSchema getECSchema(INodesInPath iip) throws IOException {
     readLock();
     try {
-      return ecZoneManager.getECPolicy(iip);
+      return ecZoneManager.getECSchema(iip);
     } finally {
       readUnlock();
     }

+ 9 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -7518,16 +7518,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   /**
    * Create an erasure coding zone on directory src.
-   *
+   * @param schema  ECSchema for the erasure coding zone
    * @param src     the path of a directory which will be the root of the
    *                erasure coding zone. The directory must be empty.
+   *
    * @throws AccessControlException  if the caller is not the superuser.
    * @throws UnresolvedLinkException if the path can't be resolved.
    * @throws SafeModeException       if the Namenode is in safe mode.
    */
-  void createErasureCodingZone(final String srcArg,
-      final boolean logRetryCache)
-      throws IOException, UnresolvedLinkException,
+  void createErasureCodingZone(final String srcArg, final ECSchema schema,
+      final boolean logRetryCache) throws IOException, UnresolvedLinkException,
       SafeModeException, AccessControlException {
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
@@ -7543,7 +7543,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
 
-      final XAttr ecXAttr = dir.createErasureCodingZone(src);
+      final XAttr ecXAttr = dir.createErasureCodingZone(src, schema);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(ecXAttr);
       getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
@@ -7573,11 +7573,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (isPermissionEnabled) {
         dir.checkPathAccess(pc, iip, FsAction.READ);
       }
-      if (dir.getECPolicy(iip)) {
-        // TODO HDFS-8074 and HDFS-7859 : To get from loaded schemas
-        Map<String, String> options = new HashMap<String, String>();
-        ECSchema defaultSchema = new ECSchema("RS-6-3", "rs", 6, 3, options);
-        return new ECInfo(src, defaultSchema);
+      // Get schema set for the zone
+      ECSchema schema = dir.getECSchema(iip);
+      if (schema != null) {
+        return new ECInfo(src, schema);
       }
     } finally {
       readUnlock();

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -1823,8 +1823,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
-  public void createErasureCodingZone(String src)
-    throws IOException {
+  public void createErasureCodingZone(String src, ECSchema schema)
+      throws IOException {
     checkNNStartup();
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1832,7 +1832,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     boolean success = false;
     try {
-      namesystem.createErasureCodingZone(src, cacheEntry != null);
+      namesystem.createErasureCodingZone(src, schema, cacheEntry != null);
     } finally {
       RetryCache.setState(cacheEntry, success);
     }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -716,6 +716,7 @@ message GetEditsFromTxidResponseProto {
 
 message CreateErasureCodingZoneRequestProto {
   required string src = 1;
+  optional ECSchemaProto schema = 2;
 }
 
 message CreateErasureCodingZoneResponseProto {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -1855,7 +1855,7 @@ public class DFSTestUtil {
       int numBlocks, int numStripesPerBlk) throws Exception {
     DistributedFileSystem dfs = cluster.getFileSystem();
     dfs.mkdirs(dir);
-    dfs.getClient().createErasureCodingZone(dir.toString());
+    dfs.getClient().createErasureCodingZone(dir.toString(), null);
 
     FSDataOutputStream out = null;
     try {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java

@@ -50,7 +50,7 @@ public class TestDFSStripedOutputStream {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().createErasureCodingZone("/");
+    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
     fs = cluster.getFileSystem();
   }
 

+ 9 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java

@@ -64,7 +64,7 @@ public class TestErasureCodingZones {
     fs.mkdir(testDir, FsPermission.getDirDefault());
 
     /* Normal creation of an erasure coding zone */
-    fs.getClient().createErasureCodingZone(testDir.toString());
+    fs.getClient().createErasureCodingZone(testDir.toString(), null);
 
     /* Verify files under the zone are striped */
     final Path ECFilePath = new Path(testDir, "foo");
@@ -77,7 +77,7 @@ public class TestErasureCodingZones {
     fs.mkdir(notEmpty, FsPermission.getDirDefault());
     fs.create(new Path(notEmpty, "foo"));
     try {
-      fs.getClient().createErasureCodingZone(notEmpty.toString());
+      fs.getClient().createErasureCodingZone(notEmpty.toString(), null);
       fail("Erasure coding zone on non-empty dir");
     } catch (IOException e) {
       assertExceptionContains("erasure coding zone for a non-empty directory", e);
@@ -87,10 +87,10 @@ public class TestErasureCodingZones {
     final Path zone1 = new Path("/zone1");
     final Path zone2 = new Path(zone1, "zone2");
     fs.mkdir(zone1, FsPermission.getDirDefault());
-    fs.getClient().createErasureCodingZone(zone1.toString());
+    fs.getClient().createErasureCodingZone(zone1.toString(), null);
     fs.mkdir(zone2, FsPermission.getDirDefault());
     try {
-      fs.getClient().createErasureCodingZone(zone2.toString());
+      fs.getClient().createErasureCodingZone(zone2.toString(), null);
       fail("Nested erasure coding zones");
     } catch (IOException e) {
       assertExceptionContains("already in an erasure coding zone", e);
@@ -100,7 +100,7 @@ public class TestErasureCodingZones {
     final Path fPath = new Path("/file");
     fs.create(fPath);
     try {
-      fs.getClient().createErasureCodingZone(fPath.toString());
+      fs.getClient().createErasureCodingZone(fPath.toString(), null);
       fail("Erasure coding zone on file");
     } catch (IOException e) {
       assertExceptionContains("erasure coding zone for a file", e);
@@ -113,8 +113,8 @@ public class TestErasureCodingZones {
     final Path dstECDir = new Path("/dstEC");
     fs.mkdir(srcECDir, FsPermission.getDirDefault());
     fs.mkdir(dstECDir, FsPermission.getDirDefault());
-    fs.getClient().createErasureCodingZone(srcECDir.toString());
-    fs.getClient().createErasureCodingZone(dstECDir.toString());
+    fs.getClient().createErasureCodingZone(srcECDir.toString(), null);
+    fs.getClient().createErasureCodingZone(dstECDir.toString(), null);
     final Path srcFile = new Path(srcECDir, "foo");
     fs.create(srcFile);
 
@@ -158,7 +158,7 @@ public class TestErasureCodingZones {
     // dir ECInfo before creating ec zone
     assertNull(fs.getClient().getErasureCodingInfo(src));
     // dir ECInfo after creating ec zone
-    fs.getClient().createErasureCodingZone(src);
+    fs.getClient().createErasureCodingZone(src, null);
     verifyErasureCodingInfo(src);
     fs.create(new Path(ecDir, "/child1")).close();
     // verify for the files in ec zone
@@ -182,4 +182,4 @@ public class TestErasureCodingZones {
     assertEquals("Default chunkSize should be used",
         ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
   }
-}
+}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java

@@ -68,7 +68,7 @@ public class TestAddStripedBlocks {
         .numDataNodes(GROUP_SIZE).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
-    dfs.getClient().createErasureCodingZone("/");
+    dfs.getClient().createErasureCodingZone("/", null);
   }
 
   @After

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -445,7 +445,7 @@ public class TestFSEditLogLoader {
 
       //set the storage policy of the directory
       fs.mkdir(new Path(testDir), new FsPermission("755"));
-      fs.getClient().getNamenode().createErasureCodingZone(testDir);
+      fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
 
       // Create a file with striped block
       Path p = new Path(testFilePath);
@@ -517,7 +517,7 @@ public class TestFSEditLogLoader {
 
       //set the storage policy of the directory
       fs.mkdir(new Path(testDir), new FsPermission("755"));
-      fs.getClient().getNamenode().createErasureCodingZone(testDir);
+      fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
 
       //create a file with striped blocks
       Path p = new Path(testFilePath);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

@@ -135,7 +135,7 @@ public class TestFSImage {
   private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
                                                boolean isUC) throws IOException{
     // contruct a INode with StripedBlock for saving and loading
-    fsn.createErasureCodingZone("/", false);
+    fsn.createErasureCodingZone("/", null, false);
     long id = 123456789;
     byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
     PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
@@ -397,7 +397,7 @@ public class TestFSImage {
           .build();
       cluster.waitActive();
       DistributedFileSystem fs = cluster.getFileSystem();
-      fs.getClient().getNamenode().createErasureCodingZone("/");
+      fs.getClient().getNamenode().createErasureCodingZone("/", null);
       Path file = new Path("/striped");
       FSDataOutputStream out = fs.create(file);
       byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);