@@ -0,0 +1,616 @@
+package org.apache.hadoop.thriftfs;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TApplicationException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TThreadPoolServer;
+import com.facebook.thrift.transport.TServerSocket;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+
+// Include Generated code
+import org.apache.hadoop.thriftfs.api.*;
+import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
+
+import java.io.*;
+import java.util.*;
+import java.net.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * HadoopThriftServer
+ *
+ * A Thrift wrapper around the Hadoop file system.
+ */
+public class HadoopThriftServer extends ThriftHadoopFileSystem {
+
+  static int serverPort = 0; // default port
+  TServer server = null;
+
+  public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
+  {
+
+    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
+
+    // HDFS glue
+    Configuration conf;
+    FileSystem fs;
+
+    // structure that maps each Thrift object to a Hadoop object
+    private long nextId = new Random().nextLong();
+    private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
+    private Daemon inactivityThread = null;
+
+    // Detect inactive session
+    private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
+    private static volatile long inactivityRecheckInterval = 60 * 1000;
+    private static volatile boolean fsRunning = true;
+    private static long now;
+
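+    // Each open stream is stored in hadoopHash under a 64-bit id, and the id
+    // travels to the client inside a ThriftHandle. Seeding nextId randomly
+    // makes it unlikely that a stale handle from a previous server instance
+    // aliases a live stream after a restart.
+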
+    // allow an outsider to change handler options (currently a no-op)
+    public void setOption(String key, String val) {
+    }
+
+    /**
+     * Current system time.
+     * @return current time in msec.
+     */
+    static long now() {
+      return System.currentTimeMillis();
+    }
+
+    /**
+     * getVersion
+     *
+     * @return current version of the interface.
+     */
+    public String getVersion() {
+      return "0.1";
+    }
+
+    /**
+     * shutdown
+     *
+     * Cleanly closes everything and exits.
+     */
+    public void shutdown(int status) {
+      LOG.info("HadoopThriftServer shutting down.");
+      try {
+        fs.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close file system", e);
+      }
+      Runtime.getRuntime().exit(status);
+    }
+
+    /**
+     * Periodically checks for inactivity. Every RPC handler refreshes
+     * {@code now}, so the server shuts itself down once no call has
+     * arrived for {@code inactivityPeriod} milliseconds.
+     */
+    class InactivityMonitor implements Runnable {
+      public void run() {
+        while (fsRunning) {
+          try {
+            if (now() > now + inactivityPeriod) {
+              LOG.warn("HadoopThriftServer Inactivity period of " +
+                       inactivityPeriod + " expired... Stopping Server.");
+              shutdown(-1);
+            }
+          } catch (Exception e) {
+            LOG.error(StringUtils.stringifyException(e));
+          }
+          try {
+            Thread.sleep(inactivityRecheckInterval);
+          } catch (InterruptedException ie) {
+          }
+        }
+      }
+    }
+
+    /**
+     * HadoopThriftHandler
+     *
+     * Constructor for the handler that glues the generated Thrift
+     * service onto the Hadoop file system.
+     *
+     * @param name - the name of this handler
+     */
+    public HadoopThriftHandler(String name) {
+      conf = new Configuration();
+      now = now();
+      try {
+        inactivityThread = new Daemon(new InactivityMonitor());
+        inactivityThread.start(); // start watching for idle periods
+        fs = FileSystem.get(conf);
+      } catch (IOException e) {
+        LOG.warn("Unable to open hadoop file system...", e);
+        Runtime.getRuntime().exit(-1);
+      }
+    }
+
+    /**
+     * printStackTrace
+     *
+     * Helper function to print an exception stack trace to the log and not stderr
+     *
+     * @param e the exception
+     *
+     */
+    static private void printStackTrace(Exception e) {
+      for(StackTraceElement s: e.getStackTrace()) {
+        LOG.error(s);
+      }
+    }
+
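+    // The Thrift server may invoke this handler from many worker threads at
+    // once, so all access to the shared handle map is synchronized.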
+    /**
+     * Look up the Hadoop object that a Thrift handle id refers to.
+     */
+    private synchronized Object lookup(long id) {
+      return hadoopHash.get(id);
+    }
+
+    /**
+     * Insert a Hadoop object into the store. Return its handle id.
+     */
+    private synchronized long insert(Object o) {
+      nextId++;
+      hadoopHash.put(nextId, o);
+      return nextId;
+    }
+
+    /**
+     * Delete a Hadoop object from the store.
+     */
+    private synchronized Object remove(long id) {
+      return hadoopHash.remove(id);
+    }
+
+    /**
+     * Implement the API exported by this thrift server
+     */
+
+    /** Set inactivity timeout period. The period is specified in seconds.
+     * If there are no RPC calls to the HadoopThrift server for this much
+     * time, then the server kills itself.
+     */
+    public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
+      inactivityPeriod = periodInSeconds * 1000; // in milliseconds
+      if (inactivityRecheckInterval > inactivityPeriod) {
+        inactivityRecheckInterval = inactivityPeriod;
+      }
+    }
+
+
+    /**
+     * Create a file and open it for writing
+     */
+    public ThriftHandle create(Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("create: " + path);
+        FSDataOutputStream out = fs.create(new Path(path.pathname));
+        long id = insert(out);
+        ThriftHandle obj = new ThriftHandle(id);
+        HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
+        return obj;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Create a file and open it for writing, overwriting any existing
+     * file if requested
+     */
+    public ThriftHandle createFile(Pathname path,
+                                   short mode,
+                                   boolean overwrite,
+                                   int bufferSize,
+                                   short replication,
+                                   long blockSize) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("create: " + path +
+                                      " permission: " + mode +
+                                      " overwrite: " + overwrite +
+                                      " bufferSize: " + bufferSize +
+                                      " replication: " + replication +
+                                      " blockSize: " + blockSize);
+        FSDataOutputStream out = fs.create(new Path(path.pathname),
+                                           new FsPermission(mode),
+                                           overwrite,
+                                           bufferSize,
+                                           replication,
+                                           blockSize,
+                                           null); // progress
+        long id = insert(out);
+        ThriftHandle obj = new ThriftHandle(id);
+        HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
+        return obj;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Opens an existing file and returns a handle to read it
+     */
+    public ThriftHandle open(Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("open: " + path);
+        FSDataInputStream in = fs.open(new Path(path.pathname));
+        long id = insert(in);
+        ThriftHandle obj = new ThriftHandle(id);
+        HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
+        return obj;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Opens an existing file to append to it.
+     */
+    public ThriftHandle append(Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("append: " + path);
+        FSDataOutputStream out = fs.append(new Path(path.pathname));
+        long id = insert(out);
+        ThriftHandle obj = new ThriftHandle(id);
+        HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
+        return obj;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
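+    // Note: file data crosses the wire as a Java String and is encoded as
+    // UTF-8 on write and decoded as UTF-8 on read, so only data that is
+    // valid UTF-8 survives a write/read round trip intact.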
+    /**
+     * write to a file
+     */
+    public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("write: " + tout.id);
+        FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
+        byte[] tmp = data.getBytes("UTF-8");
+        out.write(tmp, 0, tmp.length);
+        HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
+        return true;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * read from a file
+     */
+    public String read(ThriftHandle tout, long offset,
+                       int length) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("read: " + tout.id +
+                                      " offset: " + offset +
+                                      " length: " + length);
+        FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
+        if (in.getPos() != offset) {
+          in.seek(offset);
+        }
+        byte[] tmp = new byte[length];
+        int numbytes = in.read(offset, tmp, 0, length);
+        HadoopThriftHandler.LOG.debug("read done: " + tout.id);
+        if (numbytes < 0) {
+          return "";          // EOF: nothing was read
+        }
+        return new String(tmp, 0, numbytes, "UTF-8");
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Delete a file/directory
+     */
+    public boolean rm(Pathname path, boolean recursive)
+                      throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("rm: " + path +
+                                      " recursive: " + recursive);
+        boolean ret = fs.delete(new Path(path.pathname), recursive);
+        HadoopThriftHandler.LOG.debug("rm done: " + path);
+        return ret;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Move a file/directory
+     */
+    public boolean rename(Pathname path, Pathname dest)
+                          throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("rename: " + path +
+                                      " destination: " + dest);
+        boolean ret = fs.rename(new Path(path.pathname),
+                                new Path(dest.pathname));
+        HadoopThriftHandler.LOG.debug("rename done: " + path);
+        return ret;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * close file
+     */
+    public boolean close(ThriftHandle tout) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("close: " + tout.id);
+        Object obj = remove(tout.id);
+        if (obj instanceof FSDataOutputStream) {
+          FSDataOutputStream out = (FSDataOutputStream)obj;
+          out.close();
+        } else if (obj instanceof FSDataInputStream) {
+          FSDataInputStream in = (FSDataInputStream)obj;
+          in.close();
+        } else {
+          throw new ThriftIOException("Unknown thrift handle.");
+        }
+        HadoopThriftHandler.LOG.debug("closed: " + tout.id);
+        return true;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Create a directory
+     */
+    public boolean mkdirs(Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("mkdirs: " + path);
+        boolean ret = fs.mkdirs(new Path(path.pathname));
+        HadoopThriftHandler.LOG.debug("mkdirs done: " + path);
+        return ret;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Does this pathname exist?
+     */
+    public boolean exists(Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("exists: " + path);
+        boolean ret = fs.exists(new Path(path.pathname));
+        HadoopThriftHandler.LOG.debug("exists done: " + path);
+        return ret;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Returns status about the specified pathname
+     */
+    public org.apache.hadoop.thriftfs.api.FileStatus stat(
+                            Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("stat: " + path);
+        org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
+                                           new Path(path.pathname));
+        HadoopThriftHandler.LOG.debug("stat done: " + path);
+        return new org.apache.hadoop.thriftfs.api.FileStatus(
+          stat.getPath().toString(),
+          stat.getLen(),
+          stat.isDir(),
+          stat.getReplication(),
+          stat.getBlockSize(),
+          stat.getModificationTime(),
+          stat.getPermission().toString(),
+          stat.getOwner(),
+          stat.getGroup());
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * If the specified pathname is a directory, then return the
+     * list of pathnames in this directory
+     */
+    public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
+                            Pathname path) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("listStatus: " + path);
+
+        org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
+                                           new Path(path.pathname));
+        HadoopThriftHandler.LOG.debug("listStatus done: " + path);
+        org.apache.hadoop.thriftfs.api.FileStatus tmp;
+        List<org.apache.hadoop.thriftfs.api.FileStatus> value =
+          new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
+
+        for (int i = 0; i < stat.length; i++) {
+          tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
+                      stat[i].getPath().toString(),
+                      stat[i].getLen(),
+                      stat[i].isDir(),
+                      stat[i].getReplication(),
+                      stat[i].getBlockSize(),
+                      stat[i].getModificationTime(),
+                      stat[i].getPermission().toString(),
+                      stat[i].getOwner(),
+                      stat[i].getGroup());
+          value.add(tmp);
+        }
+        return value;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Sets the permission of a pathname
+     */
+    public void chmod(Pathname path, short mode) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("chmod: " + path +
+                                      " mode " + mode);
+        fs.setPermission(new Path(path.pathname), new FsPermission(mode));
+        HadoopThriftHandler.LOG.debug("chmod done: " + path);
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Sets the owner & group of a pathname
+     */
+    public void chown(Pathname path, String owner, String group)
+                      throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("chown: " + path +
+                                      " owner: " + owner +
+                                      " group: " + group);
+        fs.setOwner(new Path(path.pathname), owner, group);
+        HadoopThriftHandler.LOG.debug("chown done: " + path);
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Sets the replication factor of a file
+     */
+    public void setReplication(Pathname path, short repl) throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("setrepl: " + path +
+                                      " replication factor: " + repl);
+        fs.setReplication(new Path(path.pathname), repl);
+        HadoopThriftHandler.LOG.debug("setrepl done: " + path);
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+
+    /**
+     * Returns the block locations of this file
+     */
+    public List<org.apache.hadoop.thriftfs.api.BlockLocation>
+            getFileBlockLocations(Pathname path, long start, long length)
+                          throws ThriftIOException {
+      try {
+        now = now();
+        HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
+
+        org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
+                                                 new Path(path.pathname));
+
+        org.apache.hadoop.fs.BlockLocation[] stat =
+            fs.getFileBlockLocations(status, start, length);
+        HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
+
+        org.apache.hadoop.thriftfs.api.BlockLocation tmp;
+        List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
+          new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
+
+        for (int i = 0; i < stat.length; i++) {
+
+          // construct the list of hostnames from the array returned
+          // by HDFS
+          List<String> hosts = new LinkedList<String>();
+          String[] hostsHdfs = stat[i].getHosts();
+          for (int j = 0; j < hostsHdfs.length; j++) {
+            hosts.add(hostsHdfs[j]);
+          }
+
+          // construct the list of host:port from the array returned
+          // by HDFS
+          List<String> names = new LinkedList<String>();
+          String[] namesHdfs = stat[i].getNames();
+          for (int j = 0; j < namesHdfs.length; j++) {
+            names.add(namesHdfs[j]);
+          }
+          tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
+                      hosts, names, stat[i].getOffset(), stat[i].getLength());
+          value.add(tmp);
+        }
+        return value;
+      } catch (IOException e) {
+        throw new ThriftIOException(e.getMessage());
+      }
+    }
+  }
+
+  // Bind to port. If the specified port is 0, then bind to a random port.
+  private ServerSocket createServerSocket(int port) throws IOException {
+    try {
+      ServerSocket sock = new ServerSocket();
+      // Prevent 2MSL delay problem on server restarts
+      sock.setReuseAddress(true);
+      // Bind to listening port
+      if (port == 0) {
+        sock.bind(null);
+        serverPort = sock.getLocalPort();
+      } else {
+        sock.bind(new InetSocketAddress(port));
+      }
+      return sock;
+    } catch (IOException ioe) {
+      throw new IOException("Could not create ServerSocket on port " + port +
+                            ": " + ioe);
+    }
+  }
+
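+  // TThreadPoolServer is Thrift's blocking thread-per-connection server: a
+  // worker thread from the pool configured below is tied up for the lifetime
+  // of each client connection.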
+  /**
+   * Constructs a server object
+   */
+  public HadoopThriftServer(String [] args) {
+
+    if (args.length > 0) {
+      serverPort = Integer.parseInt(args[0]);
+    }
+    try {
+      ServerSocket ssock = createServerSocket(serverPort);
+      TServerTransport serverTransport = new TServerSocket(ssock);
+      Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
+      ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
+      TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+      options.minWorkerThreads = 10;
+      server = new TThreadPoolServer(processor, serverTransport,
+                                     new TTransportFactory(),
+                                     new TTransportFactory(),
+                                     new TBinaryProtocol.Factory(),
+                                     new TBinaryProtocol.Factory(),
+                                     options);
+      System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
+      HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" + serverPort + "]...");
+      System.out.flush();
+
+    } catch (Exception x) {
+      x.printStackTrace();
+    }
+  }
+
+  public static void main(String [] args) {
+    HadoopThriftServer me = new HadoopThriftServer(args);
+    me.server.serve();
+  }
+}
+
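+// A minimal client sketch (not part of this patch; it assumes the
+// Thrift-generated all-field constructor Pathname(String), the generated
+// ThriftHadoopFileSystem.Client class, and a server started on port 4567):
+//
+//   TSocket transport = new TSocket("localhost", 4567);
+//   transport.open();
+//   ThriftHadoopFileSystem.Client client =
+//       new ThriftHadoopFileSystem.Client(new TBinaryProtocol(transport));
+//   ThriftHandle h = client.open(new Pathname("/tmp/sample.txt"));
+//   String data = client.read(h, 0, 1024);
+//   client.close(h);
+//   transport.close();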