|
@@ -1,616 +0,0 @@
|
|
|
-package org.apache.hadoop.thriftfs;
|
|
|
-
|
|
|
-import com.facebook.thrift.TException;
|
|
|
-import com.facebook.thrift.TApplicationException;
|
|
|
-import com.facebook.thrift.protocol.TBinaryProtocol;
|
|
|
-import com.facebook.thrift.protocol.TProtocol;
|
|
|
-import com.facebook.thrift.server.TServer;
|
|
|
-import com.facebook.thrift.server.TThreadPoolServer;
|
|
|
-import com.facebook.thrift.transport.TServerSocket;
|
|
|
-import com.facebook.thrift.transport.TServerTransport;
|
|
|
-import com.facebook.thrift.transport.TTransportFactory;
|
|
|
-
|
|
|
-// Include Generated code
|
|
|
-import org.apache.hadoop.thriftfs.api.*;
|
|
|
-import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
|
|
|
-
|
|
|
-import java.io.*;
|
|
|
-import java.util.*;
|
|
|
-import java.net.*;
|
|
|
-
|
|
|
-import org.apache.hadoop.fs.*;
|
|
|
-import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.util.Daemon;
|
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
-
|
|
|
-/**
|
|
|
- * ThriftHadoopFileSystem
|
|
|
- * A thrift wrapper around the Hadoop File System
|
|
|
- */
|
|
|
-public class HadoopThriftServer extends ThriftHadoopFileSystem {
|
|
|
-
|
|
|
- static int serverPort = 0; // default port
|
|
|
- TServer server = null;
|
|
|
-
|
|
|
- public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
|
|
|
- {
|
|
|
-
|
|
|
- public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
|
|
|
-
|
|
|
- // HDFS glue
|
|
|
- Configuration conf;
|
|
|
- FileSystem fs;
|
|
|
-
|
|
|
- // stucture that maps each Thrift object into an hadoop object
|
|
|
- private long nextId = new Random().nextLong();
|
|
|
- private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
|
|
|
- private Daemon inactivityThread = null;
|
|
|
-
|
|
|
- // Detect inactive session
|
|
|
- private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
|
|
|
- private static volatile long inactivityRecheckInterval = 60 * 1000;
|
|
|
- private static volatile boolean fsRunning = true;
|
|
|
- private static long now;
|
|
|
-
|
|
|
- // allow outsider to change the hadoopthrift path
|
|
|
- public void setOption(String key, String val) {
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Current system time.
|
|
|
- * @return current time in msec.
|
|
|
- */
|
|
|
- static long now() {
|
|
|
- return System.currentTimeMillis();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * getVersion
|
|
|
- *
|
|
|
- * @return current version of the interface.
|
|
|
- */
|
|
|
- public String getVersion() {
|
|
|
- return "0.1";
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * shutdown
|
|
|
- *
|
|
|
- * cleanly closes everything and exit.
|
|
|
- */
|
|
|
- public void shutdown(int status) {
|
|
|
- LOG.info("HadoopThriftServer shutting down.");
|
|
|
- try {
|
|
|
- fs.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Unable to close file system");
|
|
|
- }
|
|
|
- Runtime.getRuntime().exit(status);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Periodically checks to see if there is inactivity
|
|
|
- */
|
|
|
- class InactivityMonitor implements Runnable {
|
|
|
- public void run() {
|
|
|
- while (fsRunning) {
|
|
|
- try {
|
|
|
- if (now() > now + inactivityPeriod) {
|
|
|
- LOG.warn("HadoopThriftServer Inactivity period of " +
|
|
|
- inactivityPeriod + " expired... Stopping Server.");
|
|
|
- shutdown(-1);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error(StringUtils.stringifyException(e));
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(inactivityRecheckInterval);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * HadoopThriftServer
|
|
|
- *
|
|
|
- * Constructor for the HadoopThriftServer glue with Thrift Class.
|
|
|
- *
|
|
|
- * @param name - the name of this handler
|
|
|
- */
|
|
|
- public HadoopThriftHandler(String name) {
|
|
|
- conf = new Configuration();
|
|
|
- now = now();
|
|
|
- try {
|
|
|
- inactivityThread = new Daemon(new InactivityMonitor());
|
|
|
- fs = FileSystem.get(conf);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Unable to open hadoop file system...");
|
|
|
- Runtime.getRuntime().exit(-1);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * printStackTrace
|
|
|
- *
|
|
|
- * Helper function to print an exception stack trace to the log and not stderr
|
|
|
- *
|
|
|
- * @param e the exception
|
|
|
- *
|
|
|
- */
|
|
|
- static private void printStackTrace(Exception e) {
|
|
|
- for(StackTraceElement s: e.getStackTrace()) {
|
|
|
- LOG.error(s);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Lookup a thrift object into a hadoop object
|
|
|
- */
|
|
|
- private synchronized Object lookup(long id) {
|
|
|
- return hadoopHash.get(new Long(id));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Insert a thrift object into a hadoop object. Return its id.
|
|
|
- */
|
|
|
- private synchronized long insert(Object o) {
|
|
|
- nextId++;
|
|
|
- hadoopHash.put(nextId, o);
|
|
|
- return nextId;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Delete a thrift object from the hadoop store.
|
|
|
- */
|
|
|
- private synchronized Object remove(long id) {
|
|
|
- return hadoopHash.remove(new Long(id));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Implement the API exported by this thrift server
|
|
|
- */
|
|
|
-
|
|
|
- /** Set inactivity timeout period. The period is specified in seconds.
|
|
|
- * if there are no RPC calls to the HadoopThrift server for this much
|
|
|
- * time, then the server kills itself.
|
|
|
- */
|
|
|
- public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
|
|
|
- inactivityPeriod = periodInSeconds * 1000; // in milli seconds
|
|
|
- if (inactivityRecheckInterval > inactivityPeriod ) {
|
|
|
- inactivityRecheckInterval = inactivityPeriod;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a file and open it for writing
|
|
|
- */
|
|
|
- public ThriftHandle create(Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("create: " + path);
|
|
|
- FSDataOutputStream out = fs.create(new Path(path.pathname));
|
|
|
- long id = insert(out);
|
|
|
- ThriftHandle obj = new ThriftHandle(id);
|
|
|
- HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
|
|
|
- return obj;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a file and open it for writing, delete file if it exists
|
|
|
- */
|
|
|
- public ThriftHandle createFile(Pathname path,
|
|
|
- short mode,
|
|
|
- boolean overwrite,
|
|
|
- int bufferSize,
|
|
|
- short replication,
|
|
|
- long blockSize) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("create: " + path +
|
|
|
- " permission: " + mode +
|
|
|
- " overwrite: " + overwrite +
|
|
|
- " bufferSize: " + bufferSize +
|
|
|
- " replication: " + replication +
|
|
|
- " blockSize: " + blockSize);
|
|
|
- FSDataOutputStream out = fs.create(new Path(path.pathname),
|
|
|
- new FsPermission(mode),
|
|
|
- overwrite,
|
|
|
- bufferSize,
|
|
|
- replication,
|
|
|
- blockSize,
|
|
|
- null); // progress
|
|
|
- long id = insert(out);
|
|
|
- ThriftHandle obj = new ThriftHandle(id);
|
|
|
- HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
|
|
|
- return obj;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Opens an existing file and returns a handle to read it
|
|
|
- */
|
|
|
- public ThriftHandle open(Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("open: " + path);
|
|
|
- FSDataInputStream out = fs.open(new Path(path.pathname));
|
|
|
- long id = insert(out);
|
|
|
- ThriftHandle obj = new ThriftHandle(id);
|
|
|
- HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
|
|
|
- return obj;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Opens an existing file to append to it.
|
|
|
- */
|
|
|
- public ThriftHandle append(Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("append: " + path);
|
|
|
- FSDataOutputStream out = fs.append(new Path(path.pathname));
|
|
|
- long id = insert(out);
|
|
|
- ThriftHandle obj = new ThriftHandle(id);
|
|
|
- HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
|
|
|
- return obj;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * write to a file
|
|
|
- */
|
|
|
- public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("write: " + tout.id);
|
|
|
- FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
|
|
|
- byte[] tmp = data.getBytes("UTF-8");
|
|
|
- out.write(tmp, 0, tmp.length);
|
|
|
- HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
|
|
|
- return true;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * read from a file
|
|
|
- */
|
|
|
- public String read(ThriftHandle tout, long offset,
|
|
|
- int length) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("read: " + tout.id +
|
|
|
- " offset: " + offset +
|
|
|
- " length: " + length);
|
|
|
- FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
|
|
|
- if (in.getPos() != offset) {
|
|
|
- in.seek(offset);
|
|
|
- }
|
|
|
- byte[] tmp = new byte[length];
|
|
|
- int numbytes = in.read(offset, tmp, 0, length);
|
|
|
- HadoopThriftHandler.LOG.debug("read done: " + tout.id);
|
|
|
- return new String(tmp, 0, numbytes, "UTF-8");
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Delete a file/directory
|
|
|
- */
|
|
|
- public boolean rm(Pathname path, boolean recursive)
|
|
|
- throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("rm: " + path +
|
|
|
- " recursive: " + recursive);
|
|
|
- boolean ret = fs.delete(new Path(path.pathname), recursive);
|
|
|
- HadoopThriftHandler.LOG.debug("rm: " + path);
|
|
|
- return ret;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Move a file/directory
|
|
|
- */
|
|
|
- public boolean rename(Pathname path, Pathname dest)
|
|
|
- throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("rename: " + path +
|
|
|
- " destination: " + dest);
|
|
|
- boolean ret = fs.rename(new Path(path.pathname),
|
|
|
- new Path(dest.pathname));
|
|
|
- HadoopThriftHandler.LOG.debug("rename: " + path);
|
|
|
- return ret;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * close file
|
|
|
- */
|
|
|
- public boolean close(ThriftHandle tout) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("close: " + tout.id);
|
|
|
- Object obj = remove(tout.id);
|
|
|
- if (obj instanceof FSDataOutputStream) {
|
|
|
- FSDataOutputStream out = (FSDataOutputStream)obj;
|
|
|
- out.close();
|
|
|
- } else if (obj instanceof FSDataInputStream) {
|
|
|
- FSDataInputStream in = (FSDataInputStream)obj;
|
|
|
- in.close();
|
|
|
- } else {
|
|
|
- throw new ThriftIOException("Unknown thrift handle.");
|
|
|
- }
|
|
|
- HadoopThriftHandler.LOG.debug("closed: " + tout.id);
|
|
|
- return true;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a directory
|
|
|
- */
|
|
|
- public boolean mkdirs(Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("mkdirs: " + path);
|
|
|
- boolean ret = fs.mkdirs(new Path(path.pathname));
|
|
|
- HadoopThriftHandler.LOG.debug("mkdirs: " + path);
|
|
|
- return ret;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Does this pathname exist?
|
|
|
- */
|
|
|
- public boolean exists(Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("exists: " + path);
|
|
|
- boolean ret = fs.exists(new Path(path.pathname));
|
|
|
- HadoopThriftHandler.LOG.debug("exists done: " + path);
|
|
|
- return ret;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns status about the specified pathname
|
|
|
- */
|
|
|
- public org.apache.hadoop.thriftfs.api.FileStatus stat(
|
|
|
- Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("stat: " + path);
|
|
|
- org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
|
|
|
- new Path(path.pathname));
|
|
|
- HadoopThriftHandler.LOG.debug("stat done: " + path);
|
|
|
- return new org.apache.hadoop.thriftfs.api.FileStatus(
|
|
|
- stat.getPath().toString(),
|
|
|
- stat.getLen(),
|
|
|
- stat.isDir(),
|
|
|
- stat.getReplication(),
|
|
|
- stat.getBlockSize(),
|
|
|
- stat.getModificationTime(),
|
|
|
- stat.getPermission().toString(),
|
|
|
- stat.getOwner(),
|
|
|
- stat.getGroup());
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * If the specified pathname is a directory, then return the
|
|
|
- * list of pathnames in this directory
|
|
|
- */
|
|
|
- public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
|
|
|
- Pathname path) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("listStatus: " + path);
|
|
|
-
|
|
|
- org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
|
|
|
- new Path(path.pathname));
|
|
|
- HadoopThriftHandler.LOG.debug("listStatus done: " + path);
|
|
|
- org.apache.hadoop.thriftfs.api.FileStatus tmp;
|
|
|
- List<org.apache.hadoop.thriftfs.api.FileStatus> value =
|
|
|
- new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
|
|
|
-
|
|
|
- for (int i = 0; i < stat.length; i++) {
|
|
|
- tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
|
|
|
- stat[i].getPath().toString(),
|
|
|
- stat[i].getLen(),
|
|
|
- stat[i].isDir(),
|
|
|
- stat[i].getReplication(),
|
|
|
- stat[i].getBlockSize(),
|
|
|
- stat[i].getModificationTime(),
|
|
|
- stat[i].getPermission().toString(),
|
|
|
- stat[i].getOwner(),
|
|
|
- stat[i].getGroup());
|
|
|
- value.add(tmp);
|
|
|
- }
|
|
|
- return value;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets the permission of a pathname
|
|
|
- */
|
|
|
- public void chmod(Pathname path, short mode) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("chmod: " + path +
|
|
|
- " mode " + mode);
|
|
|
- fs.setPermission(new Path(path.pathname), new FsPermission(mode));
|
|
|
- HadoopThriftHandler.LOG.debug("chmod done: " + path);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets the owner & group of a pathname
|
|
|
- */
|
|
|
- public void chown(Pathname path, String owner, String group)
|
|
|
- throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("chown: " + path +
|
|
|
- " owner: " + owner +
|
|
|
- " group: " + group);
|
|
|
- fs.setOwner(new Path(path.pathname), owner, group);
|
|
|
- HadoopThriftHandler.LOG.debug("chown done: " + path);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets the replication factor of a file
|
|
|
- */
|
|
|
- public void setReplication(Pathname path, short repl) throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("setrepl: " + path +
|
|
|
- " replication factor: " + repl);
|
|
|
- fs.setReplication(new Path(path.pathname), repl);
|
|
|
- HadoopThriftHandler.LOG.debug("setrepl done: " + path);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the block locations of this file
|
|
|
- */
|
|
|
- public List<org.apache.hadoop.thriftfs.api.BlockLocation>
|
|
|
- getFileBlockLocations(Pathname path, long start, long length)
|
|
|
- throws ThriftIOException {
|
|
|
- try {
|
|
|
- now = now();
|
|
|
- HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
|
|
|
-
|
|
|
- org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
|
|
|
- new Path(path.pathname));
|
|
|
-
|
|
|
- org.apache.hadoop.fs.BlockLocation[] stat =
|
|
|
- fs.getFileBlockLocations(status, start, length);
|
|
|
- HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
|
|
|
-
|
|
|
- org.apache.hadoop.thriftfs.api.BlockLocation tmp;
|
|
|
- List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
|
|
|
- new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
|
|
|
-
|
|
|
- for (int i = 0; i < stat.length; i++) {
|
|
|
-
|
|
|
- // construct the list of hostnames from the array returned
|
|
|
- // by HDFS
|
|
|
- List<String> hosts = new LinkedList<String>();
|
|
|
- String[] hostsHdfs = stat[i].getHosts();
|
|
|
- for (int j = 0; j < hostsHdfs.length; j++) {
|
|
|
- hosts.add(hostsHdfs[j]);
|
|
|
- }
|
|
|
-
|
|
|
- // construct the list of host:port from the array returned
|
|
|
- // by HDFS
|
|
|
- List<String> names = new LinkedList<String>();
|
|
|
- String[] namesHdfs = stat[i].getNames();
|
|
|
- for (int j = 0; j < namesHdfs.length; j++) {
|
|
|
- names.add(namesHdfs[j]);
|
|
|
- }
|
|
|
- tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
|
|
|
- hosts, names, stat[i].getOffset(), stat[i].getLength());
|
|
|
- value.add(tmp);
|
|
|
- }
|
|
|
- return value;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ThriftIOException(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Bind to port. If the specified port is 0, then bind to random port.
|
|
|
- private ServerSocket createServerSocket(int port) throws IOException {
|
|
|
- try {
|
|
|
- ServerSocket sock = new ServerSocket();
|
|
|
- // Prevent 2MSL delay problem on server restarts
|
|
|
- sock.setReuseAddress(true);
|
|
|
- // Bind to listening port
|
|
|
- if (port == 0) {
|
|
|
- sock.bind(null);
|
|
|
- serverPort = sock.getLocalPort();
|
|
|
- } else {
|
|
|
- sock.bind(new InetSocketAddress(port));
|
|
|
- }
|
|
|
- return sock;
|
|
|
- } catch (IOException ioe) {
|
|
|
- throw new IOException("Could not create ServerSocket on port " + port + "." +
|
|
|
- ioe);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Constrcts a server object
|
|
|
- */
|
|
|
- public HadoopThriftServer(String [] args) {
|
|
|
-
|
|
|
- if (args.length > 0) {
|
|
|
- serverPort = new Integer(args[0]);
|
|
|
- }
|
|
|
- try {
|
|
|
- ServerSocket ssock = createServerSocket(serverPort);
|
|
|
- TServerTransport serverTransport = new TServerSocket(ssock);
|
|
|
- Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
|
|
|
- ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
|
|
|
- TThreadPoolServer.Options options = new TThreadPoolServer.Options();
|
|
|
- options.minWorkerThreads = 10;
|
|
|
- server = new TThreadPoolServer(processor, serverTransport,
|
|
|
- new TTransportFactory(),
|
|
|
- new TTransportFactory(),
|
|
|
- new TBinaryProtocol.Factory(),
|
|
|
- new TBinaryProtocol.Factory(),
|
|
|
- options);
|
|
|
- System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
|
|
|
- HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
|
|
|
- System.out.flush();
|
|
|
-
|
|
|
- } catch (Exception x) {
|
|
|
- x.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public static void main(String [] args) {
|
|
|
- HadoopThriftServer me = new HadoopThriftServer(args);
|
|
|
- me.server.serve();
|
|
|
- }
|
|
|
-};
|
|
|
-
|