|
@@ -88,7 +88,6 @@ static void print_usage(const char *pname)
|
|
|
|
|
|
#define OPTIMIZED_READS 1
|
|
#define OPTIMIZED_READS 1
|
|
|
|
|
|
-
|
|
|
|
enum
|
|
enum
|
|
{
|
|
{
|
|
KEY_VERSION,
|
|
KEY_VERSION,
|
|
@@ -178,7 +177,7 @@ static int dfs_getattr(const char *path, struct stat *st)
|
|
|
|
|
|
// if not connected, try to connect and fail out if we can't.
|
|
// if not connected, try to connect and fail out if we can't.
|
|
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
|
|
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
|
|
- syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
|
|
|
|
|
|
+ syslog(LOG_ERR, "ERROR: could not connect to %s:%d %s:%d\n", dfs->nn_hostname, dfs->nn_port,__FILE__, __LINE__);
|
|
return -EIO;
|
|
return -EIO;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -733,12 +732,12 @@ static int dfs_chown(const char *path, uid_t uid, gid_t gid)
|
|
return -ENOTSUP;
|
|
return -ENOTSUP;
|
|
}
|
|
}
|
|
|
|
|
|
-static int dfs_truncate(const char *path, off_t size)
|
|
|
|
-{
|
|
|
|
- (void)path;
|
|
|
|
- (void)size;
|
|
|
|
- return -ENOTSUP;
|
|
|
|
-}
|
|
|
|
|
|
+//static int dfs_truncate(const char *path, off_t size)
|
|
|
|
+//{
|
|
|
|
+// (void)path;
|
|
|
|
+// (void)size;
|
|
|
|
+// return -ENOTSUP;
|
|
|
|
+//}
|
|
|
|
|
|
long tempfh = 0;
|
|
long tempfh = 0;
|
|
|
|
|
|
@@ -746,6 +745,7 @@ static int dfs_open(const char *path, struct fuse_file_info *fi)
|
|
{
|
|
{
|
|
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
|
|
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
|
|
|
|
|
|
|
|
+
|
|
// check params and the context var
|
|
// check params and the context var
|
|
assert(path);
|
|
assert(path);
|
|
assert('/' == *path);
|
|
assert('/' == *path);
|
|
@@ -763,12 +763,14 @@ static int dfs_open(const char *path, struct fuse_file_info *fi)
|
|
// bugbug figure out what this flag is and report problem to Hadoop JIRA
|
|
// bugbug figure out what this flag is and report problem to Hadoop JIRA
|
|
int flags = (fi->flags & 0x7FFF);
|
|
int flags = (fi->flags & 0x7FFF);
|
|
|
|
|
|
|
|
+
|
|
#ifdef OPTIMIZED_READS
|
|
#ifdef OPTIMIZED_READS
|
|
// retrieve dfs specific data
|
|
// retrieve dfs specific data
|
|
dfs_fh *fh = (dfs_fh*)malloc(sizeof (dfs_fh));
|
|
dfs_fh *fh = (dfs_fh*)malloc(sizeof (dfs_fh));
|
|
fi->fh = (uint64_t)fh;
|
|
fi->fh = (uint64_t)fh;
|
|
fh->hdfsFH = (hdfsFile)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
|
|
fh->hdfsFH = (hdfsFile)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
|
|
fh->buf = (char*)malloc(rd_cache_buf_size*sizeof (char));
|
|
fh->buf = (char*)malloc(rd_cache_buf_size*sizeof (char));
|
|
|
|
+
|
|
fh->startOffset = 0;
|
|
fh->startOffset = 0;
|
|
fh->sizeBuffer = 0;
|
|
fh->sizeBuffer = 0;
|
|
|
|
|
|
@@ -777,6 +779,25 @@ static int dfs_open(const char *path, struct fuse_file_info *fi)
|
|
ret = -EIO;
|
|
ret = -EIO;
|
|
}
|
|
}
|
|
#else
|
|
#else
|
|
|
|
+ // fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
|
|
|
|
+
|
|
|
|
+ // bugbug should stop O_RDWR flag here.
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ // bugbug when fix https://issues.apache.org/jira/browse/HADOOP-3723 can remove the below code
|
|
|
|
+ if (flags & O_WRONLY) {
|
|
|
|
+ flags = O_WRONLY;
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (flags & O_RDWR) {
|
|
|
|
+ // NOTE - should not normally be checking policy in the middleman, but the handling of Unix flags in DFS is not
|
|
|
|
+ // consistent right now. 2008-07-16
|
|
|
|
+ syslog(LOG_ERR, "ERROR: trying to open a file with O_RDWR and DFS does not support that %s dfs %s:%d\n", path,__FILE__, __LINE__);
|
|
|
|
+ return -EIO;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
|
|
|
|
|
|
// retrieve dfs specific data
|
|
// retrieve dfs specific data
|
|
fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
|
|
fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
|
|
@@ -821,24 +842,27 @@ static int dfs_write(const char *path, const char *buf, size_t size,
|
|
}
|
|
}
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- // syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
|
|
|
|
-// tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
|
|
|
|
|
|
+ syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
|
|
|
|
+ tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
|
|
|
|
|
|
- // if (cur_offset != offset) {
|
|
|
|
- // syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
|
|
|
|
-// return -EIO;
|
|
|
|
-// }
|
|
|
|
|
|
+ if (cur_offset != offset) {
|
|
|
|
+ syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
|
|
|
|
+ return -EIO;
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
- syslog(LOG_DEBUG,"hdfsWrite(dfs,%ld,'%s',%d)\n",(long)file_handle,buf,(int)size);
|
|
|
|
tSize length = hdfsWrite(dfs->fs, file_handle, buf, size);
|
|
tSize length = hdfsWrite(dfs->fs, file_handle, buf, size);
|
|
|
|
|
|
-
|
|
|
|
- if (length != size) {
|
|
|
|
|
|
+ if(length <= 0) {
|
|
syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
|
|
syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
|
|
return -EIO;
|
|
return -EIO;
|
|
}
|
|
}
|
|
- return 0;
|
|
|
|
|
|
+
|
|
|
|
+ if (length != size) {
|
|
|
|
+ syslog(LOG_ERR, "WARN: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return length;
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@@ -866,7 +890,6 @@ int dfs_release (const char *path, struct fuse_file_info *fi) {
|
|
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
|
|
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
|
|
free(fh->buf);
|
|
free(fh->buf);
|
|
free(fh);
|
|
free(fh);
|
|
-
|
|
|
|
#else
|
|
#else
|
|
hdfsFile file_handle = (hdfsFile)fi->fh;
|
|
hdfsFile file_handle = (hdfsFile)fi->fh;
|
|
#endif
|
|
#endif
|
|
@@ -876,7 +899,8 @@ int dfs_release (const char *path, struct fuse_file_info *fi) {
|
|
}
|
|
}
|
|
|
|
|
|
if (hdfsCloseFile(dfs->fs, file_handle) != 0) {
|
|
if (hdfsCloseFile(dfs->fs, file_handle) != 0) {
|
|
- syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle for %s %s:%d\n",path, __FILE__, __LINE__);
|
|
|
|
|
|
+ syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
|
|
|
|
+ // fprintf(stderr, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
|
|
return -EIO;
|
|
return -EIO;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -891,12 +915,47 @@ static int dfs_mknod(const char *path, mode_t mode, dev_t rdev) {
|
|
|
|
|
|
static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
|
|
static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
|
|
{
|
|
{
|
|
- syslog(LOG_DEBUG,"in dfs_create");
|
|
|
|
fi->flags |= mode;
|
|
fi->flags |= mode;
|
|
-
|
|
|
|
return dfs_open(path, fi);
|
|
return dfs_open(path, fi);
|
|
}
|
|
}
|
|
|
|
+
|
|
int dfs_flush(const char *path, struct fuse_file_info *fi) {
|
|
int dfs_flush(const char *path, struct fuse_file_info *fi) {
|
|
|
|
+
|
|
|
|
+ // retrieve dfs specific data
|
|
|
|
+ dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
|
|
|
|
+
|
|
|
|
+ // check params and the context var
|
|
|
|
+ assert(path);
|
|
|
|
+ assert(dfs);
|
|
|
|
+ assert('/' == *path);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ // if not connected, try to connect and fail out if we can't.
|
|
|
|
+ if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
|
|
|
|
+ syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
|
|
|
|
+ return -EIO;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (NULL == (void*)fi->fh) {
|
|
|
|
+ return 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // note that fuse calls flush on RO files too and hdfs does not like that and will return an error
|
|
|
|
+ if(fi->flags & O_WRONLY) {
|
|
|
|
+
|
|
|
|
+#ifdef OPTIMIZED_READS
|
|
|
|
+ dfs_fh *fh = (dfs_fh*)fi->fh;
|
|
|
|
+ hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
|
|
|
|
+#else
|
|
|
|
+ hdfsFile file_handle = (hdfsFile)fi->fh;
|
|
|
|
+#endif
|
|
|
|
+
|
|
|
|
+ if (hdfsFlush(dfs->fs, file_handle) != 0) {
|
|
|
|
+ syslog(LOG_ERR, "ERROR: dfs problem - could not flush file_handle(%x) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
|
|
|
|
+ return -EIO;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1024,11 +1083,11 @@ static struct fuse_operations dfs_oper = {
|
|
.rename = dfs_rename,
|
|
.rename = dfs_rename,
|
|
.unlink = dfs_unlink,
|
|
.unlink = dfs_unlink,
|
|
.release = dfs_release,
|
|
.release = dfs_release,
|
|
- // .create = dfs_create,
|
|
|
|
- // .write = dfs_write,
|
|
|
|
- // .flush = dfs_flush,
|
|
|
|
|
|
+ .create = dfs_create,
|
|
|
|
+ .write = dfs_write,
|
|
|
|
+ .flush = dfs_flush,
|
|
//.xsetattr = dfs_setattr,
|
|
//.xsetattr = dfs_setattr,
|
|
- // .mknod = dfs_mknod,
|
|
|
|
|
|
+ .mknod = dfs_mknod,
|
|
.chmod = dfs_chmod,
|
|
.chmod = dfs_chmod,
|
|
.chown = dfs_chown,
|
|
.chown = dfs_chown,
|
|
// .truncate = dfs_truncate,
|
|
// .truncate = dfs_truncate,
|
|
@@ -1037,6 +1096,7 @@ static struct fuse_operations dfs_oper = {
|
|
|
|
|
|
int main(int argc, char *argv[])
|
|
int main(int argc, char *argv[])
|
|
{
|
|
{
|
|
|
|
+
|
|
umask(0);
|
|
umask(0);
|
|
|
|
|
|
program = argv[0];
|
|
program = argv[0];
|
|
@@ -1049,10 +1109,13 @@ int main(int argc, char *argv[])
|
|
/** error parsing options */
|
|
/** error parsing options */
|
|
return -1;
|
|
return -1;
|
|
|
|
|
|
|
|
+
|
|
if (options.server == NULL || options.port == 0) {
|
|
if (options.server == NULL || options.port == 0) {
|
|
print_usage(argv[0]);
|
|
print_usage(argv[0]);
|
|
exit(0);
|
|
exit(0);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
|
|
int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
|
|
|
|
|
|
if (ret) printf("\n");
|
|
if (ret) printf("\n");
|