|
@@ -0,0 +1,446 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.fs;
|
|
|
+
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.io.DataInputBuffer;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.util.Progressable;
|
|
|
+
|
|
|
+/** An implementation of the in-memory filesystem. This implementation assumes
|
|
|
+ * that the file lengths are known ahead of time and the total lengths of all
|
|
|
+ * the files is below a certain number (like 100 MB, configurable). Use the API
|
|
|
+ * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of
|
|
|
+ * the API for reserving space in the FS. The uri of this filesystem starts with
|
|
|
+ * ramfs:// .
|
|
|
+ */
|
|
|
+@Deprecated
|
|
|
+public class InMemoryFileSystem extends ChecksumFileSystem {
|
|
|
+ private static class RawInMemoryFileSystem extends FileSystem {
|
|
|
+ private URI uri;
|
|
|
+ private long fsSize;
|
|
|
+ private volatile long totalUsed;
|
|
|
+ private Path staticWorkingDir;
|
|
|
+
|
|
|
+ //pathToFileAttribs is the final place where a file is put after it is closed
|
|
|
+ private Map<String, FileAttributes> pathToFileAttribs =
|
|
|
+ new HashMap<String, FileAttributes>();
|
|
|
+
|
|
|
+ //tempFileAttribs is a temp place which is updated while reserving memory for
|
|
|
+ //files we are going to create. It is read in the createRaw method and the
|
|
|
+ //temp key/value is discarded. If the file makes it to "close", then it
|
|
|
+ //ends up being in the pathToFileAttribs map.
|
|
|
+ private Map<String, FileAttributes> tempFileAttribs =
|
|
|
+ new HashMap<String, FileAttributes>();
|
|
|
+
|
|
|
+ public RawInMemoryFileSystem() {
|
|
|
+ setConf(new Configuration());
|
|
|
+ }
|
|
|
+
|
|
|
+ public RawInMemoryFileSystem(URI uri, Configuration conf) {
|
|
|
+ initialize(uri, conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ //inherit javadoc
|
|
|
+ public void initialize(URI uri, Configuration conf) {
|
|
|
+ setConf(conf);
|
|
|
+ int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
|
|
|
+ this.fsSize = size * 1024L * 1024L;
|
|
|
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
|
|
+ String path = this.uri.getPath();
|
|
|
+ if (path.length() == 0) {
|
|
|
+ path = Path.CUR_DIR;
|
|
|
+ }
|
|
|
+ this.staticWorkingDir = new Path(path);
|
|
|
+ LOG.info("Initialized InMemoryFileSystem: " + uri.toString() +
|
|
|
+ " of size (in bytes): " + fsSize);
|
|
|
+ }
|
|
|
+
|
|
|
+ //inherit javadoc
|
|
|
+ public URI getUri() {
|
|
|
+ return uri;
|
|
|
+ }
|
|
|
+
|
|
|
+ private class InMemoryInputStream extends FSInputStream {
|
|
|
+ private DataInputBuffer din = new DataInputBuffer();
|
|
|
+ private FileAttributes fAttr;
|
|
|
+
|
|
|
+ public InMemoryInputStream(Path f) throws IOException {
|
|
|
+ synchronized (RawInMemoryFileSystem.this) {
|
|
|
+ fAttr = pathToFileAttribs.get(getPath(f));
|
|
|
+ if (fAttr == null) {
|
|
|
+ throw new FileNotFoundException("File " + f + " does not exist");
|
|
|
+ }
|
|
|
+ din.reset(fAttr.data, 0, fAttr.size);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getPos() throws IOException {
|
|
|
+ return din.getPosition();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void seek(long pos) throws IOException {
|
|
|
+ if ((int)pos > fAttr.size)
|
|
|
+ throw new IOException("Cannot seek after EOF");
|
|
|
+ din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int available() throws IOException {
|
|
|
+ return din.available();
|
|
|
+ }
|
|
|
+ public boolean markSupport() { return false; }
|
|
|
+
|
|
|
+ public int read() throws IOException {
|
|
|
+ return din.read();
|
|
|
+ }
|
|
|
+
|
|
|
+ public int read(byte[] b, int off, int len) throws IOException {
|
|
|
+ return din.read(b, off, len);
|
|
|
+ }
|
|
|
+
|
|
|
+ public long skip(long n) throws IOException { return din.skip(n); }
|
|
|
+ }
|
|
|
+
|
|
|
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
|
|
+ return new FSDataInputStream(new InMemoryInputStream(f));
|
|
|
+ }
|
|
|
+
|
|
|
+ private class InMemoryOutputStream extends OutputStream {
|
|
|
+ private int count;
|
|
|
+ private FileAttributes fAttr;
|
|
|
+ private Path f;
|
|
|
+
|
|
|
+ public InMemoryOutputStream(Path f, FileAttributes fAttr)
|
|
|
+ throws IOException {
|
|
|
+ this.fAttr = fAttr;
|
|
|
+ this.f = f;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getPos() throws IOException {
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void close() throws IOException {
|
|
|
+ synchronized (RawInMemoryFileSystem.this) {
|
|
|
+ pathToFileAttribs.put(getPath(f), fAttr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void write(byte[] b, int off, int len) throws IOException {
|
|
|
+ if ((off < 0) || (off > b.length) || (len < 0) ||
|
|
|
+ ((off + len) > b.length) || ((off + len) < 0)) {
|
|
|
+ throw new IndexOutOfBoundsException();
|
|
|
+ } else if (len == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int newcount = count + len;
|
|
|
+ if (newcount > fAttr.size) {
|
|
|
+ throw new IOException("Insufficient space");
|
|
|
+ }
|
|
|
+ System.arraycopy(b, off, fAttr.data, count, len);
|
|
|
+ count = newcount;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void write(int b) throws IOException {
|
|
|
+ int newcount = count + 1;
|
|
|
+ if (newcount > fAttr.size) {
|
|
|
+ throw new IOException("Insufficient space");
|
|
|
+ }
|
|
|
+ fAttr.data[count] = (byte)b;
|
|
|
+ count = newcount;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** This optional operation is not yet supported. */
|
|
|
+ public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
+ Progressable progress) throws IOException {
|
|
|
+ throw new IOException("Not supported");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param permission Currently ignored.
|
|
|
+ */
|
|
|
+ public FSDataOutputStream create(Path f, FsPermission permission,
|
|
|
+ boolean overwrite, int bufferSize,
|
|
|
+ short replication, long blockSize, Progressable progress)
|
|
|
+ throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (exists(f) && !overwrite) {
|
|
|
+ throw new IOException("File already exists:"+f);
|
|
|
+ }
|
|
|
+ FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
|
|
|
+ if (fAttr != null)
|
|
|
+ return create(f, fAttr);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public FSDataOutputStream create(Path f, FileAttributes fAttr)
|
|
|
+ throws IOException {
|
|
|
+ // the path is not added into the filesystem (in the pathToFileAttribs
|
|
|
+ // map) until close is called on the outputstream that this method is
|
|
|
+ // going to return
|
|
|
+ // Create an output stream out of data byte array
|
|
|
+ return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
|
|
|
+ statistics);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void close() throws IOException {
|
|
|
+ super.close();
|
|
|
+ synchronized (this) {
|
|
|
+ if (pathToFileAttribs != null) {
|
|
|
+ pathToFileAttribs.clear();
|
|
|
+ }
|
|
|
+ pathToFileAttribs = null;
|
|
|
+ if (tempFileAttribs != null) {
|
|
|
+ tempFileAttribs.clear();
|
|
|
+ }
|
|
|
+ tempFileAttribs = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean setReplication(Path src, short replication)
|
|
|
+ throws IOException {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean rename(Path src, Path dst) throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ if (exists(dst)) {
|
|
|
+ throw new IOException ("Path " + dst + " already exists");
|
|
|
+ }
|
|
|
+ FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
|
|
|
+ if (fAttr == null) return false;
|
|
|
+ pathToFileAttribs.put(getPath(dst), fAttr);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Deprecated
|
|
|
+ public boolean delete(Path f) throws IOException {
|
|
|
+ return delete(f, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
|
|
|
+ if (fAttr != null) {
|
|
|
+ fAttr.data = null;
|
|
|
+ totalUsed -= fAttr.size;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Directory operations are not supported
|
|
|
+ */
|
|
|
+ public FileStatus[] listStatus(Path f) throws IOException {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setWorkingDirectory(Path new_dir) {
|
|
|
+ staticWorkingDir = new_dir;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Path getWorkingDirectory() {
|
|
|
+ return staticWorkingDir;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param permission Currently ignored.
|
|
|
+ */
|
|
|
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FileStatus getFileStatus(Path f) throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ FileAttributes attr = pathToFileAttribs.get(getPath(f));
|
|
|
+ if (attr==null) {
|
|
|
+ throw new FileNotFoundException("File " + f + " does not exist.");
|
|
|
+ }
|
|
|
+ return new InMemoryFileStatus(f.makeQualified(this), attr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Some APIs exclusively for InMemoryFileSystem */
|
|
|
+
|
|
|
+ /** Register a path with its size. */
|
|
|
+ public boolean reserveSpace(Path f, long size) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (!canFitInMemory(size))
|
|
|
+ return false;
|
|
|
+ FileAttributes fileAttr;
|
|
|
+ try {
|
|
|
+ fileAttr = new FileAttributes((int)size);
|
|
|
+ } catch (OutOfMemoryError o) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ totalUsed += size;
|
|
|
+ tempFileAttribs.put(getPath(f), fileAttr);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public void unreserveSpace(Path f) {
|
|
|
+ synchronized (this) {
|
|
|
+ FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
|
|
|
+ if (fAttr != null) {
|
|
|
+ fAttr.data = null;
|
|
|
+ totalUsed -= fAttr.size;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** This API getClosedFiles could have been implemented over listPathsRaw
|
|
|
+ * but it is an overhead to maintain directory structures for this impl of
|
|
|
+ * the in-memory fs.
|
|
|
+ */
|
|
|
+ public Path[] getFiles(PathFilter filter) {
|
|
|
+ synchronized (this) {
|
|
|
+ List<String> closedFilesList = new ArrayList<String>();
|
|
|
+ synchronized (pathToFileAttribs) {
|
|
|
+ Set paths = pathToFileAttribs.keySet();
|
|
|
+ if (paths == null || paths.isEmpty()) {
|
|
|
+ return new Path[0];
|
|
|
+ }
|
|
|
+ Iterator iter = paths.iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ String f = (String)iter.next();
|
|
|
+ if (filter.accept(new Path(f))) {
|
|
|
+ closedFilesList.add(f);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ String [] names =
|
|
|
+ closedFilesList.toArray(new String[closedFilesList.size()]);
|
|
|
+ Path [] results = new Path[names.length];
|
|
|
+ for (int i = 0; i < names.length; i++) {
|
|
|
+ results[i] = new Path(names[i]);
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getNumFiles(PathFilter filter) {
|
|
|
+ return getFiles(filter).length;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getFSSize() {
|
|
|
+ return fsSize;
|
|
|
+ }
|
|
|
+
|
|
|
+ public float getPercentUsed() {
|
|
|
+ if (fsSize > 0)
|
|
|
+ return (float)totalUsed/fsSize;
|
|
|
+ else return 0.1f;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @TODO: Fix for Java6?
|
|
|
+ * As of Java5 it is safe to assume that if the file can fit
|
|
|
+ * in-memory then its file-size is less than Integer.MAX_VALUE.
|
|
|
+ */
|
|
|
+ private boolean canFitInMemory(long size) {
|
|
|
+ if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getPath(Path f) {
|
|
|
+ return f.toUri().getPath();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class FileAttributes {
|
|
|
+ private byte[] data;
|
|
|
+ private int size;
|
|
|
+
|
|
|
+ public FileAttributes(int size) {
|
|
|
+ this.size = size;
|
|
|
+ this.data = new byte[size];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class InMemoryFileStatus extends FileStatus {
|
|
|
+ InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
|
|
|
+ super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public InMemoryFileSystem() {
|
|
|
+ super(new RawInMemoryFileSystem());
|
|
|
+ }
|
|
|
+
|
|
|
+ public InMemoryFileSystem(URI uri, Configuration conf) {
|
|
|
+ super(new RawInMemoryFileSystem(uri, conf));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Register a file with its size. This will also register a checksum for the
|
|
|
+ * file that the user is trying to create. This is required since none of
|
|
|
+ * the FileSystem APIs accept the size of the file as argument. But since it
|
|
|
+ * is required for us to apriori know the size of the file we are going to
|
|
|
+ * create, the user must call this method for each file he wants to create
|
|
|
+ * and reserve memory for that file. We either succeed in reserving memory
|
|
|
+ * for both the main file and the checksum file and return true, or return
|
|
|
+ * false.
|
|
|
+ */
|
|
|
+ public boolean reserveSpaceWithCheckSum(Path f, long size) {
|
|
|
+ RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
|
|
|
+ synchronized(mfs) {
|
|
|
+ boolean b = mfs.reserveSpace(f, size);
|
|
|
+ if (b) {
|
|
|
+ long checksumSize = getChecksumFileLength(f, size);
|
|
|
+ b = mfs.reserveSpace(getChecksumFile(f), checksumSize);
|
|
|
+ if (!b) {
|
|
|
+ mfs.unreserveSpace(f);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return b;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Path[] getFiles(PathFilter filter) {
|
|
|
+ return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getNumFiles(PathFilter filter) {
|
|
|
+ return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getFSSize() {
|
|
|
+ return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
|
|
|
+ }
|
|
|
+
|
|
|
+ public float getPercentUsed() {
|
|
|
+ return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
|
|
|
+ }
|
|
|
+}
|