
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS, relying on FileSystem (taton)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@648123 13f79535-47bb-0310-9956-ffa450edef68
Christophe Taton 17 years ago
parent
commit
ce7d33bf17

+ 3 - 0
CHANGES.txt

@@ -7,6 +7,9 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
+    relying on FileSystem (taton)
+
     HADOOP-2585. Name-node imports namespace data from a recent checkpoint
     accessible via a NFS mount. (shv)
 

+ 61 - 0
src/java/org/apache/hadoop/fs/FsUrlConnection.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A URLConnection implementation that opens InputStreams through FileSystem.
+ */
+class FsUrlConnection extends URLConnection {
+
+  private Configuration conf;
+
+  private InputStream is;
+
+  FsUrlConnection(Configuration conf, URL url) {
+    super(url);
+    this.conf = conf;
+  }
+
+  @Override
+  public void connect() throws IOException {
+    try {
+      FileSystem fs = FileSystem.get(url.toURI(), conf);
+      is = fs.open(new Path(url.getPath()));
+    } catch (URISyntaxException e) {
+      throw new IOException(e.toString());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public InputStream getInputStream() throws IOException {
+    if (is == null) {
+      connect();
+    }
+    return is;
+  }
+
+}
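
A minimal usage sketch of this connection class (not part of this patch): it assumes the FsUrlStreamHandlerFactory introduced below has already been registered, and the namenode host, port, and path are hypothetical. Note that getInputStream() triggers connect() lazily on first use.

    import java.io.InputStream;
    import java.net.URL;
    import java.net.URLConnection;

    public class FsUrlConnectionSketch {
      public static void main(String[] args) throws Exception {
        // Assumes FsUrlStreamHandlerFactory has been installed (see below);
        // the namenode host, port, and path are hypothetical.
        URL url = new URL("hdfs://namenode.example.com:8020/user/alice/data.txt");
        URLConnection connection = url.openConnection(); // an FsUrlConnection
        InputStream in = connection.getInputStream();    // connect() runs lazily here
        try {
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf)) != -1) {
            System.out.write(buf, 0, n);
          }
          System.out.flush();
        } finally {
          in.close();
        }
      }
    }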

+ 47 - 0
src/java/org/apache/hadoop/fs/FsUrlStreamHandler.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLStreamHandler;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * URLStreamHandler that relies on FileSystem and on a given Configuration to
+ * handle URL protocols.
+ */
+class FsUrlStreamHandler extends URLStreamHandler {
+
+  private Configuration conf;
+
+  FsUrlStreamHandler(Configuration conf) {
+    this.conf = conf;
+  }
+
+  FsUrlStreamHandler() {
+    this.conf = new Configuration();
+  }
+
+  @Override
+  protected FsUrlConnection openConnection(URL url) throws IOException {
+    return new FsUrlConnection(conf, url);
+  }
+
+}
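
FsUrlStreamHandler is package-private, so applications reach it only through the factory below. Within org.apache.hadoop.fs, though, a URL can be bound to a handler instance directly via the java.net.URL constructor that accepts an explicit URLStreamHandler. A minimal sketch (not part of this patch; the helper class and the hdfs:// address are hypothetical):

    package org.apache.hadoop.fs;

    import java.io.InputStream;
    import java.net.URL;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper; it must live in org.apache.hadoop.fs because
    // FsUrlStreamHandler is package-private.
    class FsUrlStreamHandlerSketch {
      static InputStream open(String spec) throws Exception {
        FsUrlStreamHandler handler = new FsUrlStreamHandler(new Configuration());
        // Passing the handler explicitly bypasses the JVM-wide factory lookup.
        URL url = new URL(null, spec, handler);
        return url.openStream();
      }
    }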

+ 78 - 0
src/java/org/apache/hadoop/fs/FsUrlStreamHandlerFactory.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.net.URLStreamHandlerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory for URL stream handlers.
+ * 
+ * There is only one handler, whose job is to create URLConnections. An
+ * FsUrlConnection relies on FileSystem to choose the appropriate FS
+ * implementation.
+ * 
+ * Before returning our handler, we make sure that FileSystem knows an
+ * implementation for the requested scheme/protocol.
+ */
+public class FsUrlStreamHandlerFactory implements
+    URLStreamHandlerFactory {
+
+  // The configuration holds supported FS implementation class names.
+  private Configuration conf;
+
+  // This map stores whether a protocol is known to FileSystem or not
+  private Map<String, Boolean> protocols = new HashMap<String, Boolean>();
+
+  // The URL Stream handler
+  private java.net.URLStreamHandler handler;
+
+  public FsUrlStreamHandlerFactory() {
+    this.conf = new Configuration();
+    // force the resolution of the configuration files
+    // this is required if we want the factory to be able to handle
+    // file:// URLs
+    this.conf.getClass("fs.file.impl", null);
+    this.handler = new FsUrlStreamHandler(this.conf);
+  }
+
+  public FsUrlStreamHandlerFactory(Configuration conf) {
+    this.conf = new Configuration(conf);
+    // force the resolution of the configuration files
+    this.conf.getClass("fs.file.impl", null);
+    this.handler = new FsUrlStreamHandler(this.conf);
+  }
+
+  public java.net.URLStreamHandler createURLStreamHandler(String protocol) {
+    if (!protocols.containsKey(protocol)) {
+      boolean known =
+          (conf.getClass("fs." + protocol + ".impl", null) != null);
+      protocols.put(protocol, known);
+    }
+    if (protocols.get(protocol)) {
+      return handler;
+    } else {
+      // FileSystem does not know the protocol, let the VM handle this
+      return null;
+    }
+  }
+
+}
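
How an application wires this up: java.net.URL accepts at most one stream handler factory per JVM, so the factory is typically installed once in a static initializer, exactly as the test below does. A minimal sketch (not part of this patch; the namenode address and path are hypothetical):

    import java.io.InputStream;
    import java.net.URL;

    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

    public class RegisterFsUrls {
      static {
        // URL.setURLStreamHandlerFactory may be called at most once per JVM;
        // install the Hadoop factory before the first fs-scheme URL is used.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
      }

      public static void main(String[] args) throws Exception {
        // Hypothetical namenode address and path.
        URL url = new URL("hdfs://namenode.example.com:8020/thefile");
        InputStream in = url.openStream();
        try {
          int b;
          while ((b = in.read()) != -1) {
            System.out.write(b);
          }
          System.out.flush();
        } finally {
          in.close();
        }
      }
    }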

+ 155 - 0
src/test/org/apache/hadoop/fs/TestUrlStreamHandler.java

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test of the URL stream handler factory.
+ */
+public class TestUrlStreamHandler extends TestCase {
+
+  static {
+    // Setup our own factory
+    FsUrlStreamHandlerFactory factory =
+        new org.apache.hadoop.fs.FsUrlStreamHandlerFactory();
+    java.net.URL.setURLStreamHandlerFactory(factory);
+  }
+
+  /**
+   * Test opening and reading from an InputStream through an hdfs:// URL.
+   * <p>
+   * First generate a file with some content through the FileSystem API, then
+   * try to open and read the file through the URL stream API.
+   * 
+   * @throws IOException
+   */
+  public void testDfsUrls() throws IOException {
+
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = cluster.getFileSystem();
+
+    Path filePath = new Path("/thefile");
+
+    try {
+      byte[] fileContent = new byte[1024];
+      for (int i = 0; i < fileContent.length; ++i)
+        fileContent[i] = (byte) i;
+
+      // First create the file through the FileSystem API
+      OutputStream os = fs.create(filePath);
+      os.write(fileContent);
+      os.close();
+
+      // Second, open and read the file content through the URL API
+      URI uri = fs.getUri();
+      URL fileURL =
+          new URL(uri.getScheme(), uri.getHost(), uri.getPort(), filePath
+              .toString());
+
+      InputStream is = fileURL.openStream();
+      assertNotNull(is);
+
+      byte[] bytes = new byte[4096];
+      assertEquals(1024, is.read(bytes));
+      is.close();
+
+      for (int i = 0; i < fileContent.length; ++i)
+        assertEquals(fileContent[i], bytes[i]);
+
+      // Cleanup: delete the file
+      fs.delete(filePath, false);
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+
+  }
+
+  /**
+   * Test opening and reading from an InputStream through a file:// URL.
+   * 
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  public void testFileUrls() throws IOException, URISyntaxException {
+
+    Configuration conf = new Configuration();
+
+    // Locate the test temporary directory.
+    File tmpDir = new File(conf.get("hadoop.tmp.dir"));
+    if (!tmpDir.exists()) {
+      if (!tmpDir.mkdirs())
+        throw new IOException("Cannot create temporary directory: " + tmpDir);
+    }
+
+    String uriStr =
+        String.format("file://%s/thefile", tmpDir.getAbsolutePath());
+    URI uri = new URI(uriStr);
+
+    FileSystem fs = FileSystem.get(uri, conf);
+
+    try {
+      byte[] fileContent = new byte[1024];
+      for (int i = 0; i < fileContent.length; ++i)
+        fileContent[i] = (byte) i;
+
+      // First create the file through the FileSystem API
+      OutputStream os = fs.create(new Path(uri.getPath()));
+      os.write(fileContent);
+      os.close();
+
+      // Second, open and read the file content through the URL API.
+      URL fileURL = uri.toURL();
+
+      InputStream is = fileURL.openStream();
+      assertNotNull(is);
+
+      byte[] bytes = new byte[4096];
+      assertEquals(1024, is.read(bytes));
+      is.close();
+
+      for (int i = 0; i < fileContent.length; ++i)
+        assertEquals(fileContent[i], bytes[i]);
+
+      // Cleanup: delete the file
+      fs.delete(new Path(uri.getPath()), false);
+
+    } finally {
+      fs.close();
+    }
+
+  }
+
+}
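
One caveat in the assertions above: InputStream.read(byte[]) may legally return fewer bytes than requested even before end of stream, so assertEquals(1024, is.read(bytes)) can in principle fail spuriously. A defensive drop-in sketch that accumulates until EOF:

    // read(byte[]) is not guaranteed to fill the buffer in one call,
    // so loop until end of stream before asserting the total length.
    byte[] bytes = new byte[4096];
    int total = 0;
    int n;
    while (total < bytes.length
        && (n = is.read(bytes, total, bytes.length - total)) != -1) {
      total += n;
    }
    assertEquals(1024, total);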