|
@@ -1477,6 +1477,24 @@ class DFSClient implements FSConstants {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Cleans up any resources held. Invoked when the stream is closed or
|
|
|
|
+ * in case of an error.
|
|
|
|
+ */
|
|
|
|
+ private synchronized void cleanup() {
|
|
|
|
+ if (!closed) {
|
|
|
|
+ try {
|
|
|
|
+ closeBackupStream();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ // this is not expected. So log is ok.
|
|
|
|
+ LOG.warn("Unexpected error while closing backup stream : " +
|
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
|
+ }
|
|
|
|
+ deleteBackupFile();
|
|
|
|
+ closed = true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private File newBackupFile() throws IOException {
|
|
private File newBackupFile() throws IOException {
|
|
String name = "tmp" + File.separator +
|
|
String name = "tmp" + File.separator +
|
|
"client-" + Math.abs(r.nextLong());
|
|
"client-" + Math.abs(r.nextLong());
|
|
@@ -1582,26 +1600,36 @@ class DFSClient implements FSConstants {
|
|
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
|
|
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
|
|
throws IOException {
|
|
throws IOException {
|
|
checkOpen();
|
|
checkOpen();
|
|
- int bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
|
|
|
- if (len > bytesPerChecksum || (len + bytesWrittenToBlock) > blockSize) {
|
|
|
|
- // should never happen
|
|
|
|
- throw new IOException("Mismatch in writeChunk() args");
|
|
|
|
|
|
+ if (closed) {
|
|
|
|
+ throw new IOException("Stream is closed or can not be written to.");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ int bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
|
|
|
|
|
- if ( backupFile == null ) {
|
|
|
|
- openBackupStream();
|
|
|
|
- }
|
|
|
|
|
|
+ try {
|
|
|
|
+ if (len > bytesPerChecksum || (len + bytesWrittenToBlock) > blockSize) {
|
|
|
|
+ // should never happen
|
|
|
|
+ throw new IOException("Mismatch in writeChunk() args");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ( backupFile == null ) {
|
|
|
|
+ openBackupStream();
|
|
|
|
+ }
|
|
|
|
|
|
- backupStream.write(b, offset, len);
|
|
|
|
- backupStream.write(checksum);
|
|
|
|
|
|
+ backupStream.write(b, offset, len);
|
|
|
|
+ backupStream.write(checksum);
|
|
|
|
|
|
- bytesWrittenToBlock += len;
|
|
|
|
- filePos += len;
|
|
|
|
|
|
+ bytesWrittenToBlock += len;
|
|
|
|
+ filePos += len;
|
|
|
|
|
|
- if ( bytesWrittenToBlock >= blockSize ) {
|
|
|
|
- endBlock();
|
|
|
|
|
|
+ if ( bytesWrittenToBlock >= blockSize ) {
|
|
|
|
+ endBlock();
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ //No more writes can be allowed on this stream.
|
|
|
|
+ cleanup();
|
|
|
|
+ throw e;
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1722,12 +1750,12 @@ class DFSClient implements FSConstants {
|
|
public synchronized void close() throws IOException {
|
|
public synchronized void close() throws IOException {
|
|
checkOpen();
|
|
checkOpen();
|
|
if (closed) {
|
|
if (closed) {
|
|
- throw new IOException("Stream closed");
|
|
|
|
|
|
+ return; // closing multiple times is ok.
|
|
}
|
|
}
|
|
|
|
|
|
- flushBuffer();
|
|
|
|
-
|
|
|
|
try {
|
|
try {
|
|
|
|
+ flushBuffer();
|
|
|
|
+
|
|
if (filePos == 0 || bytesWrittenToBlock != 0) {
|
|
if (filePos == 0 || bytesWrittenToBlock != 0) {
|
|
try {
|
|
try {
|
|
endBlock();
|
|
endBlock();
|
|
@@ -1756,8 +1784,8 @@ class DFSClient implements FSConstants {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- closed = true;
|
|
|
|
} finally {
|
|
} finally {
|
|
|
|
+ cleanup();
|
|
synchronized (pendingCreates) {
|
|
synchronized (pendingCreates) {
|
|
pendingCreates.remove(src.toString());
|
|
pendingCreates.remove(src.toString());
|
|
}
|
|
}
|