
HDFS-11693. Ozone:Add archive support to containers. Contributed by Anu Engineer.

Xiaoyu Yao, 8 years ago
Parent commit: 6e8584fc13

+ 90 - 19
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

@@ -18,6 +18,25 @@
 
 package org.apache.hadoop.fs;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -33,33 +52,21 @@ import java.net.UnknownHostException;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
-
-import org.apache.commons.collections.map.CaseInsensitiveMap;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.util.StringUtils;
+import java.util.zip.ZipOutputStream;
 
 /**
- * A collection of file-processing util methods
+ * A collection of file-processing util methods.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -74,6 +81,11 @@ public class FileUtil {
    * */
   public static final int SYMLINK_NO_PRIVILEGE = 2;
 
+  /**
+   * Buffer size used while zipping and unzipping archives.
+   */
+  private static final int BUFFER_SIZE = 8192;
+
   /**
    * convert an array of FileStatus to an array of Path
    *
@@ -573,6 +585,65 @@ public class FileUtil {
     }
   }
 
+  /**
+   * Creates a zip archive of the source directory and writes it to a zip file.
+   *
+   * @param sourceDir - The directory to zip.
+   * @param archiveName - The destination file, the parent directory is assumed
+   * to exist.
+   * @return Checksum of the Archive.
+   * @throws IOException - Thrown if archiveName already exists or if
+   *                     sourceDir does not exist.
+   */
+  public static Long zip(File sourceDir, File archiveName) throws IOException {
+    Preconditions.checkNotNull(sourceDir, "source directory cannot be null");
+    Preconditions.checkState(sourceDir.exists(), "source directory must " +
+        "exist");
+
+    Preconditions.checkNotNull(archiveName, "Destination file cannot be null");
+    Preconditions.checkNotNull(archiveName.getParent(), "Destination " +
+        "directory cannot be null");
+    Preconditions.checkState(new File(archiveName.getParent()).exists(),
+        "Destination directory must exist");
+    Preconditions.checkState(!archiveName.exists(), "Destination file " +
+        "already exists. Refusing to overwrite existing file.");
+
+    CheckedOutputStream checksum;
+    try (FileOutputStream outputStream =
+             new FileOutputStream(archiveName)) {
+      checksum = new CheckedOutputStream(outputStream, new CRC32());
+      byte[] data = new byte[BUFFER_SIZE];
+      try (ZipOutputStream out =
+               new ZipOutputStream(new BufferedOutputStream(checksum))) {
+
+        Iterator<File> fileIter = FileUtils.iterateFiles(sourceDir, null, true);
+        while (fileIter.hasNext()) {
+          File file = fileIter.next();
+          LOG.debug("Compressing file : " + file.getPath());
+          try (FileInputStream currentFile = new FileInputStream(file)) {
+            ZipEntry entry = new ZipEntry(file.getCanonicalPath());
+            out.putNextEntry(entry);
+            try (BufferedInputStream sourceFile
+                     = new BufferedInputStream(currentFile, BUFFER_SIZE)) {
+              int bytesRead;
+              while ((bytesRead = sourceFile.read(data, 0, BUFFER_SIZE)) !=
+                  -1) {
+                out.write(data, 0, bytesRead);
+              }
+            }
+          }
+        }
+        out.flush();
+      }
+    }
+    // Exit condition -- ZipFile must exist.
+    Verify.verify(archiveName.exists(), "Expected archive file missing: %s",
+        archiveName.toPath());
+    long crc32 = checksum.getChecksum().getValue();
+    checksum.close();
+    return crc32;
+  }
+
   /**
   * Given a File input it will unzip the file into the unzip directory
    * passed as the second parameter
@@ -595,12 +666,12 @@ public class FileUtil {
             if (!file.getParentFile().mkdirs()) {
               if (!file.getParentFile().isDirectory()) {
                 throw new IOException("Mkdirs failed to create " +
-                                      file.getParentFile().toString());
+                    file.getParentFile().toString());
               }
             }
             OutputStream out = new FileOutputStream(file);
             try {
-              byte[] buffer = new byte[8192];
+              byte[] buffer = new byte[BUFFER_SIZE];
               int i;
               while ((i = in.read(buffer)) != -1) {
                 out.write(buffer, 0, i);

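For reference, here is a minimal round-trip sketch of the new FileUtil.zip() paired with the existing FileUtil.unZip(). The /tmp paths and the ZipRoundTrip class are hypothetical; zip() requires that the source directory exists, that the archive's parent directory exists, and that the archive file itself does not exist yet.

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;

public class ZipRoundTrip {
  public static void main(String[] args) throws IOException {
    File sourceDir = new File("/tmp/container-data");      // must already exist
    File archive = new File("/tmp/container-data.zip");    // must not exist yet
    long crc32 = FileUtil.zip(sourceDir, archive);          // CRC32 of the written archive
    System.out.println("Wrote " + archive + ", crc32=" + crc32);

    File unzipDir = new File("/tmp/container-data-restored");
    if (!unzipDir.mkdirs() && !unzipDir.isDirectory()) {
      throw new IOException("Unable to create " + unzipDir);
    }
    FileUtil.unZip(archive, unzipDir);                       // unpack for verification
  }
}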
+ 35 - 16
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto

@@ -74,25 +74,27 @@ import "Ozone.proto";
  *  16. GetSmallFile - A single RPC that combines both getKey and ReadChunk.
  *
  *  17. CloseContainer - Closes an open container and makes it immutable.
+ *
+ *  18. CopyContainer - Copies a container from a remote machine.
  */
 
 enum Type {
-   CreateContainer = 1;
-   ReadContainer = 2;
-   UpdateContainer = 3;
-   DeleteContainer = 4;
-   ListContainer = 5;
-
-   PutKey = 6;
-   GetKey = 7;
-   DeleteKey = 8;
-   ListKey = 9;
-
-   ReadChunk = 10;
-   DeleteChunk = 11;
-   WriteChunk = 12;
-   ListChunk = 13;
-   CompactChunk = 14;
+  CreateContainer = 1;
+  ReadContainer = 2;
+  UpdateContainer = 3;
+  DeleteContainer = 4;
+  ListContainer = 5;
+
+  PutKey = 6;
+  GetKey = 7;
+  DeleteKey = 8;
+  ListKey = 9;
+
+  ReadChunk = 10;
+  DeleteChunk = 11;
+  WriteChunk = 12;
+  ListChunk = 13;
+  CompactChunk = 14;
 
   /** Combines Key and Chunk Operation into Single RPC. */
   PutSmallFile = 15;
@@ -128,6 +130,8 @@ enum Result {
   ERROR_CONTAINER_NOT_EMPTY = 23;
   ERROR_IN_COMPACT_DB = 24;
   UNCLOSED_CONTAINER_IO = 25;
+  DELETE_ON_OPEN_CONTAINER = 26;
+  CLOSED_CONTAINER_RETRY = 27;
 }
 
 message ContainerCommandRequestProto {
@@ -382,3 +386,18 @@ message GetSmallFileRequestProto {
 message GetSmallFileResponseProto {
   required ReadChunkResponseProto data = 1;
 }
+
+message CopyContainerRequestProto {
+  required string containerName = 1;
+  required uint64 readOffset = 2;
+  optional uint64 len = 3;
+}
+
+message CopyContainerResponseProto {
+  required string archiveName = 1;
+  required uint64 readOffset = 2;
+  required uint64 len = 3;
+  required bool eof = 4;
+  repeated bytes data = 5;
+  optional int64 checksum = 6;
+}
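
As a rough illustration of how a caller might drive the new copy messages, the sketch below requests the container archive in fixed-size slices until eof is set. The outer class name ContainerProtos, the CopyTransport stub, and the 1 MB slice size are assumptions for illustration, not part of this patch.

// Sketch: stream a container archive using the new copy messages.
// "ContainerProtos" is the assumed protoc-generated outer class for this
// .proto file, and CopyTransport is a hypothetical RPC stub.
static void copyContainer(String containerName, CopyTransport transport,
    java.io.OutputStream localArchive) throws java.io.IOException {
  long offset = 0;
  boolean eof = false;
  while (!eof) {
    ContainerProtos.CopyContainerRequestProto request =
        ContainerProtos.CopyContainerRequestProto.newBuilder()
            .setContainerName(containerName)
            .setReadOffset(offset)
            .setLen(1024 * 1024)                    // ask for up to 1 MB per call
            .build();
    ContainerProtos.CopyContainerResponseProto response = transport.copy(request);
    for (com.google.protobuf.ByteString chunk : response.getDataList()) {
      chunk.writeTo(localArchive);                  // append this slice locally
    }
    offset = response.getReadOffset() + response.getLen();
    eof = response.getEof();
  }
}

Once eof is seen, the optional checksum field can be compared against a checksum computed over the reassembled local archive.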

+ 106 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/src/test/java/org/apache/hadoop/scm/TestArchive.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+/**
+ * Test archive creation and unpacking.
+ */
+public class TestArchive {
+  private static final int DIR_COUNT = 10;
+  private static final int SUB_DIR_COUNT = 3;
+  private static final int FILE_COUNT = 10;
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Rule
+  public TemporaryFolder outputFolder = new TemporaryFolder();
+
+  Checksum crc = new Adler32();
+
+  @Before
+  public void setUp() throws Exception {
+    Random r = new Random();
+    final int megaByte = 1024 * 1024;
+
+    for (int x = 0; x < DIR_COUNT; x++) {
+      File subdir = folder.newFolder(String.format("dir%d", x));
+      for (int y = 0; y < SUB_DIR_COUNT; y++) {
+        File targetDir = new File(subdir.getPath().concat(File.separator)
+            .concat(String.format("subdir%d%d", x, y)));
+        if(!targetDir.mkdirs()) {
+          throw new IOException("Failed to create subdirectory. " +
+              targetDir.toString());
+        }
+        for (int z = 0; z < FILE_COUNT; z++) {
+          Path temp = Paths.get(targetDir.getPath().concat(File.separator)
+              .concat(String.format("File%d.txt", z)));
+          byte[] buf = RandomStringUtils.randomAlphanumeric(r.nextInt(megaByte))
+              .getBytes("UTF-8");
+          Files.write(temp, buf);
+          crc.update(buf, 0, buf.length);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testArchive() throws Exception {
+    Checksum readCrc = new Adler32();
+    File archiveFile = new File(outputFolder.getRoot() + File.separator
+        + "test.container.zip");
+    long zipCheckSum = FileUtil.zip(folder.getRoot(), archiveFile);
+    Assert.assertTrue(zipCheckSum > 0);
+    File decomp = new File(outputFolder.getRoot() + File.separator +
+        "decompress");
+    if (!decomp.exists() && !decomp.mkdirs()) {
+      throw new IOException("Unable to create the destination directory. " +
+          decomp.getPath());
+    }
+
+    FileUtil.unZip(archiveFile, decomp);
+    String[] patterns = {"txt"};
+    Iterator<File> iter = FileUtils.iterateFiles(decomp, patterns, true);
+    int count = 0;
+    while (iter.hasNext()) {
+      count++;
+      byte[] buf = Files.readAllBytes(iter.next().toPath());
+      readCrc.update(buf, 0, buf.length);
+    }
+    Assert.assertEquals(DIR_COUNT * SUB_DIR_COUNT * FILE_COUNT, count);
+    Assert.assertEquals(crc.getValue(), readCrc.getValue());
+  }
+}

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/src/test/java/org/apache/hadoop/scm/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm;
+/**
+ Test cases for SCM client classes.
+ */