12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <stdio.h>
- #include <string.h>
- #include <stdlib.h>
- #include "libhdfs/exception.h"
- #include "hdfs/hdfs.h"
- #include "hdfs_http_client.h"
- #include "hdfs_http_query.h"
- #include "hdfs_json_parser.h"
- #include "jni_helper.h"
- #define HADOOP_HDFS_CONF "org/apache/hadoop/hdfs/HdfsConfiguration"
- #define HADOOP_NAMENODE "org/apache/hadoop/hdfs/server/namenode/NameNode"
- #define JAVA_INETSOCKETADDRESS "java/net/InetSocketAddress"
- struct hdfsBuilder {
- int forceNewInstance;
- const char *nn;
- tPort port;
- const char *kerbTicketCachePath;
- const char *userName;
- };
- /**
- * The information required for accessing webhdfs,
- * including the network address of the namenode and the user name
- *
- * Unlike the string in hdfsBuilder, the strings in this structure are
- * dynamically allocated. This structure will not be freed until we disconnect
- * from HDFS.
- */
- struct hdfs_internal {
- char *nn;
- tPort port;
- char *userName;
- /**
- * Working directory -- stored with a trailing slash.
- */
- char *workingDir;
- };
- /**
- * The 'file-handle' to a file in hdfs.
- */
- struct hdfsFile_internal {
- struct webhdfsFileHandle* file;
- enum hdfsStreamType type; /* INPUT or OUTPUT */
- int flags; /* Flag indicate read/create/append etc. */
- tOffset offset; /* Current offset position in the file */
- };
- /**
- * Create, initialize and return a webhdfsBuffer
- */
- static int initWebHdfsBuffer(struct webhdfsBuffer **webhdfsBuffer)
- {
- int ret = 0;
- struct webhdfsBuffer *buffer = calloc(1, sizeof(struct webhdfsBuffer));
- if (!buffer) {
- fprintf(stderr,
- "ERROR: fail to allocate memory for webhdfsBuffer.\n");
- return ENOMEM;
- }
- ret = pthread_mutex_init(&buffer->writeMutex, NULL);
- if (ret) {
- fprintf(stderr, "ERROR: fail in pthread_mutex_init for writeMutex "
- "in initWebHdfsBuffer, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- ret = pthread_cond_init(&buffer->newwrite_or_close, NULL);
- if (ret) {
- fprintf(stderr,
- "ERROR: fail in pthread_cond_init for newwrite_or_close "
- "in initWebHdfsBuffer, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- ret = pthread_cond_init(&buffer->transfer_finish, NULL);
- if (ret) {
- fprintf(stderr,
- "ERROR: fail in pthread_cond_init for transfer_finish "
- "in initWebHdfsBuffer, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
-
- done:
- if (ret) {
- free(buffer);
- return ret;
- }
- *webhdfsBuffer = buffer;
- return 0;
- }
- /**
- * Reset the webhdfsBuffer. This is used in a block way
- * when hdfsWrite is called with a new buffer to write.
- * The writing thread in libcurl will be waken up to continue writing,
- * and the caller of this function is blocked waiting for writing to finish.
- *
- * @param wb The handle of the webhdfsBuffer
- * @param buffer The buffer provided by user to write
- * @param length The length of bytes to write
- * @return Updated webhdfsBuffer.
- */
- static struct webhdfsBuffer *resetWebhdfsBuffer(struct webhdfsBuffer *wb,
- const char *buffer, size_t length)
- {
- if (buffer && length > 0) {
- pthread_mutex_lock(&wb->writeMutex);
- wb->wbuffer = buffer;
- wb->offset = 0;
- wb->remaining = length;
- pthread_cond_signal(&wb->newwrite_or_close);
- while (wb->remaining != 0) {
- pthread_cond_wait(&wb->transfer_finish, &wb->writeMutex);
- }
- pthread_mutex_unlock(&wb->writeMutex);
- }
- return wb;
- }
- /**
- * Free the webhdfsBuffer and destroy its pthread conditions/mutex
- * @param buffer The webhdfsBuffer to free
- */
- static void freeWebhdfsBuffer(struct webhdfsBuffer *buffer)
- {
- int ret = 0;
- if (buffer) {
- ret = pthread_cond_destroy(&buffer->newwrite_or_close);
- if (ret) {
- fprintf(stderr,
- "WARN: fail in pthread_cond_destroy for newwrite_or_close "
- "in freeWebhdfsBuffer, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- errno = ret;
- }
- ret = pthread_cond_destroy(&buffer->transfer_finish);
- if (ret) {
- fprintf(stderr,
- "WARN: fail in pthread_cond_destroy for transfer_finish "
- "in freeWebhdfsBuffer, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- errno = ret;
- }
- ret = pthread_mutex_destroy(&buffer->writeMutex);
- if (ret) {
- fprintf(stderr,
- "WARN: fail in pthread_mutex_destroy for writeMutex "
- "in freeWebhdfsBuffer, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- errno = ret;
- }
- free(buffer);
- buffer = NULL;
- }
- }
- /**
- * To free the webhdfsFileHandle, which includes a webhdfsBuffer and strings
- * @param handle The webhdfsFileHandle to free
- */
- static void freeWebFileHandle(struct webhdfsFileHandle * handle)
- {
- if (!handle)
- return;
- freeWebhdfsBuffer(handle->uploadBuffer);
- free(handle->datanode);
- free(handle->absPath);
- free(handle);
- }
/**
 * Substitute a printable placeholder for a NULL string.
 *
 * @param str A string, possibly NULL
 * @return str itself when it is non-NULL, otherwise the literal "(NULL)"
 */
static const char *maybeNull(const char *str)
{
    if (str == NULL) {
        return "(NULL)";
    }
    return str;
}
- /** To print a hdfsBuilder as string */
- static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
- char *buf, size_t bufLen)
- {
- int strlength = snprintf(buf, bufLen, "nn=%s, port=%d, "
- "kerbTicketCachePath=%s, userName=%s",
- maybeNull(bld->nn), bld->port,
- maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
- if (strlength < 0 || strlength >= bufLen) {
- fprintf(stderr, "failed to print a hdfsBuilder as string.\n");
- return NULL;
- }
- return buf;
- }
- /**
- * Free a hdfs_internal handle
- * @param fs The hdfs_internal handle to free
- */
- static void freeWebHdfsInternal(struct hdfs_internal *fs)
- {
- if (fs) {
- free(fs->nn);
- free(fs->userName);
- free(fs->workingDir);
- }
- }
- struct hdfsBuilder *hdfsNewBuilder(void)
- {
- struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
- if (!bld) {
- errno = ENOMEM;
- return NULL;
- }
- return bld;
- }
/**
 * Release a builder. Only the builder structure is freed; the strings it
 * points to are left alone.
 *
 * @param bld The builder to free; may be NULL
 */
void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
    free(bld);
}
/**
 * Intentionally a no-op: libwebhdfs does not cache instances, so forcing a
 * new instance is not applicable.
 *
 * @param bld The builder (unused)
 */
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
    (void) bld;
}
- void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
- {
- if (bld) {
- bld->nn = nn;
- }
- }
- void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
- {
- if (bld) {
- bld->port = port;
- }
- }
- void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
- {
- if (bld) {
- bld->userName = userName;
- }
- }
- void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
- const char *kerbTicketCachePath)
- {
- if (bld) {
- bld->kerbTicketCachePath = kerbTicketCachePath;
- }
- }
- hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user)
- {
- struct hdfsBuilder* bld = hdfsNewBuilder();
- if (!bld) {
- return NULL;
- }
- hdfsBuilderSetNameNode(bld, nn);
- hdfsBuilderSetNameNodePort(bld, port);
- hdfsBuilderSetUserName(bld, user);
- return hdfsBuilderConnect(bld);
- }
- hdfsFS hdfsConnect(const char* nn, tPort port)
- {
- return hdfsConnectAsUser(nn, port, NULL);
- }
- hdfsFS hdfsConnectNewInstance(const char* nn, tPort port)
- {
- return hdfsConnect(nn, port);
- }
- hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
- const char *user)
- {
- struct hdfsBuilder *bld = hdfsNewBuilder();
- if (!bld)
- return NULL;
- hdfsBuilderSetNameNode(bld, host);
- hdfsBuilderSetNameNodePort(bld, port);
- hdfsBuilderSetUserName(bld, user);
- hdfsBuilderSetForceNewInstance(bld);
- return hdfsBuilderConnect(bld);
- }
- /**
- * To retrieve the default configuration value for NameNode's hostName and port
- * TODO: This function currently is using JNI,
- * we need to do this without using JNI (HDFS-3917)
- *
- * @param bld The hdfsBuilder handle
- * @param port Used to get the default value for NameNode's port
- * @param nn Used to get the default value for NameNode's hostName
- * @return 0 for success and non-zero value for failure
- */
- static int retrieveDefaults(const struct hdfsBuilder *bld, tPort *port,
- char **nn)
- {
- JNIEnv *env = 0;
- jobject jHDFSConf = NULL, jAddress = NULL;
- jstring jHostName = NULL;
- jvalue jVal;
- jthrowable jthr = NULL;
- int ret = 0;
- char buf[512];
-
- env = getJNIEnv();
- if (!env) {
- return EINTERNAL;
- }
-
- jthr = constructNewObjectOfClass(env, &jHDFSConf, HADOOP_HDFS_CONF, "()V");
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsBuilderConnect(%s)",
- hdfsBuilderToStr(bld, buf, sizeof(buf)));
- goto done;
- }
-
- jthr = invokeMethod(env, &jVal, STATIC, NULL,
- HADOOP_NAMENODE, "getHttpAddress",
- "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/InetSocketAddress;",
- jHDFSConf);
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsBuilderConnect(%s)",
- hdfsBuilderToStr(bld, buf, sizeof(buf)));
- goto done;
- }
- jAddress = jVal.l;
-
- jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
- JAVA_INETSOCKETADDRESS, "getPort", "()I");
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsBuilderConnect(%s)",
- hdfsBuilderToStr(bld, buf, sizeof(buf)));
- goto done;
- }
- *port = jVal.i;
-
- jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
- JAVA_INETSOCKETADDRESS,
- "getHostName", "()Ljava/lang/String;");
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsBuilderConnect(%s)",
- hdfsBuilderToStr(bld, buf, sizeof(buf)));
- goto done;
- }
- jHostName = jVal.l;
- jthr = newCStr(env, jHostName, nn);
- if (jthr) {
- ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
- "hdfsBuilderConnect(%s)",
- hdfsBuilderToStr(bld, buf, sizeof(buf)));
- goto done;
- }
- done:
- destroyLocalReference(env, jHDFSConf);
- destroyLocalReference(env, jAddress);
- destroyLocalReference(env, jHostName);
- return ret;
- }
- hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
- {
- struct hdfs_internal *fs = NULL;
- int ret = 0;
- if (!bld) {
- ret = EINVAL;
- goto done;
- }
- if (bld->nn == NULL) {
- // In the JNI version of libhdfs this returns a LocalFileSystem.
- ret = ENOTSUP;
- goto done;
- }
-
- fs = calloc(1, sizeof(*fs));
- if (!fs) {
- ret = ENOMEM;
- goto done;
- }
- // If the namenode is "default" and/or the port of namenode is 0,
- // get the default namenode/port
- if (bld->port == 0 || !strcasecmp("default", bld->nn)) {
- ret = retrieveDefaults(bld, &fs->port, &fs->nn);
- if (ret)
- goto done;
- } else {
- fs->port = bld->port;
- fs->nn = strdup(bld->nn);
- if (!fs->nn) {
- ret = ENOMEM;
- goto done;
- }
- }
- if (bld->userName) {
- // userName may be NULL
- fs->userName = strdup(bld->userName);
- if (!fs->userName) {
- ret = ENOMEM;
- goto done;
- }
- }
- // The working directory starts out as root.
- fs->workingDir = strdup("/");
- if (!fs->workingDir) {
- ret = ENOMEM;
- goto done;
- }
- // For debug
- fprintf(stderr, "namenode: %s:%d\n", bld->nn, bld->port);
- done:
- free(bld);
- if (ret) {
- freeWebHdfsInternal(fs);
- errno = ret;
- return NULL;
- }
- return fs;
- }
- int hdfsDisconnect(hdfsFS fs)
- {
- if (fs == NULL) {
- errno = EINVAL;
- return -1;
- }
- freeWebHdfsInternal(fs);
- return 0;
- }
- /**
- * Based on the working directory stored in hdfsFS,
- * generate the absolute path for the given path
- *
- * @param fs The hdfsFS handle which stores the current working directory
- * @param path The given path which may not be an absolute path
- * @param absPath To hold generated absolute path for the given path
- * @return 0 on success, non-zero value indicating error
- */
- static int getAbsolutePath(hdfsFS fs, const char *path, char **absPath)
- {
- char *tempPath = NULL;
- size_t absPathLen;
- int strlength;
-
- if (path[0] == '/') {
- // Path is already absolute.
- tempPath = strdup(path);
- if (!tempPath) {
- return ENOMEM;
- }
- *absPath = tempPath;
- return 0;
- }
- // Prepend the workingDir to the path.
- absPathLen = strlen(fs->workingDir) + strlen(path) + 1;
- tempPath = malloc(absPathLen);
- if (!tempPath) {
- return ENOMEM;
- }
- strlength = snprintf(tempPath, absPathLen, "%s%s", fs->workingDir, path);
- if (strlength < 0 || strlength >= absPathLen) {
- free(tempPath);
- return EIO;
- }
- *absPath = tempPath;
- return 0;
- }
- int hdfsCreateDirectory(hdfsFS fs, const char* path)
- {
- char *url = NULL, *absPath = NULL;
- struct Response *resp = NULL;
- int ret = 0;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
- ret = createUrlForMKDIR(fs->nn, fs->port, absPath, fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchMKDIR(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseMKDIR(resp->body->content);
- done:
- freeResponse(resp);
- free(url);
- free(absPath);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- int hdfsChmod(hdfsFS fs, const char* path, short mode)
- {
- char *absPath = NULL, *url = NULL;
- struct Response *resp = NULL;
- int ret = 0;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
- ret = createUrlForCHMOD(fs->nn, fs->port, absPath, (int) mode,
- fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchCHMOD(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseCHMOD(resp->header->content, resp->body->content);
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
- {
- int ret = 0;
- char *absPath = NULL, *url = NULL;
- struct Response *resp = NULL;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
-
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
- ret = createUrlForCHOWN(fs->nn, fs->port, absPath,
- owner, group, fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchCHOWN(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseCHOWN(resp->header->content, resp->body->content);
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
- {
- char *oldAbsPath = NULL, *newAbsPath = NULL, *url = NULL;
- int ret = 0;
- struct Response *resp = NULL;
- if (fs == NULL || oldPath == NULL || newPath == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, oldPath, &oldAbsPath);
- if (ret) {
- goto done;
- }
- ret = getAbsolutePath(fs, newPath, &newAbsPath);
- if (ret) {
- goto done;
- }
- ret = createUrlForRENAME(fs->nn, fs->port, oldAbsPath,
- newAbsPath, fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchRENAME(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseRENAME(resp->body->content);
- done:
- freeResponse(resp);
- free(oldAbsPath);
- free(newAbsPath);
- free(url);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- /**
- * Get the file status for a given path.
- *
- * @param fs hdfsFS handle containing
- * NameNode hostName/port information
- * @param path Path for file
- * @param printError Whether or not to print out error information
- * (mainly remote FileNotFoundException)
- * @return File information for the given path
- */
- static hdfsFileInfo *hdfsGetPathInfoImpl(hdfsFS fs, const char* path,
- int printError)
- {
- char *absPath = NULL;
- char *url=NULL;
- struct Response *resp = NULL;
- int ret = 0;
- hdfsFileInfo *fileInfo = NULL;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
- fileInfo = (hdfsFileInfo *) calloc(1, sizeof(hdfsFileInfo));
- if (!fileInfo) {
- ret = ENOMEM;
- goto done;
- }
- fileInfo->mKind = kObjectKindFile;
- ret = createUrlForGetFileStatus(fs->nn, fs->port, absPath,
- fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchGFS(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseGFS(resp->body->content, fileInfo, printError);
-
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret == 0) {
- return fileInfo;
- } else {
- free(fileInfo);
- errno = ret;
- return NULL;
- }
- }
- hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
- {
- return hdfsGetPathInfoImpl(fs, path, 1);
- }
- hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
- {
- char *url = NULL, *absPath = NULL;
- struct Response *resp = NULL;
- int ret = 0;
- hdfsFileInfo *fileInfo = NULL;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
- fileInfo = calloc(1, sizeof(*fileInfo));
- if (!fileInfo) {
- ret = ENOMEM;
- goto done;
- }
-
- ret = createUrlForLS(fs->nn, fs->port, absPath, fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchLS(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseLS(resp->body->content, &fileInfo, numEntries);
-
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret == 0) {
- return fileInfo;
- } else {
- errno = ret;
- return NULL;
- }
- }
- int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
- {
- char *url = NULL, *absPath = NULL;
- struct Response *resp = NULL;
- int ret = 0;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
- ret = createUrlForSETREPLICATION(fs->nn, fs->port, absPath,
- replication, fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchSETREPLICATION(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseSETREPLICATION(resp->body->content);
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
- {
- int i;
- for (i = 0; i < numEntries; ++i) {
- free(hdfsFileInfo[i].mName);
- free(hdfsFileInfo[i].mOwner);
- free(hdfsFileInfo[i].mGroup);
- }
- free(hdfsFileInfo);
- }
- int hdfsDelete(hdfsFS fs, const char* path, int recursive)
- {
- char *url = NULL, *absPath = NULL;
- struct Response *resp = NULL;
- int ret = 0;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
-
- ret = createUrlForDELETE(fs->nn, fs->port, absPath,
- recursive, fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchDELETE(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseDELETE(resp->body->content);
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
- {
- char *url = NULL, *absPath = NULL;
- struct Response *resp = NULL;
- int ret = 0;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- ret = getAbsolutePath(fs, path, &absPath);
- if (ret) {
- goto done;
- }
-
- ret = createUrlForUTIMES(fs->nn, fs->port, absPath, mtime, atime,
- fs->userName, &url);
- if (ret) {
- goto done;
- }
- ret = launchUTIMES(url, &resp);
- if (ret) {
- goto done;
- }
- ret = parseUTIMES(resp->header->content, resp->body->content);
- done:
- freeResponse(resp);
- free(absPath);
- free(url);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- int hdfsExists(hdfsFS fs, const char *path)
- {
- hdfsFileInfo *fileInfo = hdfsGetPathInfoImpl(fs, path, 0);
- if (!fileInfo) {
- // (errno will have been set by hdfsGetPathInfo)
- return -1;
- }
- hdfsFreeFileInfo(fileInfo, 1);
- return 0;
- }
/**
 * The information held by the thread which writes data to hdfs through http
 */
typedef struct {
    char *url;                          /* url of the target datanode for writing */
    struct webhdfsBuffer *uploadBuffer; /* buffer storing data to write */
    int flags;                          /* writing mode: create or append */
    struct Response *resp;              /* response from the target datanode */
} threadData;
- /**
- * Free the threadData struct instance,
- * including the response and url contained in it
- * @param data The threadData instance to free
- */
- static void freeThreadData(threadData *data)
- {
- if (data) {
- if (data->url) {
- free(data->url);
- }
- if (data->resp) {
- freeResponse(data->resp);
- }
- // The uploadBuffer would be freed by freeWebFileHandle()
- free(data);
- data = NULL;
- }
- }
- /**
- * The action of the thread that writes data to
- * the target datanode for hdfsWrite.
- * The writing can be either create or append, which is specified by flag
- */
- static void *writeThreadOperation(void *v)
- {
- int ret = 0;
- threadData *data = v;
- if (data->flags & O_APPEND) {
- ret = launchDnAPPEND(data->url, data->uploadBuffer, &(data->resp));
- } else {
- ret = launchDnWRITE(data->url, data->uploadBuffer, &(data->resp));
- }
- if (ret) {
- fprintf(stderr, "Failed to write to datanode %s, <%d>: %s.\n",
- data->url, ret, hdfs_strerror(ret));
- }
- return data;
- }
- /**
- * Free the memory associated with a webHDFS file handle.
- *
- * No other resources will be freed.
- *
- * @param file The webhdfs file handle
- */
- static void freeFileInternal(hdfsFile file)
- {
- if (!file)
- return;
- freeWebFileHandle(file->file);
- free(file);
- }
- /**
- * Helper function for opening a file for OUTPUT.
- *
- * As part of the open process for OUTPUT files, we have to connect to the
- * NameNode and get the URL of the corresponding DataNode.
- * We also create a background thread here for doing I/O.
- *
- * @param webhandle The webhandle being opened
- * @return 0 on success; error code otherwise
- */
- static int hdfsOpenOutputFileImpl(hdfsFS fs, hdfsFile file)
- {
- struct webhdfsFileHandle *webhandle = file->file;
- struct Response *resp = NULL;
- int append, ret = 0;
- char *nnUrl = NULL, *dnUrl = NULL;
- threadData *data = NULL;
- ret = initWebHdfsBuffer(&webhandle->uploadBuffer);
- if (ret) {
- goto done;
- }
- append = file->flags & O_APPEND;
- if (!append) {
- // If we're not appending, send a create request to the NN
- ret = createUrlForNnWRITE(fs->nn, fs->port, webhandle->absPath,
- fs->userName, webhandle->replication,
- webhandle->blockSize, &nnUrl);
- } else {
- ret = createUrlForNnAPPEND(fs->nn, fs->port, webhandle->absPath,
- fs->userName, &nnUrl);
- }
- if (ret) {
- fprintf(stderr, "Failed to create the url connecting to namenode "
- "for file creation/appending, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- if (!append) {
- ret = launchNnWRITE(nnUrl, &resp);
- } else {
- ret = launchNnAPPEND(nnUrl, &resp);
- }
- if (ret) {
- fprintf(stderr, "fail to get the response from namenode for "
- "file creation/appending, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- if (!append) {
- ret = parseNnWRITE(resp->header->content, resp->body->content);
- } else {
- ret = parseNnAPPEND(resp->header->content, resp->body->content);
- }
- if (ret) {
- fprintf(stderr, "fail to parse the response from namenode for "
- "file creation/appending, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- ret = parseDnLoc(resp->header->content, &dnUrl);
- if (ret) {
- fprintf(stderr, "fail to get the datanode url from namenode "
- "for file creation/appending, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- //store the datanode url in the file handle
- webhandle->datanode = strdup(dnUrl);
- if (!webhandle->datanode) {
- ret = ENOMEM;
- goto done;
- }
- //create a new thread for performing the http transferring
- data = calloc(1, sizeof(*data));
- if (!data) {
- ret = ENOMEM;
- goto done;
- }
- data->url = strdup(dnUrl);
- if (!data->url) {
- ret = ENOMEM;
- goto done;
- }
- data->flags = file->flags;
- data->uploadBuffer = webhandle->uploadBuffer;
- ret = pthread_create(&webhandle->connThread, NULL,
- writeThreadOperation, data);
- if (ret) {
- fprintf(stderr, "ERROR: failed to create the writing thread "
- "in hdfsOpenOutputFileImpl, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- goto done;
- }
- webhandle->uploadBuffer->openFlag = 1;
- done:
- freeResponse(resp);
- free(nnUrl);
- free(dnUrl);
- if (ret) {
- errno = ret;
- if (data) {
- free(data->url);
- free(data);
- }
- }
- return ret;
- }
- hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
- int bufferSize, short replication, tSize blockSize)
- {
- int ret = 0;
- int accmode = flags & O_ACCMODE;
- struct webhdfsFileHandle *webhandle = NULL;
- hdfsFile file = NULL;
- if (fs == NULL || path == NULL) {
- ret = EINVAL;
- goto done;
- }
- if (accmode == O_RDWR) {
- // TODO: the original libhdfs has very hackish support for this; should
- // we do the same? It would actually be a lot easier in libwebhdfs
- // since the protocol isn't connection-oriented.
- fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
- ret = ENOTSUP;
- goto done;
- }
- if ((flags & O_CREAT) && (flags & O_EXCL)) {
- fprintf(stderr,
- "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
- }
- file = calloc(1, sizeof(struct hdfsFile_internal));
- if (!file) {
- ret = ENOMEM;
- goto done;
- }
- file->flags = flags;
- file->type = accmode == O_RDONLY ? INPUT : OUTPUT;
- file->offset = 0;
- webhandle = calloc(1, sizeof(struct webhdfsFileHandle));
- if (!webhandle) {
- ret = ENOMEM;
- goto done;
- }
- webhandle->bufferSize = bufferSize;
- webhandle->replication = replication;
- webhandle->blockSize = blockSize;
- ret = getAbsolutePath(fs, path, &webhandle->absPath);
- if (ret) {
- goto done;
- }
- file->file = webhandle;
- // If open for write/append,
- // open and keep the connection with the target datanode for writing
- if (file->type == OUTPUT) {
- ret = hdfsOpenOutputFileImpl(fs, file);
- if (ret) {
- goto done;
- }
- }
- done:
- if (ret) {
- if (file) {
- freeFileInternal(file); // Also frees webhandle
- } else {
- freeWebFileHandle(webhandle);
- }
- errno = ret;
- return NULL;
- }
- return file;
- }
- int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
- {
- errno = ENOTSUP;
- return -1;
- }
- tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length)
- {
- if (length == 0) {
- return 0;
- }
- if (fs == NULL || file == NULL || file->type != OUTPUT || length < 0) {
- errno = EBADF;
- return -1;
- }
-
- struct webhdfsFileHandle *wfile = file->file;
- if (wfile->uploadBuffer && wfile->uploadBuffer->openFlag) {
- resetWebhdfsBuffer(wfile->uploadBuffer, buffer, length);
- return length;
- } else {
- fprintf(stderr,
- "Error: have not opened the file %s for writing yet.\n",
- wfile->absPath);
- errno = EBADF;
- return -1;
- }
- }
- int hdfsCloseFile(hdfsFS fs, hdfsFile file)
- {
- void *respv = NULL;
- threadData *tdata = NULL;
- int ret = 0;
- struct webhdfsFileHandle *wfile = NULL;
- if (file->type == OUTPUT) {
- wfile = file->file;
- pthread_mutex_lock(&(wfile->uploadBuffer->writeMutex));
- wfile->uploadBuffer->closeFlag = 1;
- pthread_cond_signal(&wfile->uploadBuffer->newwrite_or_close);
- pthread_mutex_unlock(&(wfile->uploadBuffer->writeMutex));
-
- // Waiting for the writing thread to terminate
- ret = pthread_join(wfile->connThread, &respv);
- if (ret) {
- fprintf(stderr, "Error when pthread_join in hdfsClose, <%d>: %s.\n",
- ret, hdfs_strerror(ret));
- }
- // Parse the response
- tdata = respv;
- if (!tdata || !(tdata->resp)) {
- fprintf(stderr,
- "ERROR: response from the writing thread is NULL.\n");
- ret = EIO;
- }
- if (file->flags & O_APPEND) {
- ret = parseDnAPPEND(tdata->resp->header->content,
- tdata->resp->body->content);
- } else {
- ret = parseDnWRITE(tdata->resp->header->content,
- tdata->resp->body->content);
- }
- // Free the threaddata
- freeThreadData(tdata);
- }
- freeFileInternal(file);
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- int hdfsFileIsOpenForRead(hdfsFile file)
- {
- return (file->type == INPUT);
- }
- int hdfsFileGetReadStatistics(hdfsFile file,
- struct hdfsReadStatistics **stats)
- {
- errno = ENOTSUP;
- return -1;
- }
- int64_t hdfsReadStatisticsGetRemoteBytesRead(
- const struct hdfsReadStatistics *stats)
- {
- return stats->totalBytesRead - stats->totalLocalBytesRead;
- }
/**
 * Release a statistics record.
 *
 * @param stats  Record to release with free(); NULL is harmless because
 *               free(NULL) is a no-op.
 */
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
{
    void *record = stats;

    free(record);
}
- int hdfsFileIsOpenForWrite(hdfsFile file)
- {
- return (file->type == OUTPUT);
- }
- static int hdfsReadImpl(hdfsFS fs, hdfsFile file, void* buffer, tSize off,
- tSize length, tSize *numRead)
- {
- int ret = 0;
- char *url = NULL;
- struct Response *resp = NULL;
- if (fs == NULL || file == NULL || file->type != INPUT || buffer == NULL ||
- length < 0) {
- ret = EINVAL;
- goto done;
- }
- if (length == 0) {
- // Special case: the user supplied a buffer of zero length, so there is
- // nothing to do.
- *numRead = 0;
- goto done;
- }
- resp = calloc(1, sizeof(*resp)); // resp is actually a pointer type
- if (!resp) {
- ret = ENOMEM;
- goto done;
- }
- ret = initResponseBuffer(&(resp->header));
- if (ret) {
- goto done;
- }
- ret = initResponseBuffer(&(resp->body));
- if (ret) {
- goto done;
- }
- memset(buffer, 0, length);
- resp->body->content = buffer;
- resp->body->remaining = length;
-
- ret = createUrlForOPEN(fs->nn, fs->port, file->file->absPath,
- fs->userName, off, length, &url);
- if (ret) {
- goto done;
- }
- ret = launchOPEN(url, resp);
- if (ret) {
- goto done;
- }
- ret = parseOPEN(resp->header->content, resp->body->content);
- if (ret == -1) {
- // Special case: if parseOPEN returns -1, we asked for a byte range
- // with outside what the file contains. In this case, hdfsRead and
- // hdfsPread return 0, meaning end-of-file.
- *numRead = 0;
- } else if (ret == 0) {
- *numRead = (tSize) resp->body->offset;
- }
- done:
- if (resp) {
- freeResponseBuffer(resp->header);
- free(resp->body);
- }
- free(resp);
- free(url);
- return ret;
- }
- tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length)
- {
- int ret = 0;
- tSize numRead = 0;
- ret = hdfsReadImpl(fs, file, buffer, (tSize) file->offset,
- length, &numRead);
- if (ret > 0) { // ret == -1 means end of file
- errno = ret;
- return -1;
- }
- file->offset += numRead;
- return numRead;
- }
- int hdfsAvailable(hdfsFS fs, hdfsFile file)
- {
- /* We actually always block when reading from webhdfs, currently. So the
- * number of bytes that can be read without blocking is currently 0.
- */
- return 0;
- }
- int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos)
- {
- struct webhdfsFileHandle *wf;
- hdfsFileInfo *fileInfo = NULL;
- int ret = 0;
- if (!fs || !file || (file->type == OUTPUT) || (desiredPos < 0)) {
- ret = EINVAL;
- goto done;
- }
- wf = file->file;
- if (!wf) {
- ret = EINVAL;
- goto done;
- }
- fileInfo = hdfsGetPathInfo(fs, wf->absPath);
- if (!fileInfo) {
- ret = errno;
- goto done;
- }
- if (desiredPos > fileInfo->mSize) {
- fprintf(stderr,
- "hdfsSeek for %s failed since the desired position %" PRId64
- " is beyond the size of the file %" PRId64 "\n",
- wf->absPath, desiredPos, fileInfo->mSize);
- ret = ENOTSUP;
- goto done;
- }
- file->offset = desiredPos;
- done:
- if (fileInfo) {
- hdfsFreeFileInfo(fileInfo, 1);
- }
- if (ret) {
- errno = ret;
- return -1;
- }
- return 0;
- }
- tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
- void* buffer, tSize length)
- {
- int ret;
- tSize numRead = 0;
- if (position < 0) {
- errno = EINVAL;
- return -1;
- }
- ret = hdfsReadImpl(fs, file, buffer, (tSize) position, length, &numRead);
- if (ret > 0) {
- errno = ret;
- return -1;
- }
- return numRead;
- }
- tOffset hdfsTell(hdfsFS fs, hdfsFile file)
- {
- if (!file) {
- errno = EINVAL;
- return -1;
- }
- return file->offset;
- }
- char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize)
- {
- int strlength;
- if (fs == NULL || buffer == NULL || bufferSize <= 0) {
- errno = EINVAL;
- return NULL;
- }
- strlength = snprintf(buffer, bufferSize, "%s", fs->workingDir);
- if (strlength >= bufferSize) {
- errno = ENAMETOOLONG;
- return NULL;
- } else if (strlength < 0) {
- errno = EIO;
- return NULL;
- }
- return buffer;
- }
/**
 * Collapse every run of consecutive '/' characters in `path` into a single
 * '/', in place. All other characters are kept as they are.
 *
 * @param path  NUL-terminated, writable string; it never grows.
 */
static void normalizePath(char *path)
{
    const char *reader = path;
    char *writer = path;
    int previousWasSlash = 0;

    while (*reader != '\0') {
        const char current = *reader++;
        const int isSlash = (current == '/');

        // Drop a slash only when it directly follows another slash.
        if (!(isSlash && previousWasSlash)) {
            *writer++ = current;
        }
        previousWasSlash = isSlash;
    }
    *writer = '\0';
}
- int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
- {
- char *newWorkingDir = NULL;
- size_t strlenPath = 0, newWorkingDirLen = 0;
- int strlength;
- if (fs == NULL || path == NULL) {
- errno = EINVAL;
- return -1;
- }
- strlenPath = strlen(path);
- if (strlenPath < 1) {
- errno = EINVAL;
- return -1;
- }
- // the max string length of the new working dir is
- // (length of old working dir) + (length of given path) + strlen("/") + 1
- newWorkingDirLen = strlen(fs->workingDir) + strlenPath + 2;
- newWorkingDir = malloc(newWorkingDirLen);
- if (!newWorkingDir) {
- errno = ENOMEM;
- return -1;
- }
- strlength = snprintf(newWorkingDir, newWorkingDirLen, "%s%s%s",
- (path[0] == '/') ? "" : fs->workingDir,
- path, (path[strlenPath - 1] == '/') ? "" : "/");
- if (strlength < 0 || strlength >= newWorkingDirLen) {
- free(newWorkingDir);
- errno = EIO;
- return -1;
- }
-
- if (strstr(path, "//")) {
- // normalize the path by replacing "//" with "/"
- normalizePath(newWorkingDir);
- }
-
- free(fs->workingDir);
- fs->workingDir = newWorkingDir;
- return 0;
- }
/**
 * Release a NULL-terminated array of NULL-terminated host-name arrays.
 *
 * Every host string, every inner array and finally the outer array are
 * passed to free().
 *
 * @param blockHosts  Array to release. NULL is accepted and ignored, so the
 *                    result of a failed hdfsGetHosts call can be passed in
 *                    safely.
 */
void hdfsFreeHosts(char ***blockHosts)
{
    char ***block;
    char **host;

    if (blockHosts == NULL) {
        return;
    }
    for (block = blockHosts; *block != NULL; block++) {
        for (host = *block; *host != NULL; host++) {
            free(*host);
        }
        free(*block);
    }
    free(blockHosts);
}
- tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
- {
- errno = ENOTSUP;
- return -1;
- }
- int hdfsFileUsesDirectRead(hdfsFile file)
- {
- return 0; // webhdfs never performs direct reads.
- }
- void hdfsFileDisableDirectRead(hdfsFile file)
- {
- // webhdfs never performs direct reads
- }
- int hdfsHFlush(hdfsFS fs, hdfsFile file)
- {
- if (file->type != OUTPUT) {
- errno = EINVAL;
- return -1;
- }
- // TODO: block until our write buffer is flushed (HDFS-3952)
- return 0;
- }
- int hdfsFlush(hdfsFS fs, hdfsFile file)
- {
- if (file->type != OUTPUT) {
- errno = EINVAL;
- return -1;
- }
- // TODO: block until our write buffer is flushed (HDFS-3952)
- return 0;
- }
- char*** hdfsGetHosts(hdfsFS fs, const char* path,
- tOffset start, tOffset length)
- {
- errno = ENOTSUP;
- return NULL;
- }
- tOffset hdfsGetCapacity(hdfsFS fs)
- {
- errno = ENOTSUP;
- return -1;
- }
- tOffset hdfsGetUsed(hdfsFS fs)
- {
- errno = ENOTSUP;
- return -1;
- }
- int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
- {
- errno = ENOTSUP;
- return -1;
- }
- int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
- {
- errno = ENOTSUP;
- return -1;
- }
|