|
@@ -28,8 +28,11 @@ import java.util.zip.Checksum;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
* An implementation of the abstract class {@link EditLogOutputStream}, which
|
|
|
* stores edits in a local file.
|
|
@@ -120,32 +123,41 @@ class EditLogFileOutputStream extends EditLogOutputStream {
|
|
|
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
- // close should have been called after all pending transactions
|
|
|
- // have been flushed & synced.
|
|
|
- // if already closed, just skip
|
|
|
- if(bufCurrent != null)
|
|
|
- {
|
|
|
- int bufSize = bufCurrent.size();
|
|
|
- if (bufSize != 0) {
|
|
|
- throw new IOException("FSEditStream has " + bufSize
|
|
|
- + " bytes still to be flushed and cannot " + "be closed.");
|
|
|
+ try {
|
|
|
+ // close should have been called after all pending transactions
|
|
|
+ // have been flushed & synced.
|
|
|
+ // if already closed, just skip
|
|
|
+ if(bufCurrent != null)
|
|
|
+ {
|
|
|
+ int bufSize = bufCurrent.size();
|
|
|
+ if (bufSize != 0) {
|
|
|
+ throw new IOException("FSEditStream has " + bufSize
|
|
|
+ + " bytes still to be flushed and cannot " + "be closed.");
|
|
|
+ }
|
|
|
+ bufCurrent.close();
|
|
|
+ bufCurrent = null;
|
|
|
}
|
|
|
- bufCurrent.close();
|
|
|
- bufCurrent = null;
|
|
|
- }
|
|
|
-
|
|
|
- if(bufReady != null) {
|
|
|
- bufReady.close();
|
|
|
- bufReady = null;
|
|
|
- }
|
|
|
-
|
|
|
- // remove the last INVALID marker from transaction log.
|
|
|
- if (fc != null && fc.isOpen()) {
|
|
|
- fc.truncate(fc.position());
|
|
|
- fc.close();
|
|
|
- }
|
|
|
- if (fp != null) {
|
|
|
- fp.close();
|
|
|
+
|
|
|
+ if(bufReady != null) {
|
|
|
+ bufReady.close();
|
|
|
+ bufReady = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // remove the last INVALID marker from transaction log.
|
|
|
+ if (fc != null && fc.isOpen()) {
|
|
|
+ fc.truncate(fc.position());
|
|
|
+ fc.close();
|
|
|
+ fc = null;
|
|
|
+ }
|
|
|
+ if (fp != null) {
|
|
|
+ fp.close();
|
|
|
+ fp = null;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
|
|
|
+ bufCurrent = bufReady = null;
|
|
|
+ fc = null;
|
|
|
+ fp = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -225,4 +237,14 @@ class EditLogFileOutputStream extends EditLogOutputStream {
|
|
|
File getFile() {
|
|
|
return file;
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public void setFileChannelForTesting(FileChannel fc) {
|
|
|
+ this.fc = fc;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public FileChannel getFileChannelForTesting() {
|
|
|
+ return fc;
|
|
|
+ }
|
|
|
}
|