DistributedFileSystem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import java.io.*;
import java.net.*;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
import org.apache.hadoop.util.*;

/****************************************************************
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 *****************************************************************/
public class DistributedFileSystem extends FileSystem {
  private Path workingDir;
  private URI uri;
  DFSClient dfs;
  private boolean verifyChecksum = true;

  public DistributedFileSystem() {
  }
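
  // Note: end-user code normally obtains an instance through the FileSystem
  // factory rather than constructing one directly. A minimal usage sketch
  // (the host "namenode" and port 9000 are illustrative assumptions, not
  // values taken from this file):
  //
  //   Configuration conf = new Configuration();
  //   FileSystem fs =
  //       FileSystem.get(URI.create("hdfs://namenode:9000/"), conf);
  //   fs.mkdirs(new Path("/user/example"));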
  /** @deprecated */
  public DistributedFileSystem(InetSocketAddress namenode,
                               Configuration conf) throws IOException {
    initialize(URI.create("hdfs://" +
                          namenode.getHostName() + ":" +
                          namenode.getPort()),
               conf);
  }

  /** @deprecated */
  public String getName() { return uri.getAuthority(); }

  public URI getUri() { return uri; }

  public void initialize(URI uri, Configuration conf) throws IOException {
    setConf(conf);
    String host = uri.getHost();
    int port = uri.getPort();
    if (host == null || port == -1) {
      throw new IOException("Incomplete HDFS URI, no host/port: " + uri);
    }
    this.dfs = new DFSClient(new InetSocketAddress(host, port), conf);
    this.uri = URI.create("hdfs://" + host + ":" + port);
    this.workingDir = getHomeDirectory();
  }
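
  // For example (an illustrative URI, not one taken from this file),
  // initialize(URI.create("hdfs://namenode:9000"), conf) yields host
  // "namenode" and port 9000. A URI missing the port, such as
  // "hdfs://namenode", is rejected by the check above because
  // URI.getPort() returns -1 when no port is present.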
  public Path getWorkingDirectory() {
    return workingDir;
  }

  public long getDefaultBlockSize() {
    return dfs.getDefaultBlockSize();
  }

  public short getDefaultReplication() {
    return dfs.getDefaultReplication();
  }

  private Path makeAbsolute(Path f) {
    if (f.isAbsolute()) {
      return f;
    } else {
      return new Path(workingDir, f);
    }
  }
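
  // For example, with workingDir set to "/user/alice" (an illustrative
  // value), makeAbsolute(new Path("data/input")) resolves to
  // "/user/alice/data/input", while an already-absolute path such as
  // "/tmp/x" is returned unchanged.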
  public void setWorkingDirectory(Path dir) {
    String result = makeAbsolute(dir).toUri().getPath();
    if (!FSNamesystem.isValidName(result)) {
      throw new IllegalArgumentException("Invalid DFS directory name " +
                                         result);
    }
    workingDir = makeAbsolute(dir);
  }

  /** {@inheritDoc} */
  public Path getHomeDirectory() {
    return new Path("/user/" + dfs.ugi.getUserName()).makeQualified(this);
  }

  private String getPathName(Path file) {
    checkPath(file);
    String result = makeAbsolute(file).toUri().getPath();
    if (!FSNamesystem.isValidName(result)) {
      throw new IllegalArgumentException("Pathname " + result + " from " +
                                         file + " is not a valid DFS filename.");
    }
    return result;
  }

  public BlockLocation[] getFileBlockLocations(Path f, long start,
      long len) throws IOException {
    return dfs.getBlockLocations(getPathName(f), start, len);
  }

  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }

  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    try {
      return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum));
    } catch (RemoteException e) {
      if (IOException.class.getName().equals(e.getClassName()) &&
          e.getMessage().startsWith(
              "java.io.IOException: Cannot open filename")) {
        // non-existent path
        FileNotFoundException ne =
            new FileNotFoundException("File " + f + " does not exist.");
        throw (FileNotFoundException) ne.initCause(e);
      } else {
        throw e; // unexpected exception
      }
    }
  }
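
  // A minimal read sketch for open() above (the path is an illustrative
  // assumption): open() translates the namenode's RemoteException into a
  // local FileNotFoundException, so callers can catch that directly.
  //
  //   try {
  //     FSDataInputStream in = fs.open(new Path("/user/example/part-0"), 4096);
  //     byte[] buf = new byte[4096];
  //     int n = in.read(buf);
  //     in.close();
  //   } catch (FileNotFoundException fnf) {
  //     // the path did not exist on the namenode
  //   }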
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
        overwrite, replication, blockSize, progress, bufferSize));
  }
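
  // A hedged usage sketch for create() above (the path, permission,
  // replication, and block size values are illustrative, not defaults
  // read from this file):
  //
  //   FSDataOutputStream out = fs.create(new Path("/user/example/out.txt"),
  //       FsPermission.getDefault(), true /* overwrite */,
  //       4096, (short) 3, 64 * 1024 * 1024, null /* no progress */);
  //   out.write("hello".getBytes());
  //   out.close();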
  public boolean setReplication(Path src,
                                short replication
                               ) throws IOException {
    return dfs.setReplication(getPathName(src), replication);
  }

  /**
   * Rename files/dirs.
   */
  public boolean rename(Path src, Path dst) throws IOException {
    return dfs.rename(getPathName(src), getPathName(dst));
  }

  /**
   * Get rid of Path f, whether a true file or dir.
   */
  @Deprecated
  public boolean delete(Path f) throws IOException {
    return dfs.delete(getPathName(f));
  }

  /**
   * Delete a file or directory. Deleting a non-empty directory
   * requires recursive to be true.
   */
  public boolean delete(Path f, boolean recursive) throws IOException {
    return dfs.delete(getPathName(f), recursive);
  }
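
  // For example, fs.delete(new Path("/user/example/tmp"), false) is safe
  // only for files and empty directories, while passing true removes a
  // directory together with its entire subtree (the path is illustrative).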
  public boolean exists(Path f) throws IOException {
    return dfs.exists(getPathName(f));
  }

  /** {@inheritDoc} */
  @Deprecated
  public long getContentLength(Path f) throws IOException {
    // If it is a directory, then issue a getContentSummary
    // RPC to find the size of the entire subtree in one call.
    //
    if (f instanceof DfsPath) {
      DfsPath dfspath = (DfsPath) f;
      if (!dfspath.isDirectory()) {
        return dfspath.getContentsLength();
      }
    }
    return getContentSummary(f).getLength();
  }

  /** {@inheritDoc} */
  public ContentSummary getContentSummary(Path f) throws IOException {
    return dfs.getContentSummary(getPathName(f));
  }

  public FileStatus[] listStatus(Path p) throws IOException {
    DFSFileInfo[] infos = dfs.listPaths(getPathName(p));
    if (infos == null) return null;

    FileStatus[] stats = new FileStatus[infos.length];
    for (int i = 0; i < infos.length; i++) {
      DFSFileInfo f = infos[i];
      stats[i] = new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
                                f.getBlockSize(), f.getModificationTime(),
                                f.getPermission(), f.getOwner(), f.getGroup(),
                                new DfsPath(f, this)); // fully-qualify path
    }
    return stats;
  }
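
  // A minimal listing sketch for listStatus() above (the directory path
  // is illustrative):
  //
  //   FileStatus[] entries = fs.listStatus(new Path("/user/example"));
  //   if (entries != null) {
  //     for (FileStatus s : entries) {
  //       System.out.println(s.getPath() + " " + s.getLen());
  //     }
  //   }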
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return dfs.mkdirs(getPathName(f), permission);
  }

  /** {@inheritDoc} */
  public synchronized void close() throws IOException {
    try {
      dfs.close();
    } finally {
      super.close();
    }
  }

  public String toString() {
    return "DFS[" + dfs + "]";
  }

  DFSClient getClient() {
    return dfs;
  }

  public static class DiskStatus {
    private long capacity;
    private long dfsUsed;
    private long remaining;

    public DiskStatus(long capacity, long dfsUsed, long remaining) {
      this.capacity = capacity;
      this.dfsUsed = dfsUsed;
      this.remaining = remaining;
    }

    public long getCapacity() {
      return capacity;
    }

    public long getDfsUsed() {
      return dfsUsed;
    }

    public long getRemaining() {
      return remaining;
    }
  }
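
  // Note: capacity, dfsUsed, and remaining need not sum exactly, since
  // datanode volumes typically also hold non-DFS data; reading
  // (capacity - dfsUsed - remaining) as space consumed by everything else
  // on those volumes is an assumption, not something stated in this file.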
  /** Return the disk usage of the filesystem, including total capacity,
   * used space, and remaining space. */
  public DiskStatus getDiskStatus() throws IOException {
    return dfs.getDiskStatus();
  }

  /** Return the total raw capacity of the filesystem, disregarding
   * replication. */
  public long getRawCapacity() throws IOException {
    return dfs.totalRawCapacity();
  }

  /** Return the total raw used space in the filesystem, disregarding
   * replication. */
  public long getRawUsed() throws IOException {
    return dfs.totalRawUsed();
  }

  /** Return statistics for each datanode. */
  public DatanodeInfo[] getDataNodeStats() throws IOException {
    return dfs.datanodeReport(DatanodeReportType.ALL);
  }

  /**
   * Enter, leave or get safe mode.
   *
   * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(
   *      FSConstants.SafeModeAction)
   */
  public boolean setSafeMode(FSConstants.SafeModeAction action)
      throws IOException {
    return dfs.setSafeMode(action);
  }
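
  // For example, querying rather than changing safe mode (SAFEMODE_GET is
  // assumed to be the query action on this version's SafeModeAction enum):
  //
  //   DistributedFileSystem dfsFs = (DistributedFileSystem) fs;
  //   boolean inSafeMode =
  //       dfsFs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET);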
  /**
   * Refreshes the list of hosts and excluded hosts from the configured
   * files.
   */
  public void refreshNodes() throws IOException {
    dfs.refreshNodes();
  }

  /**
   * Finalize previously upgraded file system state.
   * @throws IOException
   */
  public void finalizeUpgrade() throws IOException {
    dfs.finalizeUpgrade();
  }

  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
      ) throws IOException {
    return dfs.distributedUpgradeProgress(action);
  }

  /**
   * Requests the namenode to dump data structures into the specified
   * file.
   */
  public void metaSave(String pathname) throws IOException {
    dfs.metaSave(pathname);
  }

  /**
   * We need to find the blocks that didn't match. Likely only one
   * is corrupt but we will report both to the namenode. In the future,
   * we can consider figuring out exactly which block is corrupt.
   */
  public boolean reportChecksumFailure(Path f,
      FSDataInputStream in, long inPos,
      FSDataInputStream sums, long sumsPos) {
    LocatedBlock[] lblocks = new LocatedBlock[2];

    // Find block in data stream.
    DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
    Block dataBlock = dfsIn.getCurrentBlock();
    if (dataBlock == null) {
      LOG.error("Error: Current block in data stream is null! ");
      return false;
    }
    DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
    lblocks[0] = new LocatedBlock(dataBlock, dataNode);
    LOG.info("Found checksum error in data stream at block="
        + dataBlock.getBlockName() + " on datanode="
        + dataNode[0].getName());

    // Find block in checksum stream.
    DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
    Block sumsBlock = dfsSums.getCurrentBlock();
    if (sumsBlock == null) {
      LOG.error("Error: Current block in checksum stream is null! ");
      return false;
    }
    DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
    lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
    LOG.info("Found checksum error in checksum stream at block="
        + sumsBlock.getBlockName() + " on datanode="
        + sumsNode[0].getName());

    // Ask client to delete blocks.
    dfs.reportChecksumFailure(f.toString(), lblocks);

    return true;
  }

  /**
   * Returns the stat information about the file.
   * @throws FileNotFoundException if the file does not exist.
   */
  public FileStatus getFileStatus(Path f) throws IOException {
    if (f instanceof DfsPath) {
      DfsPath p = (DfsPath) f;
      return p.info;
    }
    try {
      return dfs.getFileInfo(getPathName(f));
    } catch (RemoteException e) {
      if (IOException.class.getName().equals(e.getClassName()) &&
          e.getMessage().startsWith(
              "java.io.IOException: File does not exist: ")) {
        // non-existent path
        FileNotFoundException fe =
            new FileNotFoundException("File " + f + " does not exist.");
        throw (FileNotFoundException) fe.initCause(e);
      } else {
        throw e; // unexpected exception
      }
    }
  }
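
  // A minimal stat sketch for getFileStatus() above (illustrative path);
  // callers can rely on the FileNotFoundException translation:
  //
  //   try {
  //     FileStatus st = fs.getFileStatus(new Path("/user/example/out.txt"));
  //     System.out.println(st.isDir() + " " + st.getLen());
  //   } catch (FileNotFoundException fnf) {
  //     // no such path
  //   }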
  /** {@inheritDoc} */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
    dfs.namenode.setPermission(getPathName(p), permission);
  }

  /** {@inheritDoc} */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
    if (username == null && groupname == null) {
      throw new IOException("username == null && groupname == null");
    }
    dfs.namenode.setOwner(getPathName(p), username, groupname);
  }
}