|
@@ -840,17 +840,26 @@ static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
|
|
|
assert(fh->fs != NULL);
|
|
|
assert(fh->hdfsFH != NULL);
|
|
|
|
|
|
+ // special case this as simplifies the rest of the logic to know the caller wanted > 0 bytes
|
|
|
+ if (size == 0)
|
|
|
+ return 0;
|
|
|
+
|
|
|
+ // If size is bigger than the read buffer, then just read right into the user supplied buffer
|
|
|
if (size >= dfs->rdbuffer_size) {
|
|
|
int num_read;
|
|
|
- int total_read = 0;
|
|
|
+ size_t total_read = 0;
|
|
|
while (size - total_read > 0 && (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
|
|
|
total_read += num_read;
|
|
|
}
|
|
|
+ // if there was an error before satisfying the current read, this logic declares it an error
|
|
|
+ // and does not try to return any of the bytes read. Don't think it matters, so the code
|
|
|
+ // is just being conservative.
|
|
|
+ if (total_read < size && num_read < 0) {
|
|
|
+ total_read = -EIO;
|
|
|
+ }
|
|
|
return total_read;
|
|
|
}
|
|
|
|
|
|
- assert(fh->bufferSize >= 0);
|
|
|
-
|
|
|
//
|
|
|
// Critical section - protect from multiple reads in different threads accessing the read buffer
|
|
|
// (no returns until end)
|
|
@@ -872,7 +881,7 @@ static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
|
|
|
offset + size > fh->buffersStartOffset + fh->bufferSize)
|
|
|
{
|
|
|
// Read into the buffer from DFS
|
|
|
- size_t num_read;
|
|
|
+ int num_read = 0;
|
|
|
size_t total_read = 0;
|
|
|
|
|
|
while (dfs->rdbuffer_size - total_read > 0 &&
|
|
@@ -880,21 +889,31 @@ static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
|
|
|
total_read += num_read;
|
|
|
}
|
|
|
|
|
|
- if (num_read < 0) {
|
|
|
+ // if there was an error before satisfying the current read, this logic declares it an error
|
|
|
+ // and does not try to return any of the bytes read. Don't think it matters, so the code
|
|
|
+ // is just being conservative.
|
|
|
+ if (total_read < size && num_read < 0) {
|
|
|
// invalidate the buffer
|
|
|
fh->bufferSize = 0;
|
|
|
syslog(LOG_ERR, "Read error - pread failed for %s with return code %d %s:%d", path, (int)num_read, __FILE__, __LINE__);
|
|
|
ret = -EIO;
|
|
|
} else {
|
|
|
+ // Either EOF, all read or read beyond size, but then there was an error
|
|
|
fh->bufferSize = total_read;
|
|
|
fh->buffersStartOffset = offset;
|
|
|
|
|
|
if (dfs->rdbuffer_size - total_read > 0) {
|
|
|
+ // assert(num_read == 0); this should be true since if num_read < 0 handled above.
|
|
|
isEOF = 1;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if (ret == 0) {
|
|
|
+
|
|
|
+ //
|
|
|
+ // NOTE on EOF, fh->bufferSize == 0 and ret = 0 ,so the logic for copying data into the caller's buffer is bypassed, and
|
|
|
+ // the code returns 0 as required
|
|
|
+ //
|
|
|
+ if (ret == 0 && fh->bufferSize > 0) {
|
|
|
|
|
|
assert(offset >= fh->buffersStartOffset);
|
|
|
assert(fh->buf);
|
|
@@ -911,8 +930,6 @@ static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
|
|
|
|
|
|
memcpy(buf, offsetPtr, amount);
|
|
|
|
|
|
- // fuse requires the below and the code should guarantee this assertion
|
|
|
- assert(amount == size || isEOF);
|
|
|
ret = amount;
|
|
|
}
|
|
|
|
|
@@ -921,6 +938,13 @@ static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
|
|
|
//
|
|
|
pthread_mutex_unlock(&fh->mutex);
|
|
|
|
|
|
+ // fuse requires the below and the code should guarantee this assertion
|
|
|
+ // 3 cases on return:
|
|
|
+ // 1. entire read satisfied
|
|
|
+ // 2. partial read and isEOF - including 0 size read
|
|
|
+ // 3. error
|
|
|
+ assert(ret == size || isEOF || ret < 0);
|
|
|
+
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
@@ -1469,8 +1493,9 @@ int dfs_release (const char *path, struct fuse_file_info *fi) {
|
|
|
|
|
|
if (fh->buf != NULL) {
|
|
|
free(fh->buf);
|
|
|
- pthread_mutex_destroy(&fh->mutex);
|
|
|
}
|
|
|
+ // this is always created and initialized, so always destroy it. (see dfs_open)
|
|
|
+ pthread_mutex_destroy(&fh->mutex);
|
|
|
|
|
|
free(fh);
|
|
|
|