FSEditLog.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.dfs;
  19. import java.io.BufferedInputStream;
  20. import java.io.DataInputStream;
  21. import java.io.DataOutputStream;
  22. import java.io.EOFException;
  23. import java.io.File;
  24. import java.io.FileDescriptor;
  25. import java.io.FileInputStream;
  26. import java.io.FileOutputStream;
  27. import java.io.IOException;
  28. import java.util.ArrayList;
  29. import org.apache.hadoop.io.ArrayWritable;
  30. import org.apache.hadoop.io.UTF8;
  31. import org.apache.hadoop.io.Writable;
  32. /**
  33. * FSEditLog maintains a log of the namespace modifications.
  34. *
  35. */
  36. class FSEditLog {
  37. private static final byte OP_ADD = 0;
  38. private static final byte OP_RENAME = 1;
  39. private static final byte OP_DELETE = 2;
  40. private static final byte OP_MKDIR = 3;
  41. private static final byte OP_SET_REPLICATION = 4;
  42. private static final byte OP_DATANODE_ADD = 5;
  43. private static final byte OP_DATANODE_REMOVE = 6;
  44. private ArrayList<EditLogOutputStream> editStreams = null;
  45. private FSImage fsimage = null;
  46. private long lastModificationTime;
  47. private long lastSyncTime;
  48. static class EditLogOutputStream extends DataOutputStream {
  49. private FileDescriptor fd;
  50. EditLogOutputStream(File name) throws IOException {
  51. super(new FileOutputStream(name, true)); // open for append
  52. this.fd = ((FileOutputStream)out).getFD();
  53. }
  54. void flushAndSync() throws IOException {
  55. this.flush();
  56. this.fd.sync();
  57. }
  58. void create() throws IOException {
  59. writeInt(FSConstants.LAYOUT_VERSION);
  60. flushAndSync();
  61. }
  62. }
  /**
   * Creates an edit log bound to the given image. No streams are opened here;
   * both timestamps start at zero.
   *
   * @param image the image used to locate edit files and storage directories
   */
  FSEditLog(FSImage image) {
    this.fsimage = image;
    this.lastSyncTime = 0;
    this.lastModificationTime = 0;
  }
  68. private File getEditFile(int idx) {
  69. return fsimage.getEditFile(idx);
  70. }
  71. private File getEditNewFile(int idx) {
  72. return fsimage.getEditNewFile(idx);
  73. }
  74. private int getNumStorageDirs() {
  75. return fsimage.getNumStorageDirs();
  76. }
  77. synchronized int getNumEditStreams() {
  78. return editStreams == null ? 0 : editStreams.size();
  79. }
  80. boolean isOpen() {
  81. return getNumEditStreams() > 0;
  82. }
  83. /**
  84. * Create empty edit log files.
  85. * Initialize the output stream for logging.
  86. *
  87. * @throws IOException
  88. */
  89. synchronized void open() throws IOException {
  90. int size = getNumStorageDirs();
  91. if (editStreams == null)
  92. editStreams = new ArrayList<EditLogOutputStream>(size);
  93. for (int idx = 0; idx < size; idx++) {
  94. File eFile = getEditFile(idx);
  95. try {
  96. EditLogOutputStream eStream = new EditLogOutputStream(eFile);
  97. editStreams.add(eStream);
  98. } catch (IOException e) {
  99. FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
  100. processIOError(idx);
  101. idx--;
  102. }
  103. }
  104. }
  105. synchronized void createEditLogFile(File name) throws IOException {
  106. EditLogOutputStream eStream = new EditLogOutputStream(name);
  107. eStream.create();
  108. eStream.flushAndSync();
  109. eStream.close();
  110. }
  111. /**
  112. * Create edits.new if non existant.
  113. */
  114. synchronized void createNewIfMissing() throws IOException {
  115. for (int idx = 0; idx < getNumStorageDirs(); idx++) {
  116. File newFile = getEditNewFile(idx);
  117. if (!newFile.exists())
  118. createEditLogFile(newFile);
  119. }
  120. }
  121. /**
  122. * Shutdown the filestore
  123. */
  124. synchronized void close() throws IOException {
  125. if (editStreams == null) {
  126. return;
  127. }
  128. for (int idx = 0; idx < editStreams.size(); idx++) {
  129. EditLogOutputStream eStream = editStreams.get(idx);
  130. try {
  131. eStream.flushAndSync();
  132. eStream.close();
  133. } catch (IOException e) {
  134. processIOError(idx);
  135. idx--;
  136. }
  137. }
  138. editStreams.clear();
  139. }
  140. /**
  141. * If there is an IO Error on any log operations, remove that
  142. * directory from the list of directories. If no more directories
  143. * remain, then raise an exception that will possibly cause the
  144. * server to exit
  145. */
  146. synchronized void processIOError(int index) throws IOException {
  147. if (editStreams == null || editStreams.size() == 1) {
  148. throw new IOException("Checkpoint directories inaccessible.");
  149. }
  150. assert(index < getNumStorageDirs());
  151. assert(getNumStorageDirs() == editStreams.size());
  152. editStreams.remove(index);
  153. //
  154. // Invoke the ioerror routine of the fsimage
  155. //
  156. fsimage.processIOError(index);
  157. }
  158. /**
  159. * check if ANY edits.new log exists
  160. */
  161. boolean existsNew() throws IOException {
  162. for (int idx = 0; idx < getNumStorageDirs(); idx++) {
  163. if (getEditNewFile(idx).exists()) {
  164. return true;
  165. }
  166. }
  167. return false;
  168. }
  169. /**
  170. * Load an edit log, and apply the changes to the in-memory structure
  171. * This is where we apply edits that we've been writing to disk all
  172. * along.
  173. */
  174. int loadFSEdits(File edits) throws IOException {
  175. FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  176. FSDirectory fsDir = fsNamesys.dir;
  177. int numEdits = 0;
  178. int logVersion = 0;
  179. if (edits != null) {
  180. DataInputStream in = new DataInputStream(
  181. new BufferedInputStream(
  182. new FileInputStream(edits)));
  183. // Read log file version. Could be missing.
  184. in.mark(4);
  185. // If edits log is greater than 2G, available method will return negative
  186. // numbers, so we avoid having to call available
  187. boolean available = true;
  188. try {
  189. logVersion = in.readByte();
  190. } catch (EOFException e) {
  191. available = false;
  192. }
  193. if (available) {
  194. in.reset();
  195. if (logVersion >= 0)
  196. logVersion = 0;
  197. else
  198. logVersion = in.readInt();
  199. if (logVersion < FSConstants.LAYOUT_VERSION) // future version
  200. throw new IOException(
  201. "Unexpected version of the file system log file: "
  202. + logVersion
  203. + ". Current version = "
  204. + FSConstants.LAYOUT_VERSION + ".");
  205. }
  206. short replication = fsNamesys.getDefaultReplication();
  207. try {
  208. while (true) {
  209. long timestamp = 0;
  210. long mtime = 0;
  211. byte opcode = -1;
  212. try {
  213. opcode = in.readByte();
  214. } catch (EOFException e) {
  215. break; // no more transactions
  216. }
  217. numEdits++;
  218. switch (opcode) {
  219. case OP_ADD: {
  220. UTF8 name = new UTF8();
  221. ArrayWritable aw = null;
  222. Writable writables[];
  223. // version 0 does not support per file replication
  224. if (logVersion >= 0)
  225. name.readFields(in); // read name only
  226. else { // other versions do
  227. // get name and replication
  228. aw = new ArrayWritable(UTF8.class);
  229. aw.readFields(in);
  230. writables = aw.get();
  231. if (logVersion >= -4 && writables.length != 2 ||
  232. logVersion < -4 && writables.length != 3) {
  233. throw new IOException("Incorrect data fortmat. "
  234. + "Name & replication pair expected");
  235. }
  236. name = (UTF8) writables[0];
  237. replication = Short.parseShort(
  238. ((UTF8)writables[1]).toString());
  239. replication = adjustReplication(replication);
  240. if (logVersion < -4) {
  241. mtime = Long.parseLong(((UTF8)writables[2]).toString());
  242. }
  243. }
  244. // get blocks
  245. aw = new ArrayWritable(Block.class);
  246. aw.readFields(in);
  247. writables = aw.get();
  248. Block blocks[] = new Block[writables.length];
  249. System.arraycopy(writables, 0, blocks, 0, blocks.length);
  250. // add to the file tree
  251. fsDir.unprotectedAddFile(name.toString(), blocks, replication, mtime);
  252. break;
  253. }
  254. case OP_SET_REPLICATION: {
  255. UTF8 src = new UTF8();
  256. UTF8 repl = new UTF8();
  257. src.readFields(in);
  258. repl.readFields(in);
  259. replication = adjustReplication(fromLogReplication(repl));
  260. fsDir.unprotectedSetReplication(src.toString(),
  261. replication,
  262. null);
  263. break;
  264. }
  265. case OP_RENAME: {
  266. UTF8 src = null;
  267. UTF8 dst = null;
  268. if (logVersion >= -4) {
  269. src = new UTF8();
  270. dst = new UTF8();
  271. src.readFields(in);
  272. dst.readFields(in);
  273. } else {
  274. ArrayWritable aw = null;
  275. Writable writables[];
  276. aw = new ArrayWritable(UTF8.class);
  277. aw.readFields(in);
  278. writables = aw.get();
  279. if (writables.length != 3) {
  280. throw new IOException("Incorrect data fortmat. "
  281. + "Mkdir operation.");
  282. }
  283. src = (UTF8) writables[0];
  284. dst = (UTF8) writables[1];
  285. timestamp = Long.parseLong(((UTF8)writables[2]).toString());
  286. }
  287. fsDir.unprotectedRenameTo(src.toString(), dst.toString(), timestamp);
  288. break;
  289. }
  290. case OP_DELETE: {
  291. UTF8 src = null;
  292. if (logVersion >= -4) {
  293. src = new UTF8();
  294. src.readFields(in);
  295. } else {
  296. ArrayWritable aw = null;
  297. Writable writables[];
  298. aw = new ArrayWritable(UTF8.class);
  299. aw.readFields(in);
  300. writables = aw.get();
  301. if (writables.length != 2) {
  302. throw new IOException("Incorrect data fortmat. "
  303. + "delete operation.");
  304. }
  305. src = (UTF8) writables[0];
  306. timestamp = Long.parseLong(((UTF8)writables[1]).toString());
  307. }
  308. fsDir.unprotectedDelete(src.toString(), timestamp);
  309. break;
  310. }
  311. case OP_MKDIR: {
  312. UTF8 src = null;
  313. if (logVersion >= -4) {
  314. src = new UTF8();
  315. src.readFields(in);
  316. } else {
  317. ArrayWritable aw = null;
  318. Writable writables[];
  319. aw = new ArrayWritable(UTF8.class);
  320. aw.readFields(in);
  321. writables = aw.get();
  322. if (writables.length != 2) {
  323. throw new IOException("Incorrect data fortmat. "
  324. + "Mkdir operation.");
  325. }
  326. src = (UTF8) writables[0];
  327. timestamp = Long.parseLong(((UTF8)writables[1]).toString());
  328. }
  329. fsDir.unprotectedMkdir(src.toString(), timestamp);
  330. break;
  331. }
  332. case OP_DATANODE_ADD: {
  333. if (logVersion > -3)
  334. throw new IOException("Unexpected opcode " + opcode
  335. + " for version " + logVersion);
  336. FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
  337. nodeimage.readFields(in);
  338. DatanodeDescriptor node = nodeimage.getDatanodeDescriptor();
  339. fsNamesys.unprotectedAddDatanode(node);
  340. break;
  341. }
  342. case OP_DATANODE_REMOVE: {
  343. if (logVersion > -3)
  344. throw new IOException("Unexpected opcode " + opcode
  345. + " for version " + logVersion);
  346. DatanodeID nodeID = new DatanodeID();
  347. nodeID.readFields(in);
  348. DatanodeDescriptor node = fsNamesys.getDatanode(nodeID);
  349. if (node != null) {
  350. fsNamesys.unprotectedRemoveDatanode(node);
  351. // physically remove node from datanodeMap
  352. fsNamesys.wipeDatanode(nodeID);
  353. }
  354. break;
  355. }
  356. default: {
  357. throw new IOException("Never seen opcode " + opcode);
  358. }
  359. }
  360. }
  361. } finally {
  362. in.close();
  363. }
  364. }
  365. if (logVersion != FSConstants.LAYOUT_VERSION) // other version
  366. numEdits++; // save this image asap
  367. return numEdits;
  368. }
  369. static short adjustReplication(short replication) {
  370. FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  371. short minReplication = fsNamesys.getMinReplication();
  372. if (replication<minReplication) {
  373. replication = minReplication;
  374. }
  375. short maxReplication = fsNamesys.getMaxReplication();
  376. if (replication>maxReplication) {
  377. replication = maxReplication;
  378. }
  379. return replication;
  380. }
  381. /**
  382. * Write an operation to the edit log. Do not sync to persistent
  383. * store yet.
  384. */
  385. synchronized void logEdit(byte op, Writable w1, Writable w2) {
  386. assert this.getNumEditStreams() > 0 : "no editlog streams";
  387. for (int idx = 0; idx < editStreams.size(); idx++) {
  388. EditLogOutputStream eStream;
  389. synchronized (eStream = editStreams.get(idx)) {
  390. try {
  391. eStream.write(op);
  392. if (w1 != null) {
  393. w1.write(eStream);
  394. }
  395. if (w2 != null) {
  396. w2.write(eStream);
  397. }
  398. } catch (IOException ie) {
  399. try {
  400. processIOError(idx);
  401. } catch (IOException e) {
  402. FSNamesystem.LOG.error("Unable to append to edit log. " +
  403. "Fatal Error.");
  404. throw new RuntimeException("Unable to append to edit log. ");
  405. }
  406. }
  407. }
  408. }
  409. //
  410. // record the time when new data was written to the edits log
  411. //
  412. lastModificationTime = System.currentTimeMillis();
  413. }
  414. //
  415. // flush all data of the Edits log into persistent store
  416. //
  417. synchronized void logSync() {
  418. assert this.getNumEditStreams() > 0 : "no editlog streams";
  419. //
  420. // If data was generated before the beginning of the last sync time
  421. // then there is nothing to flush
  422. //
  423. if (lastModificationTime < lastSyncTime) {
  424. return;
  425. }
  426. lastSyncTime = System.currentTimeMillis();
  427. for (int idx = 0; idx < editStreams.size(); idx++) {
  428. EditLogOutputStream eStream;
  429. synchronized (eStream = editStreams.get(idx)) {
  430. try {
  431. eStream.flushAndSync();
  432. } catch (IOException ie) {
  433. try {
  434. processIOError(idx);
  435. } catch (IOException e) {
  436. FSNamesystem.LOG.error("Unable to sync edit log. " +
  437. "Fatal Error.");
  438. throw new RuntimeException("Unable to sync edit log. " +
  439. "Fatal Error.");
  440. }
  441. }
  442. }
  443. }
  444. }
  445. /**
  446. * Add create file record to edit log
  447. */
  448. void logCreateFile(FSDirectory.INode newNode) {
  449. UTF8 nameReplicationPair[] = new UTF8[] {
  450. new UTF8(newNode.computeName()),
  451. FSEditLog.toLogReplication(newNode.getReplication()),
  452. FSEditLog.toLogTimeStamp(newNode.getModificationTime())};
  453. logEdit(OP_ADD,
  454. new ArrayWritable(UTF8.class, nameReplicationPair),
  455. new ArrayWritable(Block.class, newNode.getBlocks()));
  456. }
  457. /**
  458. * Add create directory record to edit log
  459. */
  460. void logMkDir(FSDirectory.INode newNode) {
  461. UTF8 info[] = new UTF8[] {
  462. new UTF8(newNode.computeName()),
  463. FSEditLog.toLogTimeStamp(newNode.getModificationTime())
  464. };
  465. logEdit(OP_MKDIR, new ArrayWritable(UTF8.class, info), null);
  466. }
  467. /**
  468. * Add rename record to edit log
  469. * TODO: use String parameters until just before writing to disk
  470. */
  471. void logRename(String src, String dst, long timestamp) {
  472. UTF8 info[] = new UTF8[] {
  473. new UTF8(src),
  474. new UTF8(dst),
  475. FSEditLog.toLogTimeStamp(timestamp)};
  476. logEdit(OP_RENAME, new ArrayWritable(UTF8.class, info), null);
  477. }
  478. /**
  479. * Add set replication record to edit log
  480. */
  481. void logSetReplication(String src, short replication) {
  482. logEdit(OP_SET_REPLICATION,
  483. new UTF8(src),
  484. FSEditLog.toLogReplication(replication));
  485. }
  486. /**
  487. * Add delete file record to edit log
  488. */
  489. void logDelete(String src, long timestamp) {
  490. UTF8 info[] = new UTF8[] {
  491. new UTF8(src),
  492. FSEditLog.toLogTimeStamp(timestamp)};
  493. logEdit(OP_DELETE, new ArrayWritable(UTF8.class, info), null);
  494. }
  495. /**
  496. * Creates a record in edit log corresponding to a new data node
  497. * registration event.
  498. */
  499. void logAddDatanode(DatanodeDescriptor node) {
  500. logEdit(OP_DATANODE_ADD, new FSImage.DatanodeImage(node), null);
  501. }
  502. /**
  503. * Creates a record in edit log corresponding to a data node
  504. * removal event.
  505. */
  506. void logRemoveDatanode(DatanodeID nodeID) {
  507. logEdit(OP_DATANODE_REMOVE, new DatanodeID(nodeID), null);
  508. }
  509. static UTF8 toLogReplication(short replication) {
  510. return new UTF8(Short.toString(replication));
  511. }
  512. static short fromLogReplication(UTF8 replication) {
  513. return Short.parseShort(replication.toString());
  514. }
  515. static UTF8 toLogTimeStamp(long timestamp) {
  516. return new UTF8(Long.toString(timestamp));
  517. }
  518. /**
  519. * Return the size of the current EditLog
  520. */
  521. synchronized long getEditLogSize() throws IOException {
  522. assert(getNumStorageDirs() == editStreams.size());
  523. long size = 0;
  524. for (int idx = 0; idx < getNumStorageDirs(); idx++) {
  525. synchronized (editStreams.get(idx)) {
  526. assert(size == 0 || size == getEditFile(idx).length());
  527. size = getEditFile(idx).length();
  528. }
  529. }
  530. return size;
  531. }
  532. /**
  533. * Closes the current edit log and opens edits.new.
  534. */
  535. synchronized void rollEditLog() throws IOException {
  536. //
  537. // If edits.new already exists, then return error.
  538. //
  539. if (existsNew()) {
  540. throw new IOException("Attempt to roll edit log but edits.new exists");
  541. }
  542. close(); // close existing edit log
  543. //
  544. // Open edits.new
  545. //
  546. for (int idx = 0; idx < getNumStorageDirs(); idx++) {
  547. try {
  548. EditLogOutputStream eStream = new EditLogOutputStream(getEditNewFile(idx));
  549. eStream.create();
  550. editStreams.add(eStream);
  551. } catch (IOException e) {
  552. processIOError(idx);
  553. idx--;
  554. }
  555. }
  556. }
  557. /**
  558. * Removes the old edit log and renamed edits.new as edits.
  559. * Reopens the edits file.
  560. */
  561. synchronized void purgeEditLog() throws IOException {
  562. //
  563. // If edits.new does not exists, then return error.
  564. //
  565. if (!existsNew()) {
  566. throw new IOException("Attempt to purge edit log " +
  567. "but edits.new does not exist.");
  568. }
  569. close();
  570. //
  571. // Delete edits and rename edits.new to edits.
  572. //
  573. for (int idx = 0; idx < getNumStorageDirs(); idx++) {
  574. if (!getEditNewFile(idx).renameTo(getEditFile(idx))) {
  575. //
  576. // renameTo() fails on Windows if the destination
  577. // file exists.
  578. //
  579. getEditFile(idx).delete();
  580. if (!getEditNewFile(idx).renameTo(getEditFile(idx))) {
  581. processIOError(idx);
  582. idx--;
  583. }
  584. }
  585. }
  586. //
  587. // Reopen all the edits logs.
  588. //
  589. open();
  590. }
  591. /**
  592. * Return the name of the edit file
  593. */
  594. synchronized File getFsEditName() throws IOException {
  595. return getEditFile(0);
  596. }
  597. }