|
@@ -1,446 +0,0 @@
|
|
-/**
|
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
- * or more contributor license agreements. See the NOTICE file
|
|
|
|
- * distributed with this work for additional information
|
|
|
|
- * regarding copyright ownership. The ASF licenses this file
|
|
|
|
- * to you under the Apache License, Version 2.0 (the
|
|
|
|
- * "License"); you may not use this file except in compliance
|
|
|
|
- * with the License. You may obtain a copy of the License at
|
|
|
|
- *
|
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
- *
|
|
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
- * See the License for the specific language governing permissions and
|
|
|
|
- * limitations under the License.
|
|
|
|
- */
|
|
|
|
-package org.apache.hadoop.fs;
|
|
|
|
-
|
|
|
|
-import java.io.FileNotFoundException;
|
|
|
|
-import java.io.IOException;
|
|
|
|
-import java.io.OutputStream;
|
|
|
|
-import java.net.URI;
|
|
|
|
-import java.util.*;
|
|
|
|
-
|
|
|
|
-import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
|
-import org.apache.hadoop.io.DataInputBuffer;
|
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
|
-import org.apache.hadoop.util.Progressable;
|
|
|
|
-
|
|
|
|
-/** An implementation of the in-memory filesystem. This implementation assumes
|
|
|
|
- * that the file lengths are known ahead of time and the total lengths of all
|
|
|
|
- * the files is below a certain number (like 100 MB, configurable). Use the API
|
|
|
|
- * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of
|
|
|
|
- * the API for reserving space in the FS. The uri of this filesystem starts with
|
|
|
|
- * ramfs:// .
|
|
|
|
- */
|
|
|
|
-@Deprecated
|
|
|
|
-public class InMemoryFileSystem extends ChecksumFileSystem {
|
|
|
|
- private static class RawInMemoryFileSystem extends FileSystem {
|
|
|
|
- private URI uri;
|
|
|
|
- private long fsSize;
|
|
|
|
- private volatile long totalUsed;
|
|
|
|
- private Path staticWorkingDir;
|
|
|
|
-
|
|
|
|
- //pathToFileAttribs is the final place where a file is put after it is closed
|
|
|
|
- private Map<String, FileAttributes> pathToFileAttribs =
|
|
|
|
- new HashMap<String, FileAttributes>();
|
|
|
|
-
|
|
|
|
- //tempFileAttribs is a temp place which is updated while reserving memory for
|
|
|
|
- //files we are going to create. It is read in the createRaw method and the
|
|
|
|
- //temp key/value is discarded. If the file makes it to "close", then it
|
|
|
|
- //ends up being in the pathToFileAttribs map.
|
|
|
|
- private Map<String, FileAttributes> tempFileAttribs =
|
|
|
|
- new HashMap<String, FileAttributes>();
|
|
|
|
-
|
|
|
|
- public RawInMemoryFileSystem() {
|
|
|
|
- setConf(new Configuration());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public RawInMemoryFileSystem(URI uri, Configuration conf) {
|
|
|
|
- initialize(uri, conf);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //inherit javadoc
|
|
|
|
- public void initialize(URI uri, Configuration conf) {
|
|
|
|
- setConf(conf);
|
|
|
|
- int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
|
|
|
|
- this.fsSize = size * 1024L * 1024L;
|
|
|
|
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
|
|
|
- String path = this.uri.getPath();
|
|
|
|
- if (path.length() == 0) {
|
|
|
|
- path = Path.CUR_DIR;
|
|
|
|
- }
|
|
|
|
- this.staticWorkingDir = new Path(path);
|
|
|
|
- LOG.info("Initialized InMemoryFileSystem: " + uri.toString() +
|
|
|
|
- " of size (in bytes): " + fsSize);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //inherit javadoc
|
|
|
|
- public URI getUri() {
|
|
|
|
- return uri;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private class InMemoryInputStream extends FSInputStream {
|
|
|
|
- private DataInputBuffer din = new DataInputBuffer();
|
|
|
|
- private FileAttributes fAttr;
|
|
|
|
-
|
|
|
|
- public InMemoryInputStream(Path f) throws IOException {
|
|
|
|
- synchronized (RawInMemoryFileSystem.this) {
|
|
|
|
- fAttr = pathToFileAttribs.get(getPath(f));
|
|
|
|
- if (fAttr == null) {
|
|
|
|
- throw new FileNotFoundException("File " + f + " does not exist");
|
|
|
|
- }
|
|
|
|
- din.reset(fAttr.data, 0, fAttr.size);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public long getPos() throws IOException {
|
|
|
|
- return din.getPosition();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void seek(long pos) throws IOException {
|
|
|
|
- if ((int)pos > fAttr.size)
|
|
|
|
- throw new IOException("Cannot seek after EOF");
|
|
|
|
- din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public int available() throws IOException {
|
|
|
|
- return din.available();
|
|
|
|
- }
|
|
|
|
- public boolean markSupport() { return false; }
|
|
|
|
-
|
|
|
|
- public int read() throws IOException {
|
|
|
|
- return din.read();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public int read(byte[] b, int off, int len) throws IOException {
|
|
|
|
- return din.read(b, off, len);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public long skip(long n) throws IOException { return din.skip(n); }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
|
|
|
- return new FSDataInputStream(new InMemoryInputStream(f));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private class InMemoryOutputStream extends OutputStream {
|
|
|
|
- private int count;
|
|
|
|
- private FileAttributes fAttr;
|
|
|
|
- private Path f;
|
|
|
|
-
|
|
|
|
- public InMemoryOutputStream(Path f, FileAttributes fAttr)
|
|
|
|
- throws IOException {
|
|
|
|
- this.fAttr = fAttr;
|
|
|
|
- this.f = f;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public long getPos() throws IOException {
|
|
|
|
- return count;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void close() throws IOException {
|
|
|
|
- synchronized (RawInMemoryFileSystem.this) {
|
|
|
|
- pathToFileAttribs.put(getPath(f), fAttr);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void write(byte[] b, int off, int len) throws IOException {
|
|
|
|
- if ((off < 0) || (off > b.length) || (len < 0) ||
|
|
|
|
- ((off + len) > b.length) || ((off + len) < 0)) {
|
|
|
|
- throw new IndexOutOfBoundsException();
|
|
|
|
- } else if (len == 0) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- int newcount = count + len;
|
|
|
|
- if (newcount > fAttr.size) {
|
|
|
|
- throw new IOException("Insufficient space");
|
|
|
|
- }
|
|
|
|
- System.arraycopy(b, off, fAttr.data, count, len);
|
|
|
|
- count = newcount;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void write(int b) throws IOException {
|
|
|
|
- int newcount = count + 1;
|
|
|
|
- if (newcount > fAttr.size) {
|
|
|
|
- throw new IOException("Insufficient space");
|
|
|
|
- }
|
|
|
|
- fAttr.data[count] = (byte)b;
|
|
|
|
- count = newcount;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** This optional operation is not yet supported. */
|
|
|
|
- public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
|
- Progressable progress) throws IOException {
|
|
|
|
- throw new IOException("Not supported");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @param permission Currently ignored.
|
|
|
|
- */
|
|
|
|
- public FSDataOutputStream create(Path f, FsPermission permission,
|
|
|
|
- boolean overwrite, int bufferSize,
|
|
|
|
- short replication, long blockSize, Progressable progress)
|
|
|
|
- throws IOException {
|
|
|
|
- synchronized (this) {
|
|
|
|
- if (exists(f) && !overwrite) {
|
|
|
|
- throw new IOException("File already exists:"+f);
|
|
|
|
- }
|
|
|
|
- FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
|
|
|
|
- if (fAttr != null)
|
|
|
|
- return create(f, fAttr);
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public FSDataOutputStream create(Path f, FileAttributes fAttr)
|
|
|
|
- throws IOException {
|
|
|
|
- // the path is not added into the filesystem (in the pathToFileAttribs
|
|
|
|
- // map) until close is called on the outputstream that this method is
|
|
|
|
- // going to return
|
|
|
|
- // Create an output stream out of data byte array
|
|
|
|
- return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
|
|
|
|
- statistics);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void close() throws IOException {
|
|
|
|
- super.close();
|
|
|
|
- synchronized (this) {
|
|
|
|
- if (pathToFileAttribs != null) {
|
|
|
|
- pathToFileAttribs.clear();
|
|
|
|
- }
|
|
|
|
- pathToFileAttribs = null;
|
|
|
|
- if (tempFileAttribs != null) {
|
|
|
|
- tempFileAttribs.clear();
|
|
|
|
- }
|
|
|
|
- tempFileAttribs = null;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public boolean setReplication(Path src, short replication)
|
|
|
|
- throws IOException {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public boolean rename(Path src, Path dst) throws IOException {
|
|
|
|
- synchronized (this) {
|
|
|
|
- if (exists(dst)) {
|
|
|
|
- throw new IOException ("Path " + dst + " already exists");
|
|
|
|
- }
|
|
|
|
- FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
|
|
|
|
- if (fAttr == null) return false;
|
|
|
|
- pathToFileAttribs.put(getPath(dst), fAttr);
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Deprecated
|
|
|
|
- public boolean delete(Path f) throws IOException {
|
|
|
|
- return delete(f, true);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
|
- synchronized (this) {
|
|
|
|
- FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
|
|
|
|
- if (fAttr != null) {
|
|
|
|
- fAttr.data = null;
|
|
|
|
- totalUsed -= fAttr.size;
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Directory operations are not supported
|
|
|
|
- */
|
|
|
|
- public FileStatus[] listStatus(Path f) throws IOException {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void setWorkingDirectory(Path new_dir) {
|
|
|
|
- staticWorkingDir = new_dir;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public Path getWorkingDirectory() {
|
|
|
|
- return staticWorkingDir;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @param permission Currently ignored.
|
|
|
|
- */
|
|
|
|
- public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public FileStatus getFileStatus(Path f) throws IOException {
|
|
|
|
- synchronized (this) {
|
|
|
|
- FileAttributes attr = pathToFileAttribs.get(getPath(f));
|
|
|
|
- if (attr==null) {
|
|
|
|
- throw new FileNotFoundException("File " + f + " does not exist.");
|
|
|
|
- }
|
|
|
|
- return new InMemoryFileStatus(f.makeQualified(this), attr);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** Some APIs exclusively for InMemoryFileSystem */
|
|
|
|
-
|
|
|
|
- /** Register a path with its size. */
|
|
|
|
- public boolean reserveSpace(Path f, long size) {
|
|
|
|
- synchronized (this) {
|
|
|
|
- if (!canFitInMemory(size))
|
|
|
|
- return false;
|
|
|
|
- FileAttributes fileAttr;
|
|
|
|
- try {
|
|
|
|
- fileAttr = new FileAttributes((int)size);
|
|
|
|
- } catch (OutOfMemoryError o) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- totalUsed += size;
|
|
|
|
- tempFileAttribs.put(getPath(f), fileAttr);
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- public void unreserveSpace(Path f) {
|
|
|
|
- synchronized (this) {
|
|
|
|
- FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
|
|
|
|
- if (fAttr != null) {
|
|
|
|
- fAttr.data = null;
|
|
|
|
- totalUsed -= fAttr.size;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** This API getClosedFiles could have been implemented over listPathsRaw
|
|
|
|
- * but it is an overhead to maintain directory structures for this impl of
|
|
|
|
- * the in-memory fs.
|
|
|
|
- */
|
|
|
|
- public Path[] getFiles(PathFilter filter) {
|
|
|
|
- synchronized (this) {
|
|
|
|
- List<String> closedFilesList = new ArrayList<String>();
|
|
|
|
- synchronized (pathToFileAttribs) {
|
|
|
|
- Set paths = pathToFileAttribs.keySet();
|
|
|
|
- if (paths == null || paths.isEmpty()) {
|
|
|
|
- return new Path[0];
|
|
|
|
- }
|
|
|
|
- Iterator iter = paths.iterator();
|
|
|
|
- while (iter.hasNext()) {
|
|
|
|
- String f = (String)iter.next();
|
|
|
|
- if (filter.accept(new Path(f))) {
|
|
|
|
- closedFilesList.add(f);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- String [] names =
|
|
|
|
- closedFilesList.toArray(new String[closedFilesList.size()]);
|
|
|
|
- Path [] results = new Path[names.length];
|
|
|
|
- for (int i = 0; i < names.length; i++) {
|
|
|
|
- results[i] = new Path(names[i]);
|
|
|
|
- }
|
|
|
|
- return results;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public int getNumFiles(PathFilter filter) {
|
|
|
|
- return getFiles(filter).length;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public long getFSSize() {
|
|
|
|
- return fsSize;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public float getPercentUsed() {
|
|
|
|
- if (fsSize > 0)
|
|
|
|
- return (float)totalUsed/fsSize;
|
|
|
|
- else return 0.1f;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @TODO: Fix for Java6?
|
|
|
|
- * As of Java5 it is safe to assume that if the file can fit
|
|
|
|
- * in-memory then its file-size is less than Integer.MAX_VALUE.
|
|
|
|
- */
|
|
|
|
- private boolean canFitInMemory(long size) {
|
|
|
|
- if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private String getPath(Path f) {
|
|
|
|
- return f.toUri().getPath();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static class FileAttributes {
|
|
|
|
- private byte[] data;
|
|
|
|
- private int size;
|
|
|
|
-
|
|
|
|
- public FileAttributes(int size) {
|
|
|
|
- this.size = size;
|
|
|
|
- this.data = new byte[size];
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private class InMemoryFileStatus extends FileStatus {
|
|
|
|
- InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
|
|
|
|
- super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public InMemoryFileSystem() {
|
|
|
|
- super(new RawInMemoryFileSystem());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public InMemoryFileSystem(URI uri, Configuration conf) {
|
|
|
|
- super(new RawInMemoryFileSystem(uri, conf));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Register a file with its size. This will also register a checksum for the
|
|
|
|
- * file that the user is trying to create. This is required since none of
|
|
|
|
- * the FileSystem APIs accept the size of the file as argument. But since it
|
|
|
|
- * is required for us to apriori know the size of the file we are going to
|
|
|
|
- * create, the user must call this method for each file he wants to create
|
|
|
|
- * and reserve memory for that file. We either succeed in reserving memory
|
|
|
|
- * for both the main file and the checksum file and return true, or return
|
|
|
|
- * false.
|
|
|
|
- */
|
|
|
|
- public boolean reserveSpaceWithCheckSum(Path f, long size) {
|
|
|
|
- RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
|
|
|
|
- synchronized(mfs) {
|
|
|
|
- boolean b = mfs.reserveSpace(f, size);
|
|
|
|
- if (b) {
|
|
|
|
- long checksumSize = getChecksumFileLength(f, size);
|
|
|
|
- b = mfs.reserveSpace(getChecksumFile(f), checksumSize);
|
|
|
|
- if (!b) {
|
|
|
|
- mfs.unreserveSpace(f);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return b;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public Path[] getFiles(PathFilter filter) {
|
|
|
|
- return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public int getNumFiles(PathFilter filter) {
|
|
|
|
- return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public long getFSSize() {
|
|
|
|
- return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public float getPercentUsed() {
|
|
|
|
- return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
|
|
|
|
- }
|
|
|
|
-}
|
|
|