|
@@ -0,0 +1,600 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.server.namenode;
|
|
|
+
|
|
|
+import java.io.BufferedOutputStream;
|
|
|
+import java.io.Closeable;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.FilterOutputStream;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.security.DigestOutputStream;
|
|
|
+import java.security.MessageDigest;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+
|
|
|
+import com.google.common.base.Charsets;
|
|
|
+import com.google.protobuf.CodedOutputStream;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configurable;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
|
|
+import org.apache.hadoop.hdfs.server.common.FileRegion;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection.DirEntry;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.MD5Hash;
|
|
|
+import org.apache.hadoop.io.compress.CompressionCodec;
|
|
|
+import org.apache.hadoop.io.compress.CompressorStream;
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
+
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.FSImageUtil.MAGIC_HEADER;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Utility crawling an existing hierarchical FileSystem and emitting
|
|
|
+ * a valid FSImage/NN storage.
|
|
|
+ */
|
|
|
+// TODO: generalize to types beyond FileRegion
|
|
|
+public class ImageWriter implements Closeable {
|
|
|
+
|
|
|
+ private static final int ONDISK_VERSION = 1;
|
|
|
+ private static final int LAYOUT_VERSION = -64; // see NameNodeLayoutVersion
|
|
|
+
|
|
|
+ private final Path outdir;
|
|
|
+ private final FileSystem outfs;
|
|
|
+ private final File dirsTmp;
|
|
|
+ private final OutputStream dirs;
|
|
|
+ private final File inodesTmp;
|
|
|
+ private final OutputStream inodes;
|
|
|
+ private final MessageDigest digest;
|
|
|
+ private final FSImageCompression compress;
|
|
|
+ private final long startBlock;
|
|
|
+ private final long startInode;
|
|
|
+ private final UGIResolver ugis;
|
|
|
+ private final BlockFormat.Writer<FileRegion> blocks;
|
|
|
+ private final BlockResolver blockIds;
|
|
|
+ private final Map<Long, DirEntry.Builder> dircache;
|
|
|
+ private final TrackedOutputStream<DigestOutputStream> raw;
|
|
|
+
|
|
|
+ private boolean closed = false;
|
|
|
+ private long curSec;
|
|
|
+ private long curBlock;
|
|
|
+ private final AtomicLong curInode;
|
|
|
+ private final FileSummary.Builder summary = FileSummary.newBuilder()
|
|
|
+ .setOndiskVersion(ONDISK_VERSION)
|
|
|
+ .setLayoutVersion(LAYOUT_VERSION);
|
|
|
+
|
|
|
+ private final String blockPoolID;
|
|
|
+
|
|
|
+ public static Options defaults() {
|
|
|
+ return new Options();
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public ImageWriter(Options opts) throws IOException {
|
|
|
+ final OutputStream out;
|
|
|
+ if (null == opts.outStream) {
|
|
|
+ FileSystem fs = opts.outdir.getFileSystem(opts.getConf());
|
|
|
+ outfs = (fs instanceof LocalFileSystem)
|
|
|
+ ? ((LocalFileSystem)fs).getRaw()
|
|
|
+ : fs;
|
|
|
+ Path tmp = opts.outdir;
|
|
|
+ if (!outfs.mkdirs(tmp)) {
|
|
|
+ throw new IOException("Failed to create output dir: " + tmp);
|
|
|
+ }
|
|
|
+ try (NNStorage stor = new NNStorage(opts.getConf(),
|
|
|
+ Arrays.asList(tmp.toUri()), Arrays.asList(tmp.toUri()))) {
|
|
|
+ NamespaceInfo info = NNStorage.newNamespaceInfo();
|
|
|
+ if (info.getLayoutVersion() != LAYOUT_VERSION) {
|
|
|
+ throw new IllegalStateException("Incompatible layout " +
|
|
|
+ info.getLayoutVersion() + " (expected " + LAYOUT_VERSION);
|
|
|
+ }
|
|
|
+ stor.format(info);
|
|
|
+ blockPoolID = info.getBlockPoolID();
|
|
|
+ }
|
|
|
+ outdir = new Path(tmp, "current");
|
|
|
+ out = outfs.create(new Path(outdir, "fsimage_0000000000000000000"));
|
|
|
+ } else {
|
|
|
+ // XXX necessary? writing a NNStorage now...
|
|
|
+ outdir = null;
|
|
|
+ outfs = null;
|
|
|
+ out = opts.outStream;
|
|
|
+ blockPoolID = "";
|
|
|
+ }
|
|
|
+ digest = MD5Hash.getDigester();
|
|
|
+ raw = new TrackedOutputStream<>(new DigestOutputStream(
|
|
|
+ new BufferedOutputStream(out), digest));
|
|
|
+ compress = opts.compress;
|
|
|
+ CompressionCodec codec = compress.getImageCodec();
|
|
|
+ if (codec != null) {
|
|
|
+ summary.setCodec(codec.getClass().getCanonicalName());
|
|
|
+ }
|
|
|
+ startBlock = opts.startBlock;
|
|
|
+ curBlock = startBlock;
|
|
|
+ startInode = opts.startInode;
|
|
|
+ curInode = new AtomicLong(startInode);
|
|
|
+ dircache = Collections.synchronizedMap(new DirEntryCache(opts.maxdircache));
|
|
|
+
|
|
|
+ ugis = null == opts.ugis
|
|
|
+ ? ReflectionUtils.newInstance(opts.ugisClass, opts.getConf())
|
|
|
+ : opts.ugis;
|
|
|
+ BlockFormat<FileRegion> fmt = null == opts.blocks
|
|
|
+ ? ReflectionUtils.newInstance(opts.blockFormatClass, opts.getConf())
|
|
|
+ : opts.blocks;
|
|
|
+ blocks = fmt.getWriter(null);
|
|
|
+ blockIds = null == opts.blockIds
|
|
|
+ ? ReflectionUtils.newInstance(opts.blockIdsClass, opts.getConf())
|
|
|
+ : opts.blockIds;
|
|
|
+
|
|
|
+ // create directory and inode sections as side-files.
|
|
|
+ // The details are written to files to avoid keeping them in memory.
|
|
|
+ dirsTmp = File.createTempFile("fsimg_dir", null);
|
|
|
+ dirsTmp.deleteOnExit();
|
|
|
+ dirs = beginSection(new FileOutputStream(dirsTmp));
|
|
|
+ try {
|
|
|
+ inodesTmp = File.createTempFile("fsimg_inode", null);
|
|
|
+ inodesTmp.deleteOnExit();
|
|
|
+ inodes = new FileOutputStream(inodesTmp);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // appropriate to close raw?
|
|
|
+ IOUtils.cleanup(null, raw, dirs);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+
|
|
|
+ raw.write(MAGIC_HEADER);
|
|
|
+ curSec = raw.pos;
|
|
|
+ assert raw.pos == MAGIC_HEADER.length;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void accept(TreePath e) throws IOException {
|
|
|
+ assert e.getParentId() < curInode.get();
|
|
|
+ // allocate ID
|
|
|
+ long id = curInode.getAndIncrement();
|
|
|
+ e.accept(id);
|
|
|
+ assert e.getId() < curInode.get();
|
|
|
+ INode n = e.toINode(ugis, blockIds, blocks, blockPoolID);
|
|
|
+ writeInode(n);
|
|
|
+
|
|
|
+ if (e.getParentId() > 0) {
|
|
|
+ // add DirEntry to map, which may page out entries
|
|
|
+ DirEntry.Builder de = DirEntry.newBuilder()
|
|
|
+ .setParent(e.getParentId())
|
|
|
+ .addChildren(e.getId());
|
|
|
+ dircache.put(e.getParentId(), de);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("serial")
|
|
|
+ class DirEntryCache extends LinkedHashMap<Long, DirEntry.Builder> {
|
|
|
+
|
|
|
+ // should cache path to root, not evict LRCached
|
|
|
+ private final int nEntries;
|
|
|
+
|
|
|
+ DirEntryCache(int nEntries) {
|
|
|
+ this.nEntries = nEntries;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DirEntry.Builder put(Long p, DirEntry.Builder b) {
|
|
|
+ DirEntry.Builder e = get(p);
|
|
|
+ if (null == e) {
|
|
|
+ return super.put(p, b);
|
|
|
+ }
|
|
|
+ //merge
|
|
|
+ e.addAllChildren(b.getChildrenList());
|
|
|
+ // not strictly conforming
|
|
|
+ return e;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected boolean removeEldestEntry(Entry<Long, DirEntry.Builder> be) {
|
|
|
+ if (size() > nEntries) {
|
|
|
+ DirEntry d = be.getValue().build();
|
|
|
+ try {
|
|
|
+ writeDirEntry(d);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void writeInode(INode n) throws IOException {
|
|
|
+ n.writeDelimitedTo(inodes);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void writeDirEntry(DirEntry e) throws IOException {
|
|
|
+ e.writeDelimitedTo(dirs);
|
|
|
+ }
|
|
|
+
|
|
|
+ // from FSImageFormatProtobuf... why not just read position from the stream?
|
|
|
+ private static int getOndiskSize(com.google.protobuf.GeneratedMessage s) {
|
|
|
+ return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
|
|
|
+ + s.getSerializedSize();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void close() throws IOException {
|
|
|
+ if (closed) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ for (DirEntry.Builder b : dircache.values()) {
|
|
|
+ DirEntry e = b.build();
|
|
|
+ writeDirEntry(e);
|
|
|
+ }
|
|
|
+ dircache.clear();
|
|
|
+
|
|
|
+ // close side files
|
|
|
+ IOUtils.cleanup(null, dirs, inodes, blocks);
|
|
|
+ if (null == dirs || null == inodes) {
|
|
|
+ // init failed
|
|
|
+ if (raw != null) {
|
|
|
+ raw.close();
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ writeNameSystemSection();
|
|
|
+ writeINodeSection();
|
|
|
+ writeDirSection();
|
|
|
+ writeStringTableSection();
|
|
|
+
|
|
|
+ // write summary directly to raw
|
|
|
+ FileSummary s = summary.build();
|
|
|
+ s.writeDelimitedTo(raw);
|
|
|
+ int length = getOndiskSize(s);
|
|
|
+ byte[] lengthBytes = new byte[4];
|
|
|
+ ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
|
|
|
+ raw.write(lengthBytes);
|
|
|
+ } finally {
|
|
|
+ raw.close();
|
|
|
+ }
|
|
|
+ writeMD5("fsimage_0000000000000000000");
|
|
|
+ closed = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Write checksum for image file. Pulled from MD5Utils/internals. Awkward to
|
|
|
+ * reuse existing tools/utils.
|
|
|
+ */
|
|
|
+ void writeMD5(String imagename) throws IOException {
|
|
|
+ if (null == outdir) {
|
|
|
+ //LOG.warn("Not writing MD5");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ MD5Hash md5 = new MD5Hash(digest.digest());
|
|
|
+ String digestString = StringUtils.byteToHexString(md5.getDigest());
|
|
|
+ Path chk = new Path(outdir, imagename + ".md5");
|
|
|
+ try (OutputStream out = outfs.create(chk)) {
|
|
|
+ String md5Line = digestString + " *" + imagename + "\n";
|
|
|
+ out.write(md5Line.getBytes(Charsets.UTF_8));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ OutputStream beginSection(OutputStream out) throws IOException {
|
|
|
+ CompressionCodec codec = compress.getImageCodec();
|
|
|
+ if (null == codec) {
|
|
|
+ return out;
|
|
|
+ }
|
|
|
+ return codec.createOutputStream(out);
|
|
|
+ }
|
|
|
+
|
|
|
+ void endSection(OutputStream out, SectionName name) throws IOException {
|
|
|
+ CompressionCodec codec = compress.getImageCodec();
|
|
|
+ if (codec != null) {
|
|
|
+ ((CompressorStream)out).finish();
|
|
|
+ }
|
|
|
+ out.flush();
|
|
|
+ long length = raw.pos - curSec;
|
|
|
+ summary.addSections(FileSummary.Section.newBuilder()
|
|
|
+ .setName(name.toString()) // not strictly correct, but name not visible
|
|
|
+ .setOffset(curSec).setLength(length));
|
|
|
+ curSec += length;
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeNameSystemSection() throws IOException {
|
|
|
+ NameSystemSection.Builder b = NameSystemSection.newBuilder()
|
|
|
+ .setGenstampV1(1000)
|
|
|
+ .setGenstampV1Limit(0)
|
|
|
+ .setGenstampV2(1001)
|
|
|
+ .setLastAllocatedBlockId(blockIds.lastId())
|
|
|
+ .setTransactionId(0);
|
|
|
+ NameSystemSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ endSection(sec, SectionName.NS_INFO);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeINodeSection() throws IOException {
|
|
|
+ // could reset dict to avoid compression cost in close
|
|
|
+ INodeSection.Builder b = INodeSection.newBuilder()
|
|
|
+ .setNumInodes(curInode.get() - startInode)
|
|
|
+ .setLastInodeId(curInode.get());
|
|
|
+ INodeSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ // copy inodes
|
|
|
+ try (FileInputStream in = new FileInputStream(inodesTmp)) {
|
|
|
+ IOUtils.copyBytes(in, sec, 4096, false);
|
|
|
+ }
|
|
|
+ endSection(sec, SectionName.INODE);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeDirSection() throws IOException {
|
|
|
+ // No header, so dirs can be written/compressed independently
|
|
|
+ //INodeDirectorySection.Builder b = INodeDirectorySection.newBuilder();
|
|
|
+ OutputStream sec = raw;
|
|
|
+ // copy dirs
|
|
|
+ try (FileInputStream in = new FileInputStream(dirsTmp)) {
|
|
|
+ IOUtils.copyBytes(in, sec, 4096, false);
|
|
|
+ }
|
|
|
+ endSection(sec, SectionName.INODE_DIR);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeFilesUCSection() throws IOException {
|
|
|
+ FilesUnderConstructionSection.Builder b =
|
|
|
+ FilesUnderConstructionSection.newBuilder();
|
|
|
+ FilesUnderConstructionSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ endSection(sec, SectionName.FILES_UNDERCONSTRUCTION);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeSnapshotDiffSection() throws IOException {
|
|
|
+ SnapshotDiffSection.Builder b = SnapshotDiffSection.newBuilder();
|
|
|
+ SnapshotDiffSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ endSection(sec, SectionName.SNAPSHOT_DIFF);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeSecretManagerSection() throws IOException {
|
|
|
+ SecretManagerSection.Builder b = SecretManagerSection.newBuilder()
|
|
|
+ .setCurrentId(0)
|
|
|
+ .setTokenSequenceNumber(0);
|
|
|
+ SecretManagerSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ endSection(sec, SectionName.SECRET_MANAGER);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeCacheManagerSection() throws IOException {
|
|
|
+ CacheManagerSection.Builder b = CacheManagerSection.newBuilder()
|
|
|
+ .setNumPools(0)
|
|
|
+ .setNumDirectives(0)
|
|
|
+ .setNextDirectiveId(1);
|
|
|
+ CacheManagerSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ endSection(sec, SectionName.CACHE_MANAGER);
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeStringTableSection() throws IOException {
|
|
|
+ StringTableSection.Builder b = StringTableSection.newBuilder();
|
|
|
+ Map<Integer, String> u = ugis.ugiMap();
|
|
|
+ b.setNumEntry(u.size());
|
|
|
+ StringTableSection s = b.build();
|
|
|
+
|
|
|
+ OutputStream sec = beginSection(raw);
|
|
|
+ s.writeDelimitedTo(sec);
|
|
|
+ for (Map.Entry<Integer, String> e : u.entrySet()) {
|
|
|
+ StringTableSection.Entry.Builder x =
|
|
|
+ StringTableSection.Entry.newBuilder()
|
|
|
+ .setId(e.getKey())
|
|
|
+ .setStr(e.getValue());
|
|
|
+ x.build().writeDelimitedTo(sec);
|
|
|
+ }
|
|
|
+ endSection(sec, SectionName.STRING_TABLE);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized String toString() {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("{ codec=\"").append(compress.getImageCodec());
|
|
|
+ sb.append("\", startBlock=").append(startBlock);
|
|
|
+ sb.append(", curBlock=").append(curBlock);
|
|
|
+ sb.append(", startInode=").append(startInode);
|
|
|
+ sb.append(", curInode=").append(curInode);
|
|
|
+ sb.append(", ugi=").append(ugis);
|
|
|
+ sb.append(", blockIds=").append(blockIds);
|
|
|
+ sb.append(", offset=").append(raw.pos);
|
|
|
+ sb.append(" }");
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ static class TrackedOutputStream<T extends OutputStream>
|
|
|
+ extends FilterOutputStream {
|
|
|
+
|
|
|
+ private long pos = 0L;
|
|
|
+
|
|
|
+ TrackedOutputStream(T out) {
|
|
|
+ super(out);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public T getInner() {
|
|
|
+ return (T) out;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(int b) throws IOException {
|
|
|
+ out.write(b);
|
|
|
+ ++pos;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(byte[] b) throws IOException {
|
|
|
+ write(b, 0, b.length);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(byte[] b, int off, int len) throws IOException {
|
|
|
+ out.write(b, off, len);
|
|
|
+ pos += len;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void flush() throws IOException {
|
|
|
+ super.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ super.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Configurable options for image generation mapping pluggable components.
|
|
|
+ */
|
|
|
+ public static class Options implements Configurable {
|
|
|
+
|
|
|
+ public static final String START_INODE = "hdfs.image.writer.start.inode";
|
|
|
+ public static final String CACHE_ENTRY = "hdfs.image.writer.cache.entries";
|
|
|
+ public static final String UGI_CLASS = "hdfs.image.writer.ugi.class";
|
|
|
+ public static final String BLOCK_RESOLVER_CLASS =
|
|
|
+ "hdfs.image.writer.blockresolver.class";
|
|
|
+
|
|
|
+ private Path outdir;
|
|
|
+ private Configuration conf;
|
|
|
+ private OutputStream outStream;
|
|
|
+ private int maxdircache;
|
|
|
+ private long startBlock;
|
|
|
+ private long startInode;
|
|
|
+ private UGIResolver ugis;
|
|
|
+ private Class<? extends UGIResolver> ugisClass;
|
|
|
+ private BlockFormat<FileRegion> blocks;
|
|
|
+
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ private Class<? extends BlockFormat> blockFormatClass;
|
|
|
+ private BlockResolver blockIds;
|
|
|
+ private Class<? extends BlockResolver> blockIdsClass;
|
|
|
+ private FSImageCompression compress =
|
|
|
+ FSImageCompression.createNoopCompression();
|
|
|
+
|
|
|
+ protected Options() {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setConf(Configuration conf) {
|
|
|
+ this.conf = conf;
|
|
|
+ //long lastTxn = conf.getLong(LAST_TXN, 0L);
|
|
|
+ String def = new File("hdfs/name").toURI().toString();
|
|
|
+ outdir = new Path(conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, def));
|
|
|
+ startBlock = conf.getLong(FixedBlockResolver.START_BLOCK, (1L << 30) + 1);
|
|
|
+ startInode = conf.getLong(START_INODE, (1L << 14) + 1);
|
|
|
+ maxdircache = conf.getInt(CACHE_ENTRY, 100);
|
|
|
+ ugisClass = conf.getClass(UGI_CLASS,
|
|
|
+ SingleUGIResolver.class, UGIResolver.class);
|
|
|
+ blockFormatClass = conf.getClass(
|
|
|
+ DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
|
|
|
+ NullBlockFormat.class, BlockFormat.class);
|
|
|
+ blockIdsClass = conf.getClass(BLOCK_RESOLVER_CLASS,
|
|
|
+ FixedBlockResolver.class, BlockResolver.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Configuration getConf() {
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options output(String out) {
|
|
|
+ this.outdir = new Path(out);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options outStream(OutputStream outStream) {
|
|
|
+ this.outStream = outStream;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options codec(String codec) throws IOException {
|
|
|
+ this.compress = FSImageCompression.createCompression(getConf(), codec);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options cache(int nDirEntries) {
|
|
|
+ this.maxdircache = nDirEntries;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options ugi(UGIResolver ugis) {
|
|
|
+ this.ugis = ugis;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options ugi(Class<? extends UGIResolver> ugisClass) {
|
|
|
+ this.ugisClass = ugisClass;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options blockIds(BlockResolver blockIds) {
|
|
|
+ this.blockIds = blockIds;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options blockIds(Class<? extends BlockResolver> blockIdsClass) {
|
|
|
+ this.blockIdsClass = blockIdsClass;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Options blocks(BlockFormat<FileRegion> blocks) {
|
|
|
+ this.blocks = blocks;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ public Options blocks(Class<? extends BlockFormat> blocksClass) {
|
|
|
+ this.blockFormatClass = blocksClass;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|