|
@@ -0,0 +1,249 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.ozone.container.keyvalue;
|
|
|
+
|
|
|
+import java.io.BufferedOutputStream;
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Path;
|
|
|
+import java.nio.file.Paths;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+import org.apache.hadoop.ozone.OzoneConsts;
|
|
|
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
|
|
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
|
|
|
+
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
+import org.apache.commons.compress.archivers.ArchiveEntry;
|
|
|
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
|
|
|
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
|
|
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
|
|
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
|
|
+import org.apache.commons.compress.compressors.CompressorException;
|
|
|
+import org.apache.commons.compress.compressors.CompressorInputStream;
|
|
|
+import org.apache.commons.compress.compressors.CompressorOutputStream;
|
|
|
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Compress/uncompress KeyValueContainer data to a tar.gz archive.
|
|
|
+ */
|
|
|
+public class TarContainerPacker
|
|
|
+ implements ContainerPacker<KeyValueContainerData> {
|
|
|
+
|
|
|
+ private static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
|
|
|
+
|
|
|
+ private static final String DB_DIR_NAME = "db";
|
|
|
+
|
|
|
+ private static final String CONTAINER_FILE_NAME = "container.yaml";
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Given an input stream (tar file) extract the data to the specified
|
|
|
+ * directories.
|
|
|
+ *
|
|
|
+ * @param container container which defines the destination structure.
|
|
|
+ * @param inputStream the input stream.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
|
|
|
+ InputStream inputStream)
|
|
|
+ throws IOException {
|
|
|
+ byte[] descriptorFileContent = null;
|
|
|
+ try {
|
|
|
+ KeyValueContainerData containerData = container.getContainerData();
|
|
|
+ CompressorInputStream compressorInputStream =
|
|
|
+ new CompressorStreamFactory()
|
|
|
+ .createCompressorInputStream(CompressorStreamFactory.GZIP,
|
|
|
+ inputStream);
|
|
|
+
|
|
|
+ TarArchiveInputStream tarInput =
|
|
|
+ new TarArchiveInputStream(compressorInputStream);
|
|
|
+
|
|
|
+ TarArchiveEntry entry = tarInput.getNextTarEntry();
|
|
|
+ while (entry != null) {
|
|
|
+ String name = entry.getName();
|
|
|
+ if (name.startsWith(DB_DIR_NAME + "/")) {
|
|
|
+ Path destinationPath = containerData.getDbFile().toPath()
|
|
|
+ .resolve(name.substring(DB_DIR_NAME.length() + 1));
|
|
|
+ extractEntry(tarInput, entry.getSize(), destinationPath);
|
|
|
+ } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
|
|
|
+ Path destinationPath = Paths.get(containerData.getChunksPath())
|
|
|
+ .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
|
|
|
+ extractEntry(tarInput, entry.getSize(), destinationPath);
|
|
|
+ } else if (name.equals(CONTAINER_FILE_NAME)) {
|
|
|
+ //Don't do anything. Container file should be unpacked in a
|
|
|
+ //separated step by unpackContainerDescriptor call.
|
|
|
+ descriptorFileContent = readEntry(tarInput, entry);
|
|
|
+ } else {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Unknown entry in the tar file: " + "" + name);
|
|
|
+ }
|
|
|
+ entry = tarInput.getNextTarEntry();
|
|
|
+ }
|
|
|
+ return descriptorFileContent;
|
|
|
+
|
|
|
+ } catch (CompressorException e) {
|
|
|
+ throw new IOException(
|
|
|
+ "Can't uncompress the given container: " + container
|
|
|
+ .getContainerData().getContainerID(),
|
|
|
+ e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void extractEntry(TarArchiveInputStream tarInput, long size,
|
|
|
+ Path path) throws IOException {
|
|
|
+ Preconditions.checkNotNull(path, "Path element should not be null");
|
|
|
+ Path parent = Preconditions.checkNotNull(path.getParent(),
|
|
|
+ "Path element should have a parent directory");
|
|
|
+ Files.createDirectories(parent);
|
|
|
+ try (BufferedOutputStream bos = new BufferedOutputStream(
|
|
|
+ new FileOutputStream(path.toAbsolutePath().toString()))) {
|
|
|
+ int bufferSize = 1024;
|
|
|
+ byte[] buffer = new byte[bufferSize + 1];
|
|
|
+ long remaining = size;
|
|
|
+ while (remaining > 0) {
|
|
|
+ int read =
|
|
|
+ tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
|
|
|
+ if (read >= 0) {
|
|
|
+ remaining -= read;
|
|
|
+ bos.write(buffer, 0, read);
|
|
|
+ } else {
|
|
|
+ remaining = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Given a containerData include all the required container data/metadata
|
|
|
+ * in a tar file.
|
|
|
+ *
|
|
|
+ * @param container Container to archive (data + metadata).
|
|
|
+ * @param destination Destination tar file/stream.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void pack(Container<KeyValueContainerData> container,
|
|
|
+ OutputStream destination)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ KeyValueContainerData containerData = container.getContainerData();
|
|
|
+
|
|
|
+ try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
|
|
|
+ .createCompressorOutputStream(CompressorStreamFactory.GZIP,
|
|
|
+ destination)) {
|
|
|
+
|
|
|
+ try (ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(
|
|
|
+ gzippedOut)) {
|
|
|
+
|
|
|
+ includePath(containerData.getDbFile().toString(), DB_DIR_NAME,
|
|
|
+ archiveOutputStream);
|
|
|
+
|
|
|
+ includePath(containerData.getChunksPath(), CHUNKS_DIR_NAME,
|
|
|
+ archiveOutputStream);
|
|
|
+
|
|
|
+ includeFile(container.getContainerFile(),
|
|
|
+ CONTAINER_FILE_NAME,
|
|
|
+ archiveOutputStream);
|
|
|
+ }
|
|
|
+ } catch (CompressorException e) {
|
|
|
+ throw new IOException(
|
|
|
+ "Can't compress the container: " + containerData.getContainerID(),
|
|
|
+ e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public byte[] unpackContainerDescriptor(InputStream inputStream)
|
|
|
+ throws IOException {
|
|
|
+ try {
|
|
|
+ CompressorInputStream compressorInputStream =
|
|
|
+ new CompressorStreamFactory()
|
|
|
+ .createCompressorInputStream(CompressorStreamFactory.GZIP,
|
|
|
+ inputStream);
|
|
|
+
|
|
|
+ TarArchiveInputStream tarInput =
|
|
|
+ new TarArchiveInputStream(compressorInputStream);
|
|
|
+
|
|
|
+ TarArchiveEntry entry = tarInput.getNextTarEntry();
|
|
|
+ while (entry != null) {
|
|
|
+ String name = entry.getName();
|
|
|
+ if (name.equals(CONTAINER_FILE_NAME)) {
|
|
|
+ return readEntry(tarInput, entry);
|
|
|
+ }
|
|
|
+ entry = tarInput.getNextTarEntry();
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (CompressorException e) {
|
|
|
+ throw new IOException(
|
|
|
+ "Can't read the container descriptor from the container archive",
|
|
|
+ e);
|
|
|
+ }
|
|
|
+ throw new IOException(
|
|
|
+ "Container descriptor is missing from the container archive.");
|
|
|
+ }
|
|
|
+
|
|
|
+ private byte[] readEntry(TarArchiveInputStream tarInput,
|
|
|
+ TarArchiveEntry entry) throws IOException {
|
|
|
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
|
|
+ int bufferSize = 1024;
|
|
|
+ byte[] buffer = new byte[bufferSize + 1];
|
|
|
+ long remaining = entry.getSize();
|
|
|
+ while (remaining > 0) {
|
|
|
+ int read =
|
|
|
+ tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
|
|
|
+ remaining -= read;
|
|
|
+ bos.write(buffer, 0, read);
|
|
|
+ }
|
|
|
+ return bos.toByteArray();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void includePath(String containerPath, String subdir,
|
|
|
+ ArchiveOutputStream archiveOutputStream) throws IOException {
|
|
|
+
|
|
|
+ for (Path path : Files.list(Paths.get(containerPath))
|
|
|
+ .collect(Collectors.toList())) {
|
|
|
+
|
|
|
+ includeFile(path.toFile(), subdir + "/" + path.getFileName(),
|
|
|
+ archiveOutputStream);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void includeFile(File file, String entryName,
|
|
|
+ ArchiveOutputStream archiveOutputStream) throws IOException {
|
|
|
+ ArchiveEntry archiveEntry =
|
|
|
+ archiveOutputStream.createArchiveEntry(file, entryName);
|
|
|
+ archiveOutputStream.putArchiveEntry(archiveEntry);
|
|
|
+ try (FileInputStream fis = new FileInputStream(file)) {
|
|
|
+ IOUtils.copy(fis, archiveOutputStream);
|
|
|
+ }
|
|
|
+ archiveOutputStream.closeArchiveEntry();
|
|
|
+ }
|
|
|
+
|
|
|
+}
|