@@ -21,25 +21,17 @@ import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -463,22 +455,20 @@ abstract class PBImageTextWriter implements Closeable {
         return "/";
       }
       long parent = getFromDirChildMap(inode);
-      byte[] bytes = dirMap.get(toBytes(parent));
-      synchronized (this) {
-        if (!dirPathCache.containsKey(parent)) {
-          if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
-            // The parent is an INodeReference, which is generated from snapshot.
-            // For delimited oiv tool, no need to print out metadata in snapshots.
-            throw PBImageTextWriter.createIgnoredSnapshotException(inode);
-          }
-          String parentName = toString(bytes);
-          String parentPath =
-              new Path(getParentPath(parent),
-                  parentName.isEmpty() ? "/" : parentName).toString();
-          dirPathCache.put(parent, parentPath);
+      if (!dirPathCache.containsKey(parent)) {
+        byte[] bytes = dirMap.get(toBytes(parent));
+        if (parent != INodeId.ROOT_INODE_ID && bytes == null) {
+          // The parent is an INodeReference, which is generated from snapshot.
+          // For delimited oiv tool, no need to print out metadata in snapshots.
+          throw PBImageTextWriter.createIgnoredSnapshotException(inode);
         }
-        return dirPathCache.get(parent);
+        String parentName = toString(bytes);
+        String parentPath =
+            new Path(getParentPath(parent),
+                parentName.isEmpty() ? "/" : parentName).toString();
+        dirPathCache.put(parent, parentPath);
       }
+      return dirPathCache.get(parent);
     }
 
     @Override
@@ -503,12 +493,9 @@ abstract class PBImageTextWriter implements Closeable {
   }
 
   private SerialNumberManager.StringTable stringTable;
-  private final PrintStream out;
+  private PrintStream out;
   private MetadataMap metadataMap = null;
   private String delimiter;
-  private File filename;
-  private int numThreads;
-  private String parallelOutputFile;
 
   /**
    * Construct a PB FsImage writer to generate text file.
@@ -516,8 +503,8 @@ abstract class PBImageTextWriter implements Closeable {
    * @param tempPath the path to store metadata. If it is empty, store metadata
    * in memory instead.
    */
-  PBImageTextWriter(PrintStream out, String delimiter, String tempPath,
-      int numThreads, String parallelOutputFile) throws IOException {
+  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
+      throws IOException {
     this.out = out;
     this.delimiter = delimiter;
    if (tempPath.isEmpty()) {
@@ -525,17 +512,6 @@ abstract class PBImageTextWriter implements Closeable {
     } else {
       metadataMap = new LevelDBMetadataMap(tempPath);
     }
-    this.numThreads = numThreads;
-    this.parallelOutputFile = parallelOutputFile;
-  }
-
-  PBImageTextWriter(PrintStream out, String delimiter, String tempPath)
-      throws IOException {
-    this(out, delimiter, tempPath, 1, "-");
-  }
-
-  protected PrintStream serialOutStream() {
-    return out;
   }
 
   @Override
@@ -586,9 +562,7 @@ abstract class PBImageTextWriter implements Closeable {
    */
   abstract protected void afterOutput() throws IOException;
 
-  public void visit(String filePath) throws IOException {
-    filename = new File(filePath);
-    RandomAccessFile file = new RandomAccessFile(filePath, "r");
+  public void visit(RandomAccessFile file) throws IOException {
     Configuration conf = new Configuration();
     if (!FSImageUtil.checkFileFormat(file)) {
       throw new IOException("Unrecognized FSImage");
@@ -668,122 +642,21 @@ abstract class PBImageTextWriter implements Closeable {
   private void output(Configuration conf, FileSummary summary,
       FileInputStream fin, ArrayList<FileSummary.Section> sections)
       throws IOException {
-    ArrayList<FileSummary.Section> allINodeSubSections =
-        getINodeSubSections(sections);
-    if (numThreads > 1 && !parallelOutputFile.equals("-") &&
-        allINodeSubSections.size() > 1) {
-      outputInParallel(conf, summary, allINodeSubSections);
-    } else {
-      LOG.info("Serial output due to threads num: {}, parallel output file: {}, " +
-          "subSections: {}.", numThreads, parallelOutputFile, allINodeSubSections.size());
-      outputInSerial(conf, summary, fin, sections);
-    }
-  }
-
-  private void outputInSerial(Configuration conf, FileSummary summary,
-      FileInputStream fin, ArrayList<FileSummary.Section> sections)
-      throws IOException {
     InputStream is;
     long startTime = Time.monotonicNow();
-    serialOutStream().println(getHeader());
+    out.println(getHeader());
     for (FileSummary.Section section : sections) {
       if (SectionName.fromString(section.getName()) == SectionName.INODE) {
         fin.getChannel().position(section.getOffset());
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        INodeSection s = INodeSection.parseDelimitedFrom(is);
-        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
-        int count = outputINodes(is, serialOutStream());
-        LOG.info("Outputted {} INodes.", count);
+        outputINodes(is);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
-    LOG.debug("Time to output inodes: {} ms", timeTaken);
-  }
-
-  /**
-   * STEP1: Multi-threaded process sub-sections.
-   * Given n (n>1) threads to process k (k>=n) sections,
-   * output parsed results of each section to tmp file in order.
-   * STEP2: Merge tmp files.
-   */
-  private void outputInParallel(Configuration conf, FileSummary summary,
-      ArrayList<FileSummary.Section> subSections)
-      throws IOException {
-    int nThreads = Integer.min(numThreads, subSections.size());
-    LOG.info("Outputting in parallel with {} sub-sections using {} threads",
-        subSections.size(), nThreads);
-    final CopyOnWriteArrayList<IOException> exceptions = new CopyOnWriteArrayList<>();
-    CountDownLatch latch = new CountDownLatch(subSections.size());
-    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
-    AtomicLong expectedINodes = new AtomicLong(0);
-    AtomicLong totalParsed = new AtomicLong(0);
-    String codec = summary.getCodec();
-    String[] paths = new String[subSections.size()];
-
-    for (int i = 0; i < subSections.size(); i++) {
-      paths[i] = parallelOutputFile + ".tmp." + i;
-      int index = i;
-      executorService.submit(() -> {
-        LOG.info("Output iNodes of section-{}", index);
-        InputStream is = null;
-        try (PrintStream outStream = new PrintStream(paths[index], "UTF-8")) {
-          long startTime = Time.monotonicNow();
-          is = getInputStreamForSection(subSections.get(index), codec, conf);
-          if (index == 0) {
-            // The first iNode section has a header which must be processed first
-            INodeSection s = INodeSection.parseDelimitedFrom(is);
-            expectedINodes.set(s.getNumInodes());
-          }
-          totalParsed.addAndGet(outputINodes(is, outStream));
-          long timeTaken = Time.monotonicNow() - startTime;
-          LOG.info("Time to output iNodes of section-{}: {} ms", index, timeTaken);
-        } catch (Exception e) {
-          exceptions.add(new IOException(e));
-        } finally {
-          latch.countDown();
-          try {
-            if (is != null) {
-              is.close();
-            }
-          } catch (IOException ioe) {
-            LOG.warn("Failed to close the input stream, ignoring", ioe);
-          }
-        }
-      });
-    }
-
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for countdown latch", e);
-      throw new IOException(e);
-    }
-
-    executorService.shutdown();
-    if (exceptions.size() != 0) {
-      LOG.error("Failed to output INode sub-sections, {} exception(s) occurred.",
-          exceptions.size());
-      throw exceptions.get(0);
-    }
-    if (totalParsed.get() != expectedINodes.get()) {
-      throw new IOException("Expected to parse " + expectedINodes + " in parallel, " +
-          "but parsed " + totalParsed.get() + ". The image may be corrupt.");
-    }
-    LOG.info("Completed outputting all INode sub-sections to {} tmp files.",
-        subSections.size());
-
-    try (PrintStream ps = new PrintStream(parallelOutputFile, "UTF-8")) {
-      ps.println(getHeader());
-    }
-
-    // merge tmp files
-    long startTime = Time.monotonicNow();
-    mergeFiles(paths, parallelOutputFile);
-    long timeTaken = Time.monotonicNow() - startTime;
-    LOG.info("Completed all stages. Time to merge files: {} ms", timeTaken);
+    LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
   protected PermissionStatus getPermission(long perm) {
@@ -890,27 +763,22 @@ abstract class PBImageTextWriter implements Closeable {
     LOG.info("Scanned {} INode directories to build namespace.", count);
   }
 
-  void printIfNotEmpty(PrintStream outStream, String line) {
+  void printIfNotEmpty(String line) {
     if (!line.isEmpty()) {
-      outStream.println(line);
+      out.println(line);
     }
   }
 
-  private int outputINodes(InputStream in, PrintStream outStream)
-      throws IOException {
+  private void outputINodes(InputStream in) throws IOException {
+    INodeSection s = INodeSection.parseDelimitedFrom(in);
+    LOG.info("Found {} INodes in the INode section", s.getNumInodes());
     long ignored = 0;
     long ignoredSnapshots = 0;
-    // As the input stream is a LimitInputStream, the reading will stop when
-    // EOF is encountered at the end of the stream.
-    int count = 0;
-    while (true) {
+    for (int i = 0; i < s.getNumInodes(); ++i) {
       INode p = INode.parseDelimitedFrom(in);
-      if (p == null) {
-        break;
-      }
       try {
         String parentPath = metadataMap.getParentPath(p.getId());
-        printIfNotEmpty(outStream, getEntry(parentPath, p));
+        printIfNotEmpty(getEntry(parentPath, p));
       } catch (IOException ioe) {
         ignored++;
         if (!(ioe instanceof IgnoreSnapshotException)) {
@@ -922,16 +790,16 @@ abstract class PBImageTextWriter implements Closeable {
           }
         }
       }
-      count++;
-      if (LOG.isDebugEnabled() && count % 100000 == 0) {
-        LOG.debug("Outputted {} INodes.", count);
+
+      if (LOG.isDebugEnabled() && i % 100000 == 0) {
+        LOG.debug("Outputted {} INodes.", i);
       }
     }
     if (ignored > 0) {
       LOG.warn("Ignored {} nodes, including {} in snapshots. Please turn on"
           + " debug log for details", ignored, ignoredSnapshots);
     }
-    return count;
+    LOG.info("Outputted {} INodes.", s.getNumInodes());
   }
 
   private static IgnoreSnapshotException createIgnoredSnapshotException(
@@ -954,79 +822,4 @@ abstract class PBImageTextWriter implements Closeable {
     }
     return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
   }
-
-  private ArrayList<FileSummary.Section> getINodeSubSections(
-      ArrayList<FileSummary.Section> sections) {
-    ArrayList<FileSummary.Section> subSections = new ArrayList<>();
-    Iterator<FileSummary.Section> iter = sections.iterator();
-    while (iter.hasNext()) {
-      FileSummary.Section s = iter.next();
-      if (SectionName.fromString(s.getName()) == SectionName.INODE_SUB) {
-        subSections.add(s);
-      }
-    }
-    return subSections;
-  }
-
-  /**
-   * Given a FSImage FileSummary.section, return a LimitInput stream set to
-   * the starting position of the section and limited to the section length.
-   * @param section The FileSummary.Section containing the offset and length
-   * @param compressionCodec The compression codec in use, if any
-   * @return An InputStream for the given section
-   * @throws IOException
-   */
-  private InputStream getInputStreamForSection(FileSummary.Section section,
-      String compressionCodec, Configuration conf)
-      throws IOException {
-    // channel of RandomAccessFile is not thread safe, use File
-    FileInputStream fin = new FileInputStream(filename);
-    try {
-      FileChannel channel = fin.getChannel();
-      channel.position(section.getOffset());
-      InputStream in = new BufferedInputStream(new LimitInputStream(fin,
-          section.getLength()));
-
-      in = FSImageUtil.wrapInputStreamForCompression(conf,
-          compressionCodec, in);
-      return in;
-    } catch (IOException e) {
-      fin.close();
-      throw e;
-    }
-  }
-
-  /**
-   * @param srcPaths Source files of contents to be merged
-   * @param resultPath Merged file path
-   * @throws IOException
-   */
-  public static void mergeFiles(String[] srcPaths, String resultPath)
-      throws IOException {
-    if (srcPaths == null || srcPaths.length < 1) {
-      LOG.warn("no source files to merge.");
-      return;
-    }
-
-    File[] files = new File[srcPaths.length];
-    for (int i = 0; i < srcPaths.length; i++) {
-      files[i] = new File(srcPaths[i]);
-    }
-
-    File resultFile = new File(resultPath);
-    try (FileChannel resultChannel =
-        new FileOutputStream(resultFile, true).getChannel()) {
-      for (File file : files) {
-        try (FileChannel src = new FileInputStream(file).getChannel()) {
-          resultChannel.transferFrom(src, resultChannel.size(), src.size());
-        }
-      }
-    }
-
-    for (File file : files) {
-      if (!file.delete() && file.exists()) {
-        LOG.warn("delete tmp file: {} returned false", file);
-      }
-    }
-  }
 }
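
Note on the serial read path this change falls back to: the writer positions
the image's file channel at the INODE section offset, bounds all reads by the
section length, and parses one length-delimited protobuf message per inode.
The counted for-loop works because the section header records numInodes up
front; reading until parseDelimitedFrom returns null at the limit-stream EOF
(as the removed comment described) is the equivalent termination condition.
Below is a minimal standalone sketch of that loop, not part of the patch: the
INodeSectionDump class and dump method are illustrative names, and it assumes
the Hadoop HDFS jars on the classpath, an uncompressed image, and a section
offset/length already read from the FileSummary trailer.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.util.LimitInputStream;

public final class INodeSectionDump {
  /**
   * Dumps inode ids and names from the INODE section of an fsimage.
   * Sketch only: a compressed image would additionally need the stream
   * wrapped with FSImageUtil.wrapInputStreamForCompression, as the
   * patched output() method does.
   */
  public static void dump(String imagePath, long offset, long length,
      PrintStream out) throws IOException {
    try (FileInputStream fin = new FileInputStream(imagePath)) {
      // Seek to the section start, then cap reads at the section length
      // so parseDelimitedFrom cannot run past the section boundary.
      fin.getChannel().position(offset);
      InputStream in =
          new BufferedInputStream(new LimitInputStream(fin, length));

      // The section header carries the expected inode count, which is
      // what makes the counted loop equivalent to looping until
      // parseDelimitedFrom returns null at EOF.
      INodeSection header = INodeSection.parseDelimitedFrom(in);
      for (long i = 0; i < header.getNumInodes(); i++) {
        INode p = INode.parseDelimitedFrom(in);
        out.println(p.getId() + "\t" + p.getName().toStringUtf8());
      }
    }
  }
}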