|
@@ -90,34 +90,61 @@ public class FSDataInputStream extends DataInputStream
|
|
|
}
|
|
|
|
|
|
public int read(byte b[], int off, int len) throws IOException {
|
|
|
- int read = in.read(b, off, len);
|
|
|
+ int read;
|
|
|
+ boolean retry;
|
|
|
+ int retriesLeft = 3;
|
|
|
+ long oldPos = getPos();
|
|
|
+ do {
|
|
|
+ retriesLeft--;
|
|
|
+ retry = false;
|
|
|
|
|
|
- if (sums != null) {
|
|
|
- int summed = 0;
|
|
|
- while (summed < read) {
|
|
|
-
|
|
|
- int goal = bytesPerSum - inSum;
|
|
|
- int inBuf = read - summed;
|
|
|
- int toSum = inBuf <= goal ? inBuf : goal;
|
|
|
-
|
|
|
+ read = in.read(b, off, len);
|
|
|
+
|
|
|
+ if (sums != null) {
|
|
|
+ long oldSumsPos = sums.getPos();
|
|
|
try {
|
|
|
- sum.update(b, off+summed, toSum);
|
|
|
- } catch (ArrayIndexOutOfBoundsException e) {
|
|
|
- throw new RuntimeException("Summer buffer overflow b.len=" +
|
|
|
- b.length + ", off=" + off +
|
|
|
- ", summed=" + summed + ", read=" +
|
|
|
- read + ", bytesPerSum=" + bytesPerSum +
|
|
|
- ", inSum=" + inSum, e);
|
|
|
- }
|
|
|
- summed += toSum;
|
|
|
+ int summed = 0;
|
|
|
+ while (summed < read) {
|
|
|
+ int goal = bytesPerSum - inSum;
|
|
|
+ int inBuf = read - summed;
|
|
|
+ int toSum = inBuf <= goal ? inBuf : goal;
|
|
|
|
|
|
- inSum += toSum;
|
|
|
- if (inSum == bytesPerSum) {
|
|
|
- verifySum(read-(summed-bytesPerSum));
|
|
|
+ try {
|
|
|
+ sum.update(b, off+summed, toSum);
|
|
|
+ } catch (ArrayIndexOutOfBoundsException e) {
|
|
|
+ throw new RuntimeException("Summer buffer overflow b.len=" +
|
|
|
+ b.length + ", off=" + off +
|
|
|
+ ", summed=" + summed + ", read=" +
|
|
|
+ read + ", bytesPerSum=" + bytesPerSum +
|
|
|
+ ", inSum=" + inSum, e);
|
|
|
+ }
|
|
|
+ summed += toSum;
|
|
|
+
|
|
|
+ inSum += toSum;
|
|
|
+ if (inSum == bytesPerSum) {
|
|
|
+ verifySum(read-(summed-bytesPerSum));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (ChecksumException ce) {
|
|
|
+ LOG.info("Found checksum error: " + StringUtils.stringifyException(ce));
|
|
|
+ if (retriesLeft == 0) {
|
|
|
+ throw ce;
|
|
|
+ }
|
|
|
+ sums.seek(oldSumsPos);
|
|
|
+ if (!((FSInputStream)in).seekToNewSource(oldPos) ||
|
|
|
+ !((FSInputStream)sumsIn).seekToNewSource(oldSumsPos)) {
|
|
|
+ // Neither the data stream nor the checksum stream are being read from
|
|
|
+ // different sources, meaning we'll still get a checksum error if we
|
|
|
+ // try to do the read again. We throw an exception instead.
|
|
|
+ throw ce;
|
|
|
+ } else {
|
|
|
+ // Since at least one of the sources is different, the read might succeed,
|
|
|
+ // so we'll retry.
|
|
|
+ retry = true;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
+ } while (retry);
|
|
|
return read;
|
|
|
}
|
|
|
|
|
@@ -270,7 +297,11 @@ public class FSDataInputStream extends DataInputStream
|
|
|
public FSDataInputStream(FileSystem fs, Path file, int bufferSize, Configuration conf)
|
|
|
throws IOException {
|
|
|
super(null);
|
|
|
- this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
|
|
|
+ Checker chkr = new Checker(fs, file, conf); // sets bytesPerSum
|
|
|
+ if (bufferSize % bytesPerSum != 0) {
|
|
|
+ throw new IOException("Buffer size must be multiple of " + bytesPerSum);
|
|
|
+ }
|
|
|
+ this.in = new Buffer(new PositionCache(chkr), bufferSize);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -278,7 +309,11 @@ public class FSDataInputStream extends DataInputStream
|
|
|
throws IOException {
|
|
|
super(null);
|
|
|
int bufferSize = conf.getInt("io.file.buffer.size", 4096);
|
|
|
- this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
|
|
|
+ Checker chkr = new Checker(fs, file, conf);
|
|
|
+ if (bufferSize % bytesPerSum != 0) {
|
|
|
+ throw new IOException("Buffer size must be multiple of " + bytesPerSum);
|
|
|
+ }
|
|
|
+ this.in = new Buffer(new PositionCache(chkr), bufferSize);
|
|
|
}
|
|
|
|
|
|
/** Construct without checksums. */
|