|
@@ -0,0 +1,951 @@
|
|
|
|
+/**
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
|
+ * distributed with this work for additional information
|
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
|
+ * limitations under the License.
|
|
|
|
+ */
|
|
|
|
+package org.apache.hadoop.fs.viewfs;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
|
|
+import org.apache.hadoop.fs.LocatedFileStatus;
|
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
|
+import org.apache.hadoop.fs.RemoteIterator;
|
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
|
+import org.apache.hadoop.io.MultipleIOException;
|
|
|
|
+import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
|
+import org.apache.hadoop.net.NetworkTopology;
|
|
|
|
+import org.apache.hadoop.net.Node;
|
|
|
|
+import org.apache.hadoop.net.NodeBase;
|
|
|
|
+import org.apache.hadoop.net.ScriptBasedMapping;
|
|
|
|
+import org.apache.hadoop.util.Progressable;
|
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
+
|
|
|
|
+import java.io.FileNotFoundException;
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.io.OutputStream;
|
|
|
|
+import java.net.InetAddress;
|
|
|
|
+import java.net.URI;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.Arrays;
|
|
|
|
+import java.util.BitSet;
|
|
|
|
+import java.util.EnumSet;
|
|
|
|
+import java.util.Iterator;
|
|
|
|
+import java.util.List;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * Nfly is a multi filesystem mount point.
|
|
|
|
+ */
|
|
|
|
+@Private
|
|
|
|
+final class NflyFSystem extends FileSystem {
|
|
|
|
+ private static final Log LOG = LogFactory.getLog(NflyFSystem.class);
|
|
|
|
+ private static final String NFLY_TMP_PREFIX = "_nfly_tmp_";
|
|
|
|
+
|
|
|
|
+ enum NflyKey {
|
|
|
|
+ // minimum replication, if local filesystem is included +1 is recommended
|
|
|
|
+ minReplication,
|
|
|
|
+
|
|
|
|
+ // forces to check all the replicas and fetch the one with the most recent
|
|
|
|
+ // time stamp
|
|
|
|
+ //
|
|
|
|
+ readMostRecent,
|
|
|
|
+
|
|
|
|
+ // create missing replica from far to near, including local?
|
|
|
|
+ repairOnRead
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final int DEFAULT_MIN_REPLICATION = 2;
|
|
|
|
+ private static URI nflyURI = URI.create("nfly:///");
|
|
|
|
+
|
|
|
|
+ private final NflyNode[] nodes;
|
|
|
|
+ private final int minReplication;
|
|
|
|
+ private final EnumSet<NflyKey> nflyFlags;
|
|
|
|
+ private final Node myNode;
|
|
|
|
+ private final NetworkTopology topology;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * URI's authority is used as an approximation of the distance from the
|
|
|
|
+ * client. It's sufficient for DC but not accurate because worker nodes can be
|
|
|
|
+ * closer.
|
|
|
|
+ */
|
|
|
|
+ private static class NflyNode extends NodeBase {
|
|
|
|
+ private final ChRootedFileSystem fs;
|
|
|
|
+ NflyNode(String hostName, String rackName, URI uri,
|
|
|
|
+ Configuration conf) throws IOException {
|
|
|
|
+ this(hostName, rackName, new ChRootedFileSystem(uri, conf));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ NflyNode(String hostName, String rackName, ChRootedFileSystem fs) {
|
|
|
|
+ super(hostName, rackName);
|
|
|
|
+ this.fs = fs;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ChRootedFileSystem getFs() {
|
|
|
|
+ return fs;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean equals(Object o) {
|
|
|
|
+ // satisfy findbugs
|
|
|
|
+ return super.equals(o);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int hashCode() {
|
|
|
|
+ // satisfy findbugs
|
|
|
|
+ return super.hashCode();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final class MRNflyNode
|
|
|
|
+ extends NflyNode implements Comparable<MRNflyNode> {
|
|
|
|
+
|
|
|
|
+ private FileStatus status;
|
|
|
|
+
|
|
|
|
+ private MRNflyNode(NflyNode n) {
|
|
|
|
+ super(n.getName(), n.getNetworkLocation(), n.fs);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void updateFileStatus(Path f) throws IOException {
|
|
|
|
+ final FileStatus tmpStatus = getFs().getFileStatus(f);
|
|
|
|
+ status = tmpStatus == null
|
|
|
|
+ ? notFoundStatus(f)
|
|
|
|
+ : tmpStatus;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // TODO allow configurable error margin for FileSystems with different
|
|
|
|
+ // timestamp precisions
|
|
|
|
+ @Override
|
|
|
|
+ public int compareTo(MRNflyNode other) {
|
|
|
|
+ if (status == null) {
|
|
|
|
+ return other.status == null ? 0 : 1; // move non-null towards head
|
|
|
|
+ } else if (other.status == null) {
|
|
|
|
+ return -1; // move this towards head
|
|
|
|
+ } else {
|
|
|
|
+ final long mtime = status.getModificationTime();
|
|
|
|
+ final long their = other.status.getModificationTime();
|
|
|
|
+ return Long.compare(their, mtime); // move more recent towards head
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean equals(Object o) {
|
|
|
|
+ if (!(o instanceof MRNflyNode)) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ MRNflyNode other = (MRNflyNode) o;
|
|
|
|
+ return 0 == compareTo(other);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int hashCode() {
|
|
|
|
+ // satisfy findbugs
|
|
|
|
+ return super.hashCode();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private FileStatus nflyStatus() throws IOException {
|
|
|
|
+ return new NflyStatus(getFs(), status);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private FileStatus cloneStatus() throws IOException {
|
|
|
|
+ return new FileStatus(status.getLen(),
|
|
|
|
+ status.isDirectory(),
|
|
|
|
+ status.getReplication(),
|
|
|
|
+ status.getBlockSize(),
|
|
|
|
+ status.getModificationTime(),
|
|
|
|
+ status.getAccessTime(),
|
|
|
|
+ null, null, null,
|
|
|
|
+ status.isSymlink() ? status.getSymlink() : null,
|
|
|
|
+ status.getPath());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private MRNflyNode[] workSet() {
|
|
|
|
+ final MRNflyNode[] res = new MRNflyNode[nodes.length];
|
|
|
|
+ for (int i = 0; i < res.length; i++) {
|
|
|
|
+ res[i] = new MRNflyNode(nodes[i]);
|
|
|
|
+ }
|
|
|
|
+ return res;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Utility to replace null with DEFAULT_RACK.
|
|
|
|
+ *
|
|
|
|
+ * @param rackString rack value, can be null
|
|
|
|
+ * @return non-null rack string
|
|
|
|
+ */
|
|
|
|
+ private static String getRack(String rackString) {
|
|
|
|
+ return rackString == null ? NetworkTopology.DEFAULT_RACK : rackString;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Creates a new Nfly instance.
|
|
|
|
+ *
|
|
|
|
+ * @param uris the list of uris in the mount point
|
|
|
|
+ * @param conf configuration object
|
|
|
|
+ * @param minReplication minimum copies to commit a write op
|
|
|
|
+ * @param nflyFlags modes such readMostRecent
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
|
|
|
|
+ EnumSet<NflyKey> nflyFlags) throws IOException {
|
|
|
|
+ if (uris.length < minReplication) {
|
|
|
|
+ throw new IOException(minReplication + " < " + uris.length
|
|
|
|
+ + ": Minimum replication < #destinations");
|
|
|
|
+ }
|
|
|
|
+ setConf(conf);
|
|
|
|
+ final String localHostName = InetAddress.getLocalHost().getHostName();
|
|
|
|
+
|
|
|
|
+ // build a list for topology resolution
|
|
|
|
+ final List<String> hostStrings = new ArrayList<String>(uris.length + 1);
|
|
|
|
+ for (URI uri : uris) {
|
|
|
|
+ final String uriHost = uri.getHost();
|
|
|
|
+ // assume local file system or another closest filesystem if no authority
|
|
|
|
+ hostStrings.add(uriHost == null ? localHostName : uriHost);
|
|
|
|
+ }
|
|
|
|
+ // resolve the client node
|
|
|
|
+ hostStrings.add(localHostName);
|
|
|
|
+
|
|
|
|
+ final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass(
|
|
|
|
+ CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
|
+ ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
|
|
|
|
+
|
|
|
|
+ // this is an ArrayList
|
|
|
|
+ final List<String> rackStrings = tmpDns.resolve(hostStrings);
|
|
|
|
+ nodes = new NflyNode[uris.length];
|
|
|
|
+ final Iterator<String> rackIter = rackStrings.iterator();
|
|
|
|
+ for (int i = 0; i < nodes.length; i++) {
|
|
|
|
+ nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
|
|
|
|
+ conf);
|
|
|
|
+ }
|
|
|
|
+ // sort all the uri's by distance from myNode, the local file system will
|
|
|
|
+ // automatically be the the first one.
|
|
|
|
+ //
|
|
|
|
+ myNode = new NodeBase(localHostName, getRack(rackIter.next()));
|
|
|
|
+ topology = NetworkTopology.getInstance(conf);
|
|
|
|
+ topology.sortByDistance(myNode, nodes, nodes.length);
|
|
|
|
+
|
|
|
|
+ this.minReplication = minReplication;
|
|
|
|
+ this.nflyFlags = nflyFlags;
|
|
|
|
+ statistics = getStatistics(nflyURI.getScheme(), getClass());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Transactional output stream. When creating path /dir/file
|
|
|
|
+ * 1) create invisible /real/dir_i/_nfly_tmp_file
|
|
|
|
+ * 2) when more than min replication was written, write is committed by
|
|
|
|
+ * renaming all successfully written files to /real/dir_i/file
|
|
|
|
+ */
|
|
|
|
+ private final class NflyOutputStream extends OutputStream {
|
|
|
|
+ // actual path
|
|
|
|
+ private final Path nflyPath;
|
|
|
|
+ // tmp path before commit
|
|
|
|
+ private final Path tmpPath;
|
|
|
|
+ // broadcast set
|
|
|
|
+ private final FSDataOutputStream[] outputStreams;
|
|
|
|
+ // status set: 1 working, 0 problem
|
|
|
|
+ private final BitSet opSet;
|
|
|
|
+ private final boolean useOverwrite;
|
|
|
|
+
|
|
|
|
+ private NflyOutputStream(Path f, FsPermission permission, boolean overwrite,
|
|
|
|
+ int bufferSize, short replication, long blockSize,
|
|
|
|
+ Progressable progress) throws IOException {
|
|
|
|
+ nflyPath = f;
|
|
|
|
+ tmpPath = getNflyTmpPath(f);
|
|
|
|
+ outputStreams = new FSDataOutputStream[nodes.length];
|
|
|
|
+ for (int i = 0; i < outputStreams.length; i++) {
|
|
|
|
+ outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true,
|
|
|
|
+ bufferSize, replication, blockSize, progress);
|
|
|
|
+ }
|
|
|
|
+ opSet = new BitSet(outputStreams.length);
|
|
|
|
+ opSet.set(0, outputStreams.length);
|
|
|
|
+ useOverwrite = false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //
|
|
|
|
+ // TODO consider how to clean up and throw an exception early when the clear
|
|
|
|
+ // bits under min replication
|
|
|
|
+ //
|
|
|
|
+
|
|
|
|
+ private void mayThrow(List<IOException> ioExceptions) throws IOException {
|
|
|
|
+ final IOException ioe = MultipleIOException
|
|
|
|
+ .createIOException(ioExceptions);
|
|
|
|
+ if (opSet.cardinality() < minReplication) {
|
|
|
|
+ throw ioe;
|
|
|
|
+ } else {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Exceptions occurred: " + ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void write(int d) throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ for (int i = opSet.nextSetBit(0);
|
|
|
|
+ i >=0;
|
|
|
|
+ i = opSet.nextSetBit(i + 1)) {
|
|
|
|
+ try {
|
|
|
|
+ outputStreams[i].write(d);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ osException(i, "write", t, ioExceptions);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ mayThrow(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void osException(int i, String op, Throwable t,
|
|
|
|
+ List<IOException> ioExceptions) {
|
|
|
|
+ opSet.clear(i);
|
|
|
|
+ processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void write(byte[] bytes, int offset, int len) throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ for (int i = opSet.nextSetBit(0);
|
|
|
|
+ i >= 0;
|
|
|
|
+ i = opSet.nextSetBit(i + 1)) {
|
|
|
|
+ try {
|
|
|
|
+ outputStreams[i].write(bytes, offset, len);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ osException(i, "write", t, ioExceptions);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ mayThrow(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void flush() throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ for (int i = opSet.nextSetBit(0);
|
|
|
|
+ i >= 0;
|
|
|
|
+ i = opSet.nextSetBit(i + 1)) {
|
|
|
|
+ try {
|
|
|
|
+ outputStreams[i].flush();
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ osException(i, "flush", t, ioExceptions);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ mayThrow(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void close() throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ for (int i = opSet.nextSetBit(0);
|
|
|
|
+ i >= 0;
|
|
|
|
+ i = opSet.nextSetBit(i + 1)) {
|
|
|
|
+ try {
|
|
|
|
+ outputStreams[i].close();
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ osException(i, "close", t, ioExceptions);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (opSet.cardinality() < minReplication) {
|
|
|
|
+ cleanupAllTmpFiles();
|
|
|
|
+ throw new IOException("Failed to sufficiently replicate: min="
|
|
|
|
+ + minReplication + " actual=" + opSet.cardinality());
|
|
|
|
+ } else {
|
|
|
|
+ commit();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void cleanupAllTmpFiles() throws IOException {
|
|
|
|
+ for (int i = 0; i < outputStreams.length; i++) {
|
|
|
|
+ try {
|
|
|
|
+ nodes[i].fs.delete(tmpPath);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nodes[i], "delete", t, null, tmpPath);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void commit() throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ for (int i = opSet.nextSetBit(0);
|
|
|
|
+ i >= 0;
|
|
|
|
+ i = opSet.nextSetBit(i + 1)) {
|
|
|
|
+ final NflyNode nflyNode = nodes[i];
|
|
|
|
+ try {
|
|
|
|
+ if (useOverwrite) {
|
|
|
|
+ nflyNode.fs.delete(nflyPath);
|
|
|
|
+ }
|
|
|
|
+ nflyNode.fs.rename(tmpPath, nflyPath);
|
|
|
|
+
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ osException(i, "commit", t, ioExceptions);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (opSet.cardinality() < minReplication) {
|
|
|
|
+ // cleanup should be done outside. If rename failed, it's unlikely that
|
|
|
|
+ // delete will work either. It's the same kind of metadata-only op
|
|
|
|
+ //
|
|
|
|
+ throw MultipleIOException.createIOException(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // best effort to have a consistent timestamp
|
|
|
|
+ final long commitTime = System.currentTimeMillis();
|
|
|
|
+ for (int i = opSet.nextSetBit(0);
|
|
|
|
+ i >= 0;
|
|
|
|
+ i = opSet.nextSetBit(i + 1)) {
|
|
|
|
+ try {
|
|
|
|
+ nodes[i].fs.setTimes(nflyPath, commitTime, commitTime);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Path getNflyTmpPath(Path f) {
|
|
|
|
+ return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * // TODO
|
|
|
|
+ * Some file status implementations have expensive deserialization or metadata
|
|
|
|
+ * retrieval. This probably does not go beyond RawLocalFileSystem. Wrapping
|
|
|
|
+ * the the real file status to preserve this behavior. Otherwise, calling
|
|
|
|
+ * realStatus getters in constructor defeats this design.
|
|
|
|
+ */
|
|
|
|
+ static final class NflyStatus extends FileStatus {
|
|
|
|
+ private static final long serialVersionUID = 0x21f276d8;
|
|
|
|
+
|
|
|
|
+ private final FileStatus realStatus;
|
|
|
|
+ private final String strippedRoot;
|
|
|
|
+
|
|
|
|
+ private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus)
|
|
|
|
+ throws IOException {
|
|
|
|
+ this.realStatus = realStatus;
|
|
|
|
+ this.strippedRoot = realFs.stripOutRoot(realStatus.getPath());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String stripRoot() throws IOException {
|
|
|
|
+ return strippedRoot;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public long getLen() {
|
|
|
|
+ return realStatus.getLen();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean isFile() {
|
|
|
|
+ return realStatus.isFile();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean isDirectory() {
|
|
|
|
+ return realStatus.isDirectory();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean isSymlink() {
|
|
|
|
+ return realStatus.isSymlink();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public long getBlockSize() {
|
|
|
|
+ return realStatus.getBlockSize();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public short getReplication() {
|
|
|
|
+ return realStatus.getReplication();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public long getModificationTime() {
|
|
|
|
+ return realStatus.getModificationTime();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public long getAccessTime() {
|
|
|
|
+ return realStatus.getAccessTime();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public FsPermission getPermission() {
|
|
|
|
+ return realStatus.getPermission();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String getOwner() {
|
|
|
|
+ return realStatus.getOwner();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String getGroup() {
|
|
|
|
+ return realStatus.getGroup();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Path getPath() {
|
|
|
|
+ return realStatus.getPath();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setPath(Path p) {
|
|
|
|
+ realStatus.setPath(p);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Path getSymlink() throws IOException {
|
|
|
|
+ return realStatus.getSymlink();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setSymlink(Path p) {
|
|
|
|
+ realStatus.setSymlink(p);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean equals(Object o) {
|
|
|
|
+ return realStatus.equals(o);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int hashCode() {
|
|
|
|
+ return realStatus.hashCode();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ return realStatus.toString();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public URI getUri() {
|
|
|
|
+ return nflyURI;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Category: READ.
|
|
|
|
+ *
|
|
|
|
+ * @param f the file name to open
|
|
|
|
+ * @param bufferSize the size of the buffer to be used.
|
|
|
|
+ * @return input stream according to nfly flags (closest, most recent)
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws FileNotFoundException iff all destinations generate this exception
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
|
|
|
+ // TODO proxy stream for reads
|
|
|
|
+ final List<IOException> ioExceptions =
|
|
|
|
+ new ArrayList<IOException>(nodes.length);
|
|
|
|
+ int numNotFounds = 0;
|
|
|
|
+ final MRNflyNode[] mrNodes = workSet();
|
|
|
|
+
|
|
|
|
+ // naively iterate until one can be opened
|
|
|
|
+ //
|
|
|
|
+ for (final MRNflyNode nflyNode : mrNodes) {
|
|
|
|
+ try {
|
|
|
|
+ if (nflyFlags.contains(NflyKey.repairOnRead)
|
|
|
|
+ || nflyFlags.contains(NflyKey.readMostRecent)) {
|
|
|
|
+ // calling file status to avoid pulling bytes prematurely
|
|
|
|
+ nflyNode.updateFileStatus(f);
|
|
|
|
+ } else {
|
|
|
|
+ return nflyNode.getFs().open(f, bufferSize);
|
|
|
|
+ }
|
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
|
+ nflyNode.status = notFoundStatus(f);
|
|
|
|
+ numNotFounds++;
|
|
|
|
+ processThrowable(nflyNode, "open", fnfe, ioExceptions, f);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nflyNode, "open", t, ioExceptions, f);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (nflyFlags.contains(NflyKey.readMostRecent)) {
|
|
|
|
+ // sort from most recent to least recent
|
|
|
|
+ Arrays.sort(mrNodes);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f,
|
|
|
|
+ bufferSize);
|
|
|
|
+
|
|
|
|
+ if (fsdisAfterRepair != null) {
|
|
|
|
+ return fsdisAfterRepair;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ mayThrowFileNotFound(ioExceptions, numNotFounds);
|
|
|
|
+ throw MultipleIOException.createIOException(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static FileStatus notFoundStatus(Path f) {
|
|
|
|
+ return new FileStatus(-1, false, 0, 0, 0, f);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Iterate all available nodes in the proximity order to attempt repair of all
|
|
|
|
+ * FileNotFound nodes.
|
|
|
|
+ *
|
|
|
|
+ * @param mrNodes work set copy of nodes
|
|
|
|
+ * @param f path to repair and open
|
|
|
|
+ * @param bufferSize buffer size for read RPC
|
|
|
|
+ * @return the closest/most recent replica stream AFTER repair
|
|
|
|
+ */
|
|
|
|
+ private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f,
|
|
|
|
+ int bufferSize) {
|
|
|
|
+ long maxMtime = 0L;
|
|
|
|
+ for (final MRNflyNode srcNode : mrNodes) {
|
|
|
|
+ if (srcNode.status == null // not available
|
|
|
|
+ || srcNode.status.getLen() < 0L) { // not found
|
|
|
|
+ continue; // not available
|
|
|
|
+ }
|
|
|
|
+ if (srcNode.status.getModificationTime() > maxMtime) {
|
|
|
|
+ maxMtime = srcNode.status.getModificationTime();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // attempt to repair all notFound nodes with srcNode
|
|
|
|
+ //
|
|
|
|
+ for (final MRNflyNode dstNode : mrNodes) {
|
|
|
|
+ if (dstNode.status == null // not available
|
|
|
|
+ || srcNode.compareTo(dstNode) == 0) { // same mtime
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ // status is absolute from the underlying mount, making it chrooted
|
|
|
|
+ //
|
|
|
|
+ final FileStatus srcStatus = srcNode.cloneStatus();
|
|
|
|
+ srcStatus.setPath(f);
|
|
|
|
+ final Path tmpPath = getNflyTmpPath(f);
|
|
|
|
+ FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath,
|
|
|
|
+ false, // don't delete
|
|
|
|
+ true, // overwrite
|
|
|
|
+ getConf());
|
|
|
|
+ dstNode.getFs().delete(f, false);
|
|
|
|
+ if (dstNode.getFs().rename(tmpPath, f)) {
|
|
|
|
+ try {
|
|
|
|
+ dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(),
|
|
|
|
+ srcNode.status.getAccessTime());
|
|
|
|
+ } finally {
|
|
|
|
+ // save getFileStatus rpc
|
|
|
|
+ srcStatus.setPath(dstNode.getFs().makeQualified(f));
|
|
|
|
+ dstNode.status = srcStatus;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // can blame the source by statusSet.clear(ai), however, it would
|
|
|
|
+ // cost an extra RPC, so just rely on the loop below that will attempt
|
|
|
|
+ // an open anyhow
|
|
|
|
+ //
|
|
|
|
+ LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair",
|
|
|
|
+ ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Since Java7, QuickSort is used instead of MergeSort.
|
|
|
|
+ // QuickSort may not be stable and thus the equal most recent nodes, may no
|
|
|
|
+ // longer appear in the NetworkTopology order.
|
|
|
|
+ //
|
|
|
|
+ if (maxMtime > 0) {
|
|
|
|
+ final List<MRNflyNode> mrList = new ArrayList<MRNflyNode>();
|
|
|
|
+ for (final MRNflyNode openNode : mrNodes) {
|
|
|
|
+ if (openNode.status != null && openNode.status.getLen() >= 0L) {
|
|
|
|
+ if (openNode.status.getModificationTime() == maxMtime) {
|
|
|
|
+ mrList.add(openNode);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // assert mrList.size > 0
|
|
|
|
+ final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]);
|
|
|
|
+ topology.sortByDistance(myNode, readNodes, readNodes.length);
|
|
|
|
+ for (final MRNflyNode rNode : readNodes) {
|
|
|
|
+ try {
|
|
|
|
+ return rNode.getFs().open(f, bufferSize);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.info(f + ": Failed to open at " + rNode.getFs().getUri());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void mayThrowFileNotFound(List<IOException> ioExceptions,
|
|
|
|
+ int numNotFounds) throws FileNotFoundException {
|
|
|
|
+ if (numNotFounds == nodes.length) {
|
|
|
|
+ throw (FileNotFoundException)ioExceptions.get(nodes.length - 1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // WRITE
|
|
|
|
+ @Override
|
|
|
|
+ public FSDataOutputStream create(Path f, FsPermission permission,
|
|
|
|
+ boolean overwrite, int bufferSize, short replication, long blockSize,
|
|
|
|
+ Progressable progress) throws IOException {
|
|
|
|
+ return new FSDataOutputStream(new NflyOutputStream(f, permission, overwrite,
|
|
|
|
+ bufferSize, replication, blockSize, progress), statistics);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // WRITE
|
|
|
|
+ @Override
|
|
|
|
+ public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
|
+ Progressable progress) throws IOException {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // WRITE
|
|
|
|
+ @Override
|
|
|
|
+ public boolean rename(Path src, Path dst) throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ int numNotFounds = 0;
|
|
|
|
+ boolean succ = true;
|
|
|
|
+ for (final NflyNode nflyNode : nodes) {
|
|
|
|
+ try {
|
|
|
|
+ succ &= nflyNode.fs.rename(src, dst);
|
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
|
+ numNotFounds++;
|
|
|
|
+ processThrowable(nflyNode, "rename", fnfe, ioExceptions, src, dst);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nflyNode, "rename", t, ioExceptions, src, dst);
|
|
|
|
+ succ = false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ mayThrowFileNotFound(ioExceptions, numNotFounds);
|
|
|
|
+
|
|
|
|
+ // if all destinations threw exceptions throw, otherwise return
|
|
|
|
+ //
|
|
|
|
+ if (ioExceptions.size() == nodes.length) {
|
|
|
|
+ throw MultipleIOException.createIOException(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return succ;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // WRITE
|
|
|
|
+ @Override
|
|
|
|
+ public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
|
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
|
|
|
|
+ int numNotFounds = 0;
|
|
|
|
+ boolean succ = true;
|
|
|
|
+ for (final NflyNode nflyNode : nodes) {
|
|
|
|
+ try {
|
|
|
|
+ succ &= nflyNode.fs.delete(f);
|
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
|
+ numNotFounds++;
|
|
|
|
+ processThrowable(nflyNode, "delete", fnfe, ioExceptions, f);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nflyNode, "delete", t, ioExceptions, f);
|
|
|
|
+ succ = false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ mayThrowFileNotFound(ioExceptions, numNotFounds);
|
|
|
|
+
|
|
|
|
+ // if all destinations threw exceptions throw, otherwise return
|
|
|
|
+ //
|
|
|
|
+ if (ioExceptions.size() == nodes.length) {
|
|
|
|
+ throw MultipleIOException.createIOException(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return succ;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns the closest non-failing destination's result.
|
|
|
|
+ *
|
|
|
|
+ * @param f given path
|
|
|
|
+ * @return array of file statuses according to nfly modes
|
|
|
|
+ * @throws FileNotFoundException
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
|
|
|
|
+ IOException {
|
|
|
|
+ final List<IOException> ioExceptions =
|
|
|
|
+ new ArrayList<IOException>(nodes.length);
|
|
|
|
+
|
|
|
|
+ final MRNflyNode[] mrNodes = workSet();
|
|
|
|
+ if (nflyFlags.contains(NflyKey.readMostRecent)) {
|
|
|
|
+ int numNotFounds = 0;
|
|
|
|
+ for (final MRNflyNode nflyNode : mrNodes) {
|
|
|
|
+ try {
|
|
|
|
+ nflyNode.updateFileStatus(f);
|
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
|
+ numNotFounds++;
|
|
|
|
+ processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ mayThrowFileNotFound(ioExceptions, numNotFounds);
|
|
|
|
+ Arrays.sort(mrNodes);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int numNotFounds = 0;
|
|
|
|
+ for (final MRNflyNode nflyNode : mrNodes) {
|
|
|
|
+ try {
|
|
|
|
+ final FileStatus[] realStats = nflyNode.getFs().listStatus(f);
|
|
|
|
+ final FileStatus[] nflyStats = new FileStatus[realStats.length];
|
|
|
|
+ for (int i = 0; i < realStats.length; i++) {
|
|
|
|
+ nflyStats[i] = new NflyStatus(nflyNode.getFs(), realStats[i]);
|
|
|
|
+ }
|
|
|
|
+ return nflyStats;
|
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
|
+ numNotFounds++;
|
|
|
|
+ processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ mayThrowFileNotFound(ioExceptions, numNotFounds);
|
|
|
|
+ throw MultipleIOException.createIOException(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
|
|
|
|
+ throws FileNotFoundException, IOException {
|
|
|
|
+ // TODO important for splits
|
|
|
|
+ return super.listLocatedStatus(f);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setWorkingDirectory(Path newDir) {
|
|
|
|
+ for (final NflyNode nflyNode : nodes) {
|
|
|
|
+ nflyNode.fs.setWorkingDirectory(newDir);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Path getWorkingDirectory() {
|
|
|
|
+ return nodes[0].fs.getWorkingDirectory(); // 0 is as good as any
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
|
|
|
+ boolean succ = true;
|
|
|
|
+ for (final NflyNode nflyNode : nodes) {
|
|
|
|
+ succ &= nflyNode.fs.mkdirs(f, permission);
|
|
|
|
+ }
|
|
|
|
+ return succ;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public FileStatus getFileStatus(Path f) throws IOException {
|
|
|
|
+ // TODO proxy stream for reads
|
|
|
|
+ final List<IOException> ioExceptions =
|
|
|
|
+ new ArrayList<IOException>(nodes.length);
|
|
|
|
+ int numNotFounds = 0;
|
|
|
|
+ final MRNflyNode[] mrNodes = workSet();
|
|
|
|
+
|
|
|
|
+ long maxMtime = Long.MIN_VALUE;
|
|
|
|
+ int maxMtimeIdx = Integer.MIN_VALUE;
|
|
|
|
+
|
|
|
|
+ // naively iterate until one can be returned
|
|
|
|
+ //
|
|
|
|
+ for (int i = 0; i < mrNodes.length; i++) {
|
|
|
|
+ MRNflyNode nflyNode = mrNodes[i];
|
|
|
|
+ try {
|
|
|
|
+ nflyNode.updateFileStatus(f);
|
|
|
|
+ if (nflyFlags.contains(NflyKey.readMostRecent)) {
|
|
|
|
+ final long nflyTime = nflyNode.status.getModificationTime();
|
|
|
|
+ if (nflyTime > maxMtime) {
|
|
|
|
+ maxMtime = nflyTime;
|
|
|
|
+ maxMtimeIdx = i;
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ return nflyNode.nflyStatus();
|
|
|
|
+ }
|
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
|
+ numNotFounds++;
|
|
|
|
+ processThrowable(nflyNode, "getFileStatus", fnfe, ioExceptions, f);
|
|
|
|
+ } catch (Throwable t) {
|
|
|
|
+ processThrowable(nflyNode, "getFileStatus", t, ioExceptions, f);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (maxMtimeIdx >= 0) {
|
|
|
|
+ return mrNodes[maxMtimeIdx].nflyStatus();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ mayThrowFileNotFound(ioExceptions, numNotFounds);
|
|
|
|
+ throw MultipleIOException.createIOException(ioExceptions);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static void processThrowable(NflyNode nflyNode, String op,
|
|
|
|
+ Throwable t, List<IOException> ioExceptions,
|
|
|
|
+ Path... f) {
|
|
|
|
+ final String errMsg = Arrays.toString(f)
|
|
|
|
+ + ": failed to " + op + " " + nflyNode.fs.getUri();
|
|
|
|
+ final IOException ioex;
|
|
|
|
+ if (t instanceof FileNotFoundException) {
|
|
|
|
+ ioex = new FileNotFoundException(errMsg);
|
|
|
|
+ ioex.initCause(t);
|
|
|
|
+ } else {
|
|
|
|
+ ioex = new IOException(errMsg, t);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (ioExceptions != null) {
|
|
|
|
+ ioExceptions.add(ioex);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Initializes an nfly mountpoint in viewfs.
|
|
|
|
+ *
|
|
|
|
+ * @param uris destinations to replicate writes to
|
|
|
|
+ * @param conf file system configuration
|
|
|
|
+ * @param settings comma-separated list of k=v pairs.
|
|
|
|
+ * @return an Nfly filesystem
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ static FileSystem createFileSystem(URI[] uris, Configuration conf,
|
|
|
|
+ String settings) throws IOException {
|
|
|
|
+ // assert settings != null
|
|
|
|
+ int minRepl = DEFAULT_MIN_REPLICATION;
|
|
|
|
+ EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
|
|
|
|
+ final String[] kvPairs = StringUtils.split(settings);
|
|
|
|
+ for (String kv : kvPairs) {
|
|
|
|
+ final String[] kvPair = StringUtils.split(kv, '=');
|
|
|
|
+ if (kvPair.length != 2) {
|
|
|
|
+ throw new IllegalArgumentException(kv);
|
|
|
|
+ }
|
|
|
|
+ NflyKey nflyKey = NflyKey.valueOf(kvPair[0]);
|
|
|
|
+ switch (nflyKey) {
|
|
|
|
+ case minReplication:
|
|
|
|
+ minRepl = Integer.parseInt(kvPair[1]);
|
|
|
|
+ break;
|
|
|
|
+ case repairOnRead:
|
|
|
|
+ case readMostRecent:
|
|
|
|
+ if (Boolean.valueOf(kvPair[1])) {
|
|
|
|
+ nflyFlags.add(nflyKey);
|
|
|
|
+ }
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ throw new IllegalArgumentException(nflyKey + ": Infeasible");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return new NflyFSystem(uris, conf, minRepl, nflyFlags);
|
|
|
|
+ }
|
|
|
|
+}
|