hdfs.c 90 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "exception.h"
  19. #include "hdfs.h"
  20. #include "jni_helper.h"
  21. #include <inttypes.h>
  22. #include <stdio.h>
  23. #include <string.h>
  24. /* Some frequently used Java paths */
  25. #define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
  26. #define HADOOP_PATH "org/apache/hadoop/fs/Path"
  27. #define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem"
  28. #define HADOOP_FS "org/apache/hadoop/fs/FileSystem"
  29. #define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
  30. #define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation"
  31. #define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem"
  32. #define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
  33. #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
  34. #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
  35. #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
  36. #define JAVA_NET_ISA "java/net/InetSocketAddress"
  37. #define JAVA_NET_URI "java/net/URI"
  38. #define JAVA_STRING "java/lang/String"
  39. #define READ_OPTION "org/apache/hadoop/fs/ReadOption"
  40. #define JAVA_VOID "V"
  41. /* Macros for constructing method signatures */
  42. #define JPARAM(X) "L" X ";"
  43. #define JARRPARAM(X) "[L" X ";"
  44. #define JMETHOD1(X, R) "(" X ")" R
  45. #define JMETHOD2(X, Y, R) "(" X Y ")" R
  46. #define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
  47. #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
  48. // Bit fields for hdfsFile_internal flags
  49. #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
  50. tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
  51. static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
  52. /**
  53. * The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
  54. */
  55. enum hdfsStreamType
  56. {
  57. UNINITIALIZED = 0,
  58. INPUT = 1,
  59. OUTPUT = 2,
  60. };
  61. /**
  62. * The 'file-handle' to a file in hdfs.
  63. */
  64. struct hdfsFile_internal {
  65. void* file;
  66. enum hdfsStreamType type;
  67. int flags;
  68. };
  69. int hdfsFileIsOpenForRead(hdfsFile file)
  70. {
  71. return (file->type == INPUT);
  72. }
  73. int hdfsFileGetReadStatistics(hdfsFile file,
  74. struct hdfsReadStatistics **stats)
  75. {
  76. jthrowable jthr;
  77. jobject readStats = NULL;
  78. jvalue jVal;
  79. struct hdfsReadStatistics *s = NULL;
  80. int ret;
  81. JNIEnv* env = getJNIEnv();
  82. if (env == NULL) {
  83. errno = EINTERNAL;
  84. return -1;
  85. }
  86. if (file->type != INPUT) {
  87. ret = EINVAL;
  88. goto done;
  89. }
  90. jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
  91. "org/apache/hadoop/hdfs/client/HdfsDataInputStream",
  92. "getReadStatistics",
  93. "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;");
  94. if (jthr) {
  95. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  96. "hdfsFileGetReadStatistics: getReadStatistics failed");
  97. goto done;
  98. }
  99. readStats = jVal.l;
  100. s = malloc(sizeof(struct hdfsReadStatistics));
  101. if (!s) {
  102. ret = ENOMEM;
  103. goto done;
  104. }
  105. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  106. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  107. "getTotalBytesRead", "()J");
  108. if (jthr) {
  109. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  110. "hdfsFileGetReadStatistics: getTotalBytesRead failed");
  111. goto done;
  112. }
  113. s->totalBytesRead = jVal.j;
  114. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  115. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  116. "getTotalLocalBytesRead", "()J");
  117. if (jthr) {
  118. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  119. "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
  120. goto done;
  121. }
  122. s->totalLocalBytesRead = jVal.j;
  123. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  124. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  125. "getTotalShortCircuitBytesRead", "()J");
  126. if (jthr) {
  127. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  128. "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
  129. goto done;
  130. }
  131. s->totalShortCircuitBytesRead = jVal.j;
  132. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  133. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  134. "getTotalZeroCopyBytesRead", "()J");
  135. if (jthr) {
  136. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  137. "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
  138. goto done;
  139. }
  140. s->totalZeroCopyBytesRead = jVal.j;
  141. *stats = s;
  142. s = NULL;
  143. ret = 0;
  144. done:
  145. destroyLocalReference(env, readStats);
  146. free(s);
  147. if (ret) {
  148. errno = ret;
  149. return -1;
  150. }
  151. return 0;
  152. }
  153. int64_t hdfsReadStatisticsGetRemoteBytesRead(
  154. const struct hdfsReadStatistics *stats)
  155. {
  156. return stats->totalBytesRead - stats->totalLocalBytesRead;
  157. }
  158. void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
  159. {
  160. free(stats);
  161. }
  162. int hdfsFileIsOpenForWrite(hdfsFile file)
  163. {
  164. return (file->type == OUTPUT);
  165. }
  166. int hdfsFileUsesDirectRead(hdfsFile file)
  167. {
  168. return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
  169. }
  170. void hdfsFileDisableDirectRead(hdfsFile file)
  171. {
  172. file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
  173. }
  174. int hdfsDisableDomainSocketSecurity(void)
  175. {
  176. jthrowable jthr;
  177. JNIEnv* env = getJNIEnv();
  178. if (env == NULL) {
  179. errno = EINTERNAL;
  180. return -1;
  181. }
  182. jthr = invokeMethod(env, NULL, STATIC, NULL,
  183. "org/apache/hadoop/net/unix/DomainSocket",
  184. "disableBindPathValidation", "()V");
  185. if (jthr) {
  186. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  187. "DomainSocket#disableBindPathValidation");
  188. return -1;
  189. }
  190. return 0;
  191. }
  192. /**
  193. * hdfsJniEnv: A wrapper struct to be used as 'value'
  194. * while saving thread -> JNIEnv* mappings
  195. */
  196. typedef struct
  197. {
  198. JNIEnv* env;
  199. } hdfsJniEnv;
  200. /**
  201. * Helper function to create a org.apache.hadoop.fs.Path object.
  202. * @param env: The JNIEnv pointer.
  203. * @param path: The file-path for which to construct org.apache.hadoop.fs.Path
  204. * object.
  205. * @return Returns a jobject on success and NULL on error.
  206. */
  207. static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
  208. jobject *out)
  209. {
  210. jthrowable jthr;
  211. jstring jPathString;
  212. jobject jPath;
  213. //Construct a java.lang.String object
  214. jthr = newJavaStr(env, path, &jPathString);
  215. if (jthr)
  216. return jthr;
  217. //Construct the org.apache.hadoop.fs.Path object
  218. jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path",
  219. "(Ljava/lang/String;)V", jPathString);
  220. destroyLocalReference(env, jPathString);
  221. if (jthr)
  222. return jthr;
  223. *out = jPath;
  224. return NULL;
  225. }
  226. static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
  227. const char *key, char **val)
  228. {
  229. jthrowable jthr;
  230. jvalue jVal;
  231. jstring jkey = NULL, jRet = NULL;
  232. jthr = newJavaStr(env, key, &jkey);
  233. if (jthr)
  234. goto done;
  235. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  236. HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
  237. JPARAM(JAVA_STRING)), jkey);
  238. if (jthr)
  239. goto done;
  240. jRet = jVal.l;
  241. jthr = newCStr(env, jRet, val);
  242. done:
  243. destroyLocalReference(env, jkey);
  244. destroyLocalReference(env, jRet);
  245. return jthr;
  246. }
  247. int hdfsConfGetStr(const char *key, char **val)
  248. {
  249. JNIEnv *env;
  250. int ret;
  251. jthrowable jthr;
  252. jobject jConfiguration = NULL;
  253. env = getJNIEnv();
  254. if (env == NULL) {
  255. ret = EINTERNAL;
  256. goto done;
  257. }
  258. jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
  259. if (jthr) {
  260. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  261. "hdfsConfGetStr(%s): new Configuration", key);
  262. goto done;
  263. }
  264. jthr = hadoopConfGetStr(env, jConfiguration, key, val);
  265. if (jthr) {
  266. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  267. "hdfsConfGetStr(%s): hadoopConfGetStr", key);
  268. goto done;
  269. }
  270. ret = 0;
  271. done:
  272. destroyLocalReference(env, jConfiguration);
  273. if (ret)
  274. errno = ret;
  275. return ret;
  276. }
  277. void hdfsConfStrFree(char *val)
  278. {
  279. free(val);
  280. }
  281. static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
  282. const char *key, int32_t *val)
  283. {
  284. jthrowable jthr = NULL;
  285. jvalue jVal;
  286. jstring jkey = NULL;
  287. jthr = newJavaStr(env, key, &jkey);
  288. if (jthr)
  289. return jthr;
  290. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  291. HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
  292. jkey, (jint)(*val));
  293. destroyLocalReference(env, jkey);
  294. if (jthr)
  295. return jthr;
  296. *val = jVal.i;
  297. return NULL;
  298. }
  299. int hdfsConfGetInt(const char *key, int32_t *val)
  300. {
  301. JNIEnv *env;
  302. int ret;
  303. jobject jConfiguration = NULL;
  304. jthrowable jthr;
  305. env = getJNIEnv();
  306. if (env == NULL) {
  307. ret = EINTERNAL;
  308. goto done;
  309. }
  310. jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
  311. if (jthr) {
  312. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  313. "hdfsConfGetInt(%s): new Configuration", key);
  314. goto done;
  315. }
  316. jthr = hadoopConfGetInt(env, jConfiguration, key, val);
  317. if (jthr) {
  318. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  319. "hdfsConfGetInt(%s): hadoopConfGetInt", key);
  320. goto done;
  321. }
  322. ret = 0;
  323. done:
  324. destroyLocalReference(env, jConfiguration);
  325. if (ret)
  326. errno = ret;
  327. return ret;
  328. }
  329. struct hdfsBuilderConfOpt {
  330. struct hdfsBuilderConfOpt *next;
  331. const char *key;
  332. const char *val;
  333. };
  334. struct hdfsBuilder {
  335. int forceNewInstance;
  336. const char *nn;
  337. tPort port;
  338. const char *kerbTicketCachePath;
  339. const char *userName;
  340. struct hdfsBuilderConfOpt *opts;
  341. };
  342. struct hdfsBuilder *hdfsNewBuilder(void)
  343. {
  344. struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
  345. if (!bld) {
  346. errno = ENOMEM;
  347. return NULL;
  348. }
  349. return bld;
  350. }
  351. int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
  352. const char *val)
  353. {
  354. struct hdfsBuilderConfOpt *opt, *next;
  355. opt = calloc(1, sizeof(struct hdfsBuilderConfOpt));
  356. if (!opt)
  357. return -ENOMEM;
  358. next = bld->opts;
  359. bld->opts = opt;
  360. opt->next = next;
  361. opt->key = key;
  362. opt->val = val;
  363. return 0;
  364. }
  365. void hdfsFreeBuilder(struct hdfsBuilder *bld)
  366. {
  367. struct hdfsBuilderConfOpt *cur, *next;
  368. cur = bld->opts;
  369. for (cur = bld->opts; cur; ) {
  370. next = cur->next;
  371. free(cur);
  372. cur = next;
  373. }
  374. free(bld);
  375. }
  376. void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
  377. {
  378. bld->forceNewInstance = 1;
  379. }
  380. void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
  381. {
  382. bld->nn = nn;
  383. }
  384. void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
  385. {
  386. bld->port = port;
  387. }
  388. void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
  389. {
  390. bld->userName = userName;
  391. }
  392. void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
  393. const char *kerbTicketCachePath)
  394. {
  395. bld->kerbTicketCachePath = kerbTicketCachePath;
  396. }
  397. hdfsFS hdfsConnect(const char* host, tPort port)
  398. {
  399. struct hdfsBuilder *bld = hdfsNewBuilder();
  400. if (!bld)
  401. return NULL;
  402. hdfsBuilderSetNameNode(bld, host);
  403. hdfsBuilderSetNameNodePort(bld, port);
  404. return hdfsBuilderConnect(bld);
  405. }
  406. /** Always return a new FileSystem handle */
  407. hdfsFS hdfsConnectNewInstance(const char* host, tPort port)
  408. {
  409. struct hdfsBuilder *bld = hdfsNewBuilder();
  410. if (!bld)
  411. return NULL;
  412. hdfsBuilderSetNameNode(bld, host);
  413. hdfsBuilderSetNameNodePort(bld, port);
  414. hdfsBuilderSetForceNewInstance(bld);
  415. return hdfsBuilderConnect(bld);
  416. }
  417. hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
  418. {
  419. struct hdfsBuilder *bld = hdfsNewBuilder();
  420. if (!bld)
  421. return NULL;
  422. hdfsBuilderSetNameNode(bld, host);
  423. hdfsBuilderSetNameNodePort(bld, port);
  424. hdfsBuilderSetUserName(bld, user);
  425. return hdfsBuilderConnect(bld);
  426. }
  427. /** Always return a new FileSystem handle */
  428. hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
  429. const char *user)
  430. {
  431. struct hdfsBuilder *bld = hdfsNewBuilder();
  432. if (!bld)
  433. return NULL;
  434. hdfsBuilderSetNameNode(bld, host);
  435. hdfsBuilderSetNameNodePort(bld, port);
  436. hdfsBuilderSetForceNewInstance(bld);
  437. hdfsBuilderSetUserName(bld, user);
  438. return hdfsBuilderConnect(bld);
  439. }
  440. /**
  441. * Calculate the effective URI to use, given a builder configuration.
  442. *
  443. * If there is not already a URI scheme, we prepend 'hdfs://'.
  444. *
  445. * If there is not already a port specified, and a port was given to the
  446. * builder, we suffix that port. If there is a port specified but also one in
  447. * the URI, that is an error.
  448. *
  449. * @param bld The hdfs builder object
  450. * @param uri (out param) dynamically allocated string representing the
  451. * effective URI
  452. *
  453. * @return 0 on success; error code otherwise
  454. */
  455. static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri)
  456. {
  457. const char *scheme;
  458. char suffix[64];
  459. const char *lastColon;
  460. char *u;
  461. size_t uriLen;
  462. if (!bld->nn)
  463. return EINVAL;
  464. scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://";
  465. if (bld->port == 0) {
  466. suffix[0] = '\0';
  467. } else {
  468. lastColon = rindex(bld->nn, ':');
  469. if (lastColon && (strspn(lastColon + 1, "0123456789") ==
  470. strlen(lastColon + 1))) {
  471. fprintf(stderr, "port %d was given, but URI '%s' already "
  472. "contains a port!\n", bld->port, bld->nn);
  473. return EINVAL;
  474. }
  475. snprintf(suffix, sizeof(suffix), ":%d", bld->port);
  476. }
  477. uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix);
  478. u = malloc((uriLen + 1) * (sizeof(char)));
  479. if (!u) {
  480. fprintf(stderr, "calcEffectiveURI: out of memory");
  481. return ENOMEM;
  482. }
  483. snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix);
  484. *uri = u;
  485. return 0;
  486. }
  487. static const char *maybeNull(const char *str)
  488. {
  489. return str ? str : "(NULL)";
  490. }
  491. static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
  492. char *buf, size_t bufLen)
  493. {
  494. snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
  495. "kerbTicketCachePath=%s, userName=%s",
  496. bld->forceNewInstance, maybeNull(bld->nn), bld->port,
  497. maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
  498. return buf;
  499. }
  500. hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
  501. {
  502. JNIEnv *env = 0;
  503. jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
  504. jstring jURIString = NULL, jUserString = NULL;
  505. jvalue jVal;
  506. jthrowable jthr = NULL;
  507. char *cURI = 0, buf[512];
  508. int ret;
  509. jobject jRet = NULL;
  510. struct hdfsBuilderConfOpt *opt;
  511. //Get the JNIEnv* corresponding to current thread
  512. env = getJNIEnv();
  513. if (env == NULL) {
  514. ret = EINTERNAL;
  515. goto done;
  516. }
  517. // jConfiguration = new Configuration();
  518. jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
  519. if (jthr) {
  520. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  521. "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
  522. goto done;
  523. }
  524. // set configuration values
  525. for (opt = bld->opts; opt; opt = opt->next) {
  526. jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val);
  527. if (jthr) {
  528. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  529. "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'",
  530. hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val);
  531. goto done;
  532. }
  533. }
  534. //Check what type of FileSystem the caller wants...
  535. if (bld->nn == NULL) {
  536. // Get a local filesystem.
  537. if (bld->forceNewInstance) {
  538. // fs = FileSytem#newInstanceLocal(conf);
  539. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
  540. "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
  541. JPARAM(HADOOP_LOCALFS)), jConfiguration);
  542. if (jthr) {
  543. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  544. "hdfsBuilderConnect(%s)",
  545. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  546. goto done;
  547. }
  548. jFS = jVal.l;
  549. } else {
  550. // fs = FileSytem#getLocal(conf);
  551. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal",
  552. JMETHOD1(JPARAM(HADOOP_CONF),
  553. JPARAM(HADOOP_LOCALFS)),
  554. jConfiguration);
  555. if (jthr) {
  556. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  557. "hdfsBuilderConnect(%s)",
  558. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  559. goto done;
  560. }
  561. jFS = jVal.l;
  562. }
  563. } else {
  564. if (!strcmp(bld->nn, "default")) {
  565. // jURI = FileSystem.getDefaultUri(conf)
  566. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
  567. "getDefaultUri",
  568. "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
  569. jConfiguration);
  570. if (jthr) {
  571. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  572. "hdfsBuilderConnect(%s)",
  573. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  574. goto done;
  575. }
  576. jURI = jVal.l;
  577. } else {
  578. // fs = FileSystem#get(URI, conf, ugi);
  579. ret = calcEffectiveURI(bld, &cURI);
  580. if (ret)
  581. goto done;
  582. jthr = newJavaStr(env, cURI, &jURIString);
  583. if (jthr) {
  584. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  585. "hdfsBuilderConnect(%s)",
  586. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  587. goto done;
  588. }
  589. jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
  590. "create", "(Ljava/lang/String;)Ljava/net/URI;",
  591. jURIString);
  592. if (jthr) {
  593. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  594. "hdfsBuilderConnect(%s)",
  595. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  596. goto done;
  597. }
  598. jURI = jVal.l;
  599. }
  600. if (bld->kerbTicketCachePath) {
  601. jthr = hadoopConfSetStr(env, jConfiguration,
  602. KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
  603. if (jthr) {
  604. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  605. "hdfsBuilderConnect(%s)",
  606. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  607. goto done;
  608. }
  609. }
  610. jthr = newJavaStr(env, bld->userName, &jUserString);
  611. if (jthr) {
  612. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  613. "hdfsBuilderConnect(%s)",
  614. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  615. goto done;
  616. }
  617. if (bld->forceNewInstance) {
  618. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
  619. "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI),
  620. JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
  621. JPARAM(HADOOP_FS)),
  622. jURI, jConfiguration, jUserString);
  623. if (jthr) {
  624. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  625. "hdfsBuilderConnect(%s)",
  626. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  627. goto done;
  628. }
  629. jFS = jVal.l;
  630. } else {
  631. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
  632. JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
  633. JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
  634. jURI, jConfiguration, jUserString);
  635. if (jthr) {
  636. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  637. "hdfsBuilderConnect(%s)",
  638. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  639. goto done;
  640. }
  641. jFS = jVal.l;
  642. }
  643. }
  644. jRet = (*env)->NewGlobalRef(env, jFS);
  645. if (!jRet) {
  646. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  647. "hdfsBuilderConnect(%s)",
  648. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  649. goto done;
  650. }
  651. ret = 0;
  652. done:
  653. // Release unnecessary local references
  654. destroyLocalReference(env, jConfiguration);
  655. destroyLocalReference(env, jFS);
  656. destroyLocalReference(env, jURI);
  657. destroyLocalReference(env, jCachePath);
  658. destroyLocalReference(env, jURIString);
  659. destroyLocalReference(env, jUserString);
  660. free(cURI);
  661. hdfsFreeBuilder(bld);
  662. if (ret) {
  663. errno = ret;
  664. return NULL;
  665. }
  666. return (hdfsFS)jRet;
  667. }
  668. int hdfsDisconnect(hdfsFS fs)
  669. {
  670. // JAVA EQUIVALENT:
  671. // fs.close()
  672. //Get the JNIEnv* corresponding to current thread
  673. JNIEnv* env = getJNIEnv();
  674. int ret;
  675. if (env == NULL) {
  676. errno = EINTERNAL;
  677. return -1;
  678. }
  679. //Parameters
  680. jobject jFS = (jobject)fs;
  681. //Sanity check
  682. if (fs == NULL) {
  683. errno = EBADF;
  684. return -1;
  685. }
  686. jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  687. "close", "()V");
  688. if (jthr) {
  689. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  690. "hdfsDisconnect: FileSystem#close");
  691. } else {
  692. ret = 0;
  693. }
  694. (*env)->DeleteGlobalRef(env, jFS);
  695. if (ret) {
  696. errno = ret;
  697. return -1;
  698. }
  699. return 0;
  700. }
  701. /**
  702. * Get the default block size of a FileSystem object.
  703. *
  704. * @param env The Java env
  705. * @param jFS The FileSystem object
  706. * @param jPath The path to find the default blocksize at
  707. * @param out (out param) the default block size
  708. *
  709. * @return NULL on success; or the exception
  710. */
  711. static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
  712. jobject jPath, jlong *out)
  713. {
  714. jthrowable jthr;
  715. jvalue jVal;
  716. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  717. "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"), jPath);
  718. if (jthr)
  719. return jthr;
  720. *out = jVal.j;
  721. return NULL;
  722. }
  723. hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
  724. int bufferSize, short replication, tSize blockSize)
  725. {
  726. /*
  727. JAVA EQUIVALENT:
  728. File f = new File(path);
  729. FSData{Input|Output}Stream f{is|os} = fs.create(f);
  730. return f{is|os};
  731. */
  732. /* Get the JNIEnv* corresponding to current thread */
  733. JNIEnv* env = getJNIEnv();
  734. int accmode = flags & O_ACCMODE;
  735. if (env == NULL) {
  736. errno = EINTERNAL;
  737. return NULL;
  738. }
  739. jstring jStrBufferSize = NULL, jStrReplication = NULL;
  740. jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
  741. jobject jFS = (jobject)fs;
  742. jthrowable jthr;
  743. jvalue jVal;
  744. hdfsFile file = NULL;
  745. int ret;
  746. if (accmode == O_RDONLY || accmode == O_WRONLY) {
  747. /* yay */
  748. } else if (accmode == O_RDWR) {
  749. fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
  750. errno = ENOTSUP;
  751. return NULL;
  752. } else {
  753. fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode);
  754. errno = EINVAL;
  755. return NULL;
  756. }
  757. if ((flags & O_CREAT) && (flags & O_EXCL)) {
  758. fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
  759. }
  760. /* The hadoop java api/signature */
  761. const char* method = NULL;
  762. const char* signature = NULL;
  763. if (accmode == O_RDONLY) {
  764. method = "open";
  765. signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM));
  766. } else if (flags & O_APPEND) {
  767. method = "append";
  768. signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM));
  769. } else {
  770. method = "create";
  771. signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM));
  772. }
  773. /* Create an object of org.apache.hadoop.fs.Path */
  774. jthr = constructNewObjectOfPath(env, path, &jPath);
  775. if (jthr) {
  776. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  777. "hdfsOpenFile(%s): constructNewObjectOfPath", path);
  778. goto done;
  779. }
  780. /* Get the Configuration object from the FileSystem object */
  781. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  782. "getConf", JMETHOD1("", JPARAM(HADOOP_CONF)));
  783. if (jthr) {
  784. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  785. "hdfsOpenFile(%s): FileSystem#getConf", path);
  786. goto done;
  787. }
  788. jConfiguration = jVal.l;
  789. jint jBufferSize = bufferSize;
  790. jshort jReplication = replication;
  791. jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");
  792. if (!jStrBufferSize) {
  793. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
  794. goto done;
  795. }
  796. jStrReplication = (*env)->NewStringUTF(env, "dfs.replication");
  797. if (!jStrReplication) {
  798. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
  799. goto done;
  800. }
  801. if (!bufferSize) {
  802. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  803. HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
  804. jStrBufferSize, 4096);
  805. if (jthr) {
  806. ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND |
  807. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK,
  808. "hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)",
  809. path);
  810. goto done;
  811. }
  812. jBufferSize = jVal.i;
  813. }
  814. if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) {
  815. if (!replication) {
  816. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  817. HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
  818. jStrReplication, 1);
  819. if (jthr) {
  820. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  821. "hdfsOpenFile(%s): Configuration#getInt(dfs.replication)",
  822. path);
  823. goto done;
  824. }
  825. jReplication = jVal.i;
  826. }
  827. }
  828. /* Create and return either the FSDataInputStream or
  829. FSDataOutputStream references jobject jStream */
  830. // READ?
  831. if (accmode == O_RDONLY) {
  832. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  833. method, signature, jPath, jBufferSize);
  834. } else if ((accmode == O_WRONLY) && (flags & O_APPEND)) {
  835. // WRITE/APPEND?
  836. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  837. method, signature, jPath);
  838. } else {
  839. // WRITE/CREATE
  840. jboolean jOverWrite = 1;
  841. jlong jBlockSize = blockSize;
  842. if (jBlockSize == 0) {
  843. jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize);
  844. if (jthr) {
  845. ret = EIO;
  846. goto done;
  847. }
  848. }
  849. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  850. method, signature, jPath, jOverWrite,
  851. jBufferSize, jReplication, jBlockSize);
  852. }
  853. if (jthr) {
  854. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  855. "hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature);
  856. goto done;
  857. }
  858. jFile = jVal.l;
  859. file = calloc(1, sizeof(struct hdfsFile_internal));
  860. if (!file) {
  861. fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path);
  862. ret = ENOMEM;
  863. goto done;
  864. }
  865. file->file = (*env)->NewGlobalRef(env, jFile);
  866. if (!file->file) {
  867. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  868. "hdfsOpenFile(%s): NewGlobalRef", path);
  869. goto done;
  870. }
  871. file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT);
  872. file->flags = 0;
  873. if ((flags & O_WRONLY) == 0) {
  874. // Try a test read to see if we can do direct reads
  875. char buf;
  876. if (readDirect(fs, file, &buf, 0) == 0) {
  877. // Success - 0-byte read should return 0
  878. file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
  879. } else if (errno != ENOTSUP) {
  880. // Unexpected error. Clear it, don't set the direct flag.
  881. fprintf(stderr,
  882. "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
  883. "for direct read compatibility\n", path, errno);
  884. }
  885. }
  886. ret = 0;
  887. done:
  888. destroyLocalReference(env, jStrBufferSize);
  889. destroyLocalReference(env, jStrReplication);
  890. destroyLocalReference(env, jConfiguration);
  891. destroyLocalReference(env, jPath);
  892. destroyLocalReference(env, jFile);
  893. if (ret) {
  894. if (file) {
  895. if (file->file) {
  896. (*env)->DeleteGlobalRef(env, file->file);
  897. }
  898. free(file);
  899. }
  900. errno = ret;
  901. return NULL;
  902. }
  903. return file;
  904. }
  905. int hdfsCloseFile(hdfsFS fs, hdfsFile file)
  906. {
  907. int ret;
  908. // JAVA EQUIVALENT:
  909. // file.close
  910. //Get the JNIEnv* corresponding to current thread
  911. JNIEnv* env = getJNIEnv();
  912. if (env == NULL) {
  913. errno = EINTERNAL;
  914. return -1;
  915. }
  916. //Caught exception
  917. jthrowable jthr;
  918. //Sanity check
  919. if (!file || file->type == UNINITIALIZED) {
  920. errno = EBADF;
  921. return -1;
  922. }
  923. //The interface whose 'close' method to be called
  924. const char* interface = (file->type == INPUT) ?
  925. HADOOP_ISTRM : HADOOP_OSTRM;
  926. jthr = invokeMethod(env, NULL, INSTANCE, file->file, interface,
  927. "close", "()V");
  928. if (jthr) {
  929. const char *interfaceShortName = (file->type == INPUT) ?
  930. "FSDataInputStream" : "FSDataOutputStream";
  931. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  932. "%s#close", interfaceShortName);
  933. } else {
  934. ret = 0;
  935. }
  936. //De-allocate memory
  937. (*env)->DeleteGlobalRef(env, file->file);
  938. free(file);
  939. if (ret) {
  940. errno = ret;
  941. return -1;
  942. }
  943. return 0;
  944. }
  945. int hdfsExists(hdfsFS fs, const char *path)
  946. {
  947. JNIEnv *env = getJNIEnv();
  948. if (env == NULL) {
  949. errno = EINTERNAL;
  950. return -1;
  951. }
  952. jobject jPath;
  953. jvalue jVal;
  954. jobject jFS = (jobject)fs;
  955. jthrowable jthr;
  956. if (path == NULL) {
  957. errno = EINVAL;
  958. return -1;
  959. }
  960. jthr = constructNewObjectOfPath(env, path, &jPath);
  961. if (jthr) {
  962. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  963. "hdfsExists: constructNewObjectOfPath");
  964. return -1;
  965. }
  966. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  967. "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
  968. destroyLocalReference(env, jPath);
  969. if (jthr) {
  970. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  971. "hdfsExists: invokeMethod(%s)",
  972. JMETHOD1(JPARAM(HADOOP_PATH), "Z"));
  973. return -1;
  974. }
  975. if (jVal.z) {
  976. return 0;
  977. } else {
  978. errno = ENOENT;
  979. return -1;
  980. }
  981. }
  982. // Checks input file for readiness for reading.
  983. static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
  984. jobject* jInputStream)
  985. {
  986. *jInputStream = (jobject)(f ? f->file : NULL);
  987. //Sanity check
  988. if (!f || f->type == UNINITIALIZED) {
  989. errno = EBADF;
  990. return -1;
  991. }
  992. //Error checking... make sure that this file is 'readable'
  993. if (f->type != INPUT) {
  994. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  995. errno = EINVAL;
  996. return -1;
  997. }
  998. return 0;
  999. }
  1000. tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  1001. {
  1002. if (length == 0) {
  1003. return 0;
  1004. } else if (length < 0) {
  1005. errno = EINVAL;
  1006. return -1;
  1007. }
  1008. if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
  1009. return readDirect(fs, f, buffer, length);
  1010. }
  1011. // JAVA EQUIVALENT:
  1012. // byte [] bR = new byte[length];
  1013. // fis.read(bR);
  1014. //Get the JNIEnv* corresponding to current thread
  1015. JNIEnv* env = getJNIEnv();
  1016. if (env == NULL) {
  1017. errno = EINTERNAL;
  1018. return -1;
  1019. }
  1020. //Parameters
  1021. jobject jInputStream;
  1022. if (readPrepare(env, fs, f, &jInputStream) == -1) {
  1023. return -1;
  1024. }
  1025. jbyteArray jbRarray;
  1026. jint noReadBytes = length;
  1027. jvalue jVal;
  1028. jthrowable jthr;
  1029. //Read the requisite bytes
  1030. jbRarray = (*env)->NewByteArray(env, length);
  1031. if (!jbRarray) {
  1032. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1033. "hdfsRead: NewByteArray");
  1034. return -1;
  1035. }
  1036. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM,
  1037. "read", "([B)I", jbRarray);
  1038. if (jthr) {
  1039. destroyLocalReference(env, jbRarray);
  1040. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1041. "hdfsRead: FSDataInputStream#read");
  1042. return -1;
  1043. }
  1044. if (jVal.i < 0) {
  1045. // EOF
  1046. destroyLocalReference(env, jbRarray);
  1047. return 0;
  1048. } else if (jVal.i == 0) {
  1049. destroyLocalReference(env, jbRarray);
  1050. errno = EINTR;
  1051. return -1;
  1052. }
  1053. (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
  1054. destroyLocalReference(env, jbRarray);
  1055. if ((*env)->ExceptionCheck(env)) {
  1056. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1057. "hdfsRead: GetByteArrayRegion");
  1058. return -1;
  1059. }
  1060. return jVal.i;
  1061. }
  1062. // Reads using the read(ByteBuffer) API, which does fewer copies
  1063. tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  1064. {
  1065. // JAVA EQUIVALENT:
  1066. // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
  1067. // fis.read(bbuffer);
  1068. //Get the JNIEnv* corresponding to current thread
  1069. JNIEnv* env = getJNIEnv();
  1070. if (env == NULL) {
  1071. errno = EINTERNAL;
  1072. return -1;
  1073. }
  1074. jobject jInputStream;
  1075. if (readPrepare(env, fs, f, &jInputStream) == -1) {
  1076. return -1;
  1077. }
  1078. jvalue jVal;
  1079. jthrowable jthr;
  1080. //Read the requisite bytes
  1081. jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length);
  1082. if (bb == NULL) {
  1083. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1084. "readDirect: NewDirectByteBuffer");
  1085. return -1;
  1086. }
  1087. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  1088. HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb);
  1089. destroyLocalReference(env, bb);
  1090. if (jthr) {
  1091. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1092. "readDirect: FSDataInputStream#read");
  1093. return -1;
  1094. }
  1095. return (jVal.i < 0) ? 0 : jVal.i;
  1096. }
  1097. tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
  1098. void* buffer, tSize length)
  1099. {
  1100. JNIEnv* env;
  1101. jbyteArray jbRarray;
  1102. jvalue jVal;
  1103. jthrowable jthr;
  1104. if (length == 0) {
  1105. return 0;
  1106. } else if (length < 0) {
  1107. errno = EINVAL;
  1108. return -1;
  1109. }
  1110. if (!f || f->type == UNINITIALIZED) {
  1111. errno = EBADF;
  1112. return -1;
  1113. }
  1114. env = getJNIEnv();
  1115. if (env == NULL) {
  1116. errno = EINTERNAL;
  1117. return -1;
  1118. }
  1119. //Error checking... make sure that this file is 'readable'
  1120. if (f->type != INPUT) {
  1121. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  1122. errno = EINVAL;
  1123. return -1;
  1124. }
  1125. // JAVA EQUIVALENT:
  1126. // byte [] bR = new byte[length];
  1127. // fis.read(pos, bR, 0, length);
  1128. jbRarray = (*env)->NewByteArray(env, length);
  1129. if (!jbRarray) {
  1130. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1131. "hdfsPread: NewByteArray");
  1132. return -1;
  1133. }
  1134. jthr = invokeMethod(env, &jVal, INSTANCE, f->file, HADOOP_ISTRM,
  1135. "read", "(J[BII)I", position, jbRarray, 0, length);
  1136. if (jthr) {
  1137. destroyLocalReference(env, jbRarray);
  1138. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1139. "hdfsPread: FSDataInputStream#read");
  1140. return -1;
  1141. }
  1142. if (jVal.i < 0) {
  1143. // EOF
  1144. destroyLocalReference(env, jbRarray);
  1145. return 0;
  1146. } else if (jVal.i == 0) {
  1147. destroyLocalReference(env, jbRarray);
  1148. errno = EINTR;
  1149. return -1;
  1150. }
  1151. (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer);
  1152. destroyLocalReference(env, jbRarray);
  1153. if ((*env)->ExceptionCheck(env)) {
  1154. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1155. "hdfsPread: GetByteArrayRegion");
  1156. return -1;
  1157. }
  1158. return jVal.i;
  1159. }
  1160. tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
  1161. {
  1162. // JAVA EQUIVALENT
  1163. // byte b[] = str.getBytes();
  1164. // fso.write(b);
  1165. //Get the JNIEnv* corresponding to current thread
  1166. JNIEnv* env = getJNIEnv();
  1167. if (env == NULL) {
  1168. errno = EINTERNAL;
  1169. return -1;
  1170. }
  1171. //Sanity check
  1172. if (!f || f->type == UNINITIALIZED) {
  1173. errno = EBADF;
  1174. return -1;
  1175. }
  1176. jobject jOutputStream = f->file;
  1177. jbyteArray jbWarray;
  1178. jthrowable jthr;
  1179. if (length < 0) {
  1180. errno = EINVAL;
  1181. return -1;
  1182. }
  1183. //Error checking... make sure that this file is 'writable'
  1184. if (f->type != OUTPUT) {
  1185. fprintf(stderr, "Cannot write into a non-OutputStream object!\n");
  1186. errno = EINVAL;
  1187. return -1;
  1188. }
  1189. if (length < 0) {
  1190. errno = EINVAL;
  1191. return -1;
  1192. }
  1193. if (length == 0) {
  1194. return 0;
  1195. }
  1196. //Write the requisite bytes into the file
  1197. jbWarray = (*env)->NewByteArray(env, length);
  1198. if (!jbWarray) {
  1199. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1200. "hdfsWrite: NewByteArray");
  1201. return -1;
  1202. }
  1203. (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer);
  1204. if ((*env)->ExceptionCheck(env)) {
  1205. destroyLocalReference(env, jbWarray);
  1206. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1207. "hdfsWrite(length = %d): SetByteArrayRegion", length);
  1208. return -1;
  1209. }
  1210. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  1211. HADOOP_OSTRM, "write", "([B)V", jbWarray);
  1212. destroyLocalReference(env, jbWarray);
  1213. if (jthr) {
  1214. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1215. "hdfsWrite: FSDataOutputStream#write");
  1216. return -1;
  1217. }
  1218. // Unlike most Java streams, FSDataOutputStream never does partial writes.
  1219. // If we succeeded, all the data was written.
  1220. return length;
  1221. }
  1222. int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos)
  1223. {
  1224. // JAVA EQUIVALENT
  1225. // fis.seek(pos);
  1226. //Get the JNIEnv* corresponding to current thread
  1227. JNIEnv* env = getJNIEnv();
  1228. if (env == NULL) {
  1229. errno = EINTERNAL;
  1230. return -1;
  1231. }
  1232. //Sanity check
  1233. if (!f || f->type != INPUT) {
  1234. errno = EBADF;
  1235. return -1;
  1236. }
  1237. jobject jInputStream = f->file;
  1238. jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jInputStream,
  1239. HADOOP_ISTRM, "seek", "(J)V", desiredPos);
  1240. if (jthr) {
  1241. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1242. "hdfsSeek(desiredPos=%" PRId64 ")"
  1243. ": FSDataInputStream#seek", desiredPos);
  1244. return -1;
  1245. }
  1246. return 0;
  1247. }
  1248. tOffset hdfsTell(hdfsFS fs, hdfsFile f)
  1249. {
  1250. // JAVA EQUIVALENT
  1251. // pos = f.getPos();
  1252. //Get the JNIEnv* corresponding to current thread
  1253. JNIEnv* env = getJNIEnv();
  1254. if (env == NULL) {
  1255. errno = EINTERNAL;
  1256. return -1;
  1257. }
  1258. //Sanity check
  1259. if (!f || f->type == UNINITIALIZED) {
  1260. errno = EBADF;
  1261. return -1;
  1262. }
  1263. //Parameters
  1264. jobject jStream = f->file;
  1265. const char* interface = (f->type == INPUT) ?
  1266. HADOOP_ISTRM : HADOOP_OSTRM;
  1267. jvalue jVal;
  1268. jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jStream,
  1269. interface, "getPos", "()J");
  1270. if (jthr) {
  1271. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1272. "hdfsTell: %s#getPos",
  1273. ((f->type == INPUT) ? "FSDataInputStream" :
  1274. "FSDataOutputStream"));
  1275. return -1;
  1276. }
  1277. return jVal.j;
  1278. }
  1279. int hdfsFlush(hdfsFS fs, hdfsFile f)
  1280. {
  1281. // JAVA EQUIVALENT
  1282. // fos.flush();
  1283. //Get the JNIEnv* corresponding to current thread
  1284. JNIEnv* env = getJNIEnv();
  1285. if (env == NULL) {
  1286. errno = EINTERNAL;
  1287. return -1;
  1288. }
  1289. //Sanity check
  1290. if (!f || f->type != OUTPUT) {
  1291. errno = EBADF;
  1292. return -1;
  1293. }
  1294. jthrowable jthr = invokeMethod(env, NULL, INSTANCE, f->file,
  1295. HADOOP_OSTRM, "flush", "()V");
  1296. if (jthr) {
  1297. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1298. "hdfsFlush: FSDataInputStream#flush");
  1299. return -1;
  1300. }
  1301. return 0;
  1302. }
  1303. int hdfsHFlush(hdfsFS fs, hdfsFile f)
  1304. {
  1305. //Get the JNIEnv* corresponding to current thread
  1306. JNIEnv* env = getJNIEnv();
  1307. if (env == NULL) {
  1308. errno = EINTERNAL;
  1309. return -1;
  1310. }
  1311. //Sanity check
  1312. if (!f || f->type != OUTPUT) {
  1313. errno = EBADF;
  1314. return -1;
  1315. }
  1316. jobject jOutputStream = f->file;
  1317. jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  1318. HADOOP_OSTRM, "hflush", "()V");
  1319. if (jthr) {
  1320. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1321. "hdfsHFlush: FSDataOutputStream#hflush");
  1322. return -1;
  1323. }
  1324. return 0;
  1325. }
  1326. int hdfsHSync(hdfsFS fs, hdfsFile f)
  1327. {
  1328. //Get the JNIEnv* corresponding to current thread
  1329. JNIEnv* env = getJNIEnv();
  1330. if (env == NULL) {
  1331. errno = EINTERNAL;
  1332. return -1;
  1333. }
  1334. //Sanity check
  1335. if (!f || f->type != OUTPUT) {
  1336. errno = EBADF;
  1337. return -1;
  1338. }
  1339. jobject jOutputStream = f->file;
  1340. jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  1341. HADOOP_OSTRM, "hsync", "()V");
  1342. if (jthr) {
  1343. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1344. "hdfsHSync: FSDataOutputStream#hsync");
  1345. return -1;
  1346. }
  1347. return 0;
  1348. }
  1349. int hdfsAvailable(hdfsFS fs, hdfsFile f)
  1350. {
  1351. // JAVA EQUIVALENT
  1352. // fis.available();
  1353. //Get the JNIEnv* corresponding to current thread
  1354. JNIEnv* env = getJNIEnv();
  1355. if (env == NULL) {
  1356. errno = EINTERNAL;
  1357. return -1;
  1358. }
  1359. //Sanity check
  1360. if (!f || f->type != INPUT) {
  1361. errno = EBADF;
  1362. return -1;
  1363. }
  1364. //Parameters
  1365. jobject jInputStream = f->file;
  1366. jvalue jVal;
  1367. jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  1368. HADOOP_ISTRM, "available", "()I");
  1369. if (jthr) {
  1370. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1371. "hdfsAvailable: FSDataInputStream#available");
  1372. return -1;
  1373. }
  1374. return jVal.i;
  1375. }
  1376. static int hdfsCopyImpl(hdfsFS srcFS, const char* src, hdfsFS dstFS,
  1377. const char* dst, jboolean deleteSource)
  1378. {
  1379. //JAVA EQUIVALENT
  1380. // FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
  1381. // deleteSource = false, conf)
  1382. //Get the JNIEnv* corresponding to current thread
  1383. JNIEnv* env = getJNIEnv();
  1384. if (env == NULL) {
  1385. errno = EINTERNAL;
  1386. return -1;
  1387. }
  1388. //Parameters
  1389. jobject jSrcFS = (jobject)srcFS;
  1390. jobject jDstFS = (jobject)dstFS;
  1391. jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
  1392. jthrowable jthr;
  1393. jvalue jVal;
  1394. int ret;
  1395. jthr = constructNewObjectOfPath(env, src, &jSrcPath);
  1396. if (jthr) {
  1397. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1398. "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
  1399. goto done;
  1400. }
  1401. jthr = constructNewObjectOfPath(env, dst, &jDstPath);
  1402. if (jthr) {
  1403. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1404. "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
  1405. goto done;
  1406. }
  1407. //Create the org.apache.hadoop.conf.Configuration object
  1408. jthr = constructNewObjectOfClass(env, &jConfiguration,
  1409. HADOOP_CONF, "()V");
  1410. if (jthr) {
  1411. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1412. "hdfsCopyImpl: Configuration constructor");
  1413. goto done;
  1414. }
  1415. //FileUtil#copy
  1416. jthr = invokeMethod(env, &jVal, STATIC,
  1417. NULL, "org/apache/hadoop/fs/FileUtil", "copy",
  1418. "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
  1419. "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
  1420. "ZLorg/apache/hadoop/conf/Configuration;)Z",
  1421. jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
  1422. jConfiguration);
  1423. if (jthr) {
  1424. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1425. "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
  1426. "FileUtil#copy", src, dst, deleteSource);
  1427. goto done;
  1428. }
  1429. if (!jVal.z) {
  1430. ret = EIO;
  1431. goto done;
  1432. }
  1433. ret = 0;
  1434. done:
  1435. destroyLocalReference(env, jConfiguration);
  1436. destroyLocalReference(env, jSrcPath);
  1437. destroyLocalReference(env, jDstPath);
  1438. if (ret) {
  1439. errno = ret;
  1440. return -1;
  1441. }
  1442. return 0;
  1443. }
  1444. int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
  1445. {
  1446. return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
  1447. }
  1448. int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
  1449. {
  1450. return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
  1451. }
  1452. int hdfsDelete(hdfsFS fs, const char* path, int recursive)
  1453. {
  1454. // JAVA EQUIVALENT:
  1455. // Path p = new Path(path);
  1456. // bool retval = fs.delete(p, recursive);
  1457. //Get the JNIEnv* corresponding to current thread
  1458. JNIEnv* env = getJNIEnv();
  1459. if (env == NULL) {
  1460. errno = EINTERNAL;
  1461. return -1;
  1462. }
  1463. jobject jFS = (jobject)fs;
  1464. jthrowable jthr;
  1465. jobject jPath;
  1466. jvalue jVal;
  1467. jthr = constructNewObjectOfPath(env, path, &jPath);
  1468. if (jthr) {
  1469. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1470. "hdfsDelete(path=%s): constructNewObjectOfPath", path);
  1471. return -1;
  1472. }
  1473. jboolean jRecursive = recursive ? JNI_TRUE : JNI_FALSE;
  1474. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1475. "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z",
  1476. jPath, jRecursive);
  1477. destroyLocalReference(env, jPath);
  1478. if (jthr) {
  1479. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1480. "hdfsDelete(path=%s, recursive=%d): "
  1481. "FileSystem#delete", path, recursive);
  1482. return -1;
  1483. }
  1484. if (!jVal.z) {
  1485. errno = EIO;
  1486. return -1;
  1487. }
  1488. return 0;
  1489. }
  1490. int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
  1491. {
  1492. // JAVA EQUIVALENT:
  1493. // Path old = new Path(oldPath);
  1494. // Path new = new Path(newPath);
  1495. // fs.rename(old, new);
  1496. //Get the JNIEnv* corresponding to current thread
  1497. JNIEnv* env = getJNIEnv();
  1498. if (env == NULL) {
  1499. errno = EINTERNAL;
  1500. return -1;
  1501. }
  1502. jobject jFS = (jobject)fs;
  1503. jthrowable jthr;
  1504. jobject jOldPath = NULL, jNewPath = NULL;
  1505. int ret = -1;
  1506. jvalue jVal;
  1507. jthr = constructNewObjectOfPath(env, oldPath, &jOldPath );
  1508. if (jthr) {
  1509. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1510. "hdfsRename: constructNewObjectOfPath(%s)", oldPath);
  1511. goto done;
  1512. }
  1513. jthr = constructNewObjectOfPath(env, newPath, &jNewPath);
  1514. if (jthr) {
  1515. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1516. "hdfsRename: constructNewObjectOfPath(%s)", newPath);
  1517. goto done;
  1518. }
  1519. // Rename the file
  1520. // TODO: use rename2 here? (See HDFS-3592)
  1521. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "rename",
  1522. JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"),
  1523. jOldPath, jNewPath);
  1524. if (jthr) {
  1525. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1526. "hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename",
  1527. oldPath, newPath);
  1528. goto done;
  1529. }
  1530. if (!jVal.z) {
  1531. errno = EIO;
  1532. goto done;
  1533. }
  1534. ret = 0;
  1535. done:
  1536. destroyLocalReference(env, jOldPath);
  1537. destroyLocalReference(env, jNewPath);
  1538. return ret;
  1539. }
  1540. char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize)
  1541. {
  1542. // JAVA EQUIVALENT:
  1543. // Path p = fs.getWorkingDirectory();
  1544. // return p.toString()
  1545. //Get the JNIEnv* corresponding to current thread
  1546. JNIEnv* env = getJNIEnv();
  1547. if (env == NULL) {
  1548. errno = EINTERNAL;
  1549. return NULL;
  1550. }
  1551. jobject jPath = NULL;
  1552. jstring jPathString = NULL;
  1553. jobject jFS = (jobject)fs;
  1554. jvalue jVal;
  1555. jthrowable jthr;
  1556. int ret;
  1557. const char *jPathChars = NULL;
  1558. //FileSystem#getWorkingDirectory()
  1559. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  1560. HADOOP_FS, "getWorkingDirectory",
  1561. "()Lorg/apache/hadoop/fs/Path;");
  1562. if (jthr) {
  1563. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1564. "hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory");
  1565. goto done;
  1566. }
  1567. jPath = jVal.l;
  1568. if (!jPath) {
  1569. fprintf(stderr, "hdfsGetWorkingDirectory: "
  1570. "FileSystem#getWorkingDirectory returned NULL");
  1571. ret = -EIO;
  1572. goto done;
  1573. }
  1574. //Path#toString()
  1575. jthr = invokeMethod(env, &jVal, INSTANCE, jPath,
  1576. "org/apache/hadoop/fs/Path", "toString",
  1577. "()Ljava/lang/String;");
  1578. if (jthr) {
  1579. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1580. "hdfsGetWorkingDirectory: Path#toString");
  1581. goto done;
  1582. }
  1583. jPathString = jVal.l;
  1584. jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL);
  1585. if (!jPathChars) {
  1586. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1587. "hdfsGetWorkingDirectory: GetStringUTFChars");
  1588. goto done;
  1589. }
  1590. //Copy to user-provided buffer
  1591. ret = snprintf(buffer, bufferSize, "%s", jPathChars);
  1592. if (ret >= bufferSize) {
  1593. ret = ENAMETOOLONG;
  1594. goto done;
  1595. }
  1596. ret = 0;
  1597. done:
  1598. if (jPathChars) {
  1599. (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars);
  1600. }
  1601. destroyLocalReference(env, jPath);
  1602. destroyLocalReference(env, jPathString);
  1603. if (ret) {
  1604. errno = ret;
  1605. return NULL;
  1606. }
  1607. return buffer;
  1608. }
  1609. int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
  1610. {
  1611. // JAVA EQUIVALENT:
  1612. // fs.setWorkingDirectory(Path(path));
  1613. //Get the JNIEnv* corresponding to current thread
  1614. JNIEnv* env = getJNIEnv();
  1615. if (env == NULL) {
  1616. errno = EINTERNAL;
  1617. return -1;
  1618. }
  1619. jobject jFS = (jobject)fs;
  1620. jthrowable jthr;
  1621. jobject jPath;
  1622. //Create an object of org.apache.hadoop.fs.Path
  1623. jthr = constructNewObjectOfPath(env, path, &jPath);
  1624. if (jthr) {
  1625. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1626. "hdfsSetWorkingDirectory(%s): constructNewObjectOfPath",
  1627. path);
  1628. return -1;
  1629. }
  1630. //FileSystem#setWorkingDirectory()
  1631. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1632. "setWorkingDirectory",
  1633. "(Lorg/apache/hadoop/fs/Path;)V", jPath);
  1634. destroyLocalReference(env, jPath);
  1635. if (jthr) {
  1636. errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT,
  1637. "hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory",
  1638. path);
  1639. return -1;
  1640. }
  1641. return 0;
  1642. }
  1643. int hdfsCreateDirectory(hdfsFS fs, const char* path)
  1644. {
  1645. // JAVA EQUIVALENT:
  1646. // fs.mkdirs(new Path(path));
  1647. //Get the JNIEnv* corresponding to current thread
  1648. JNIEnv* env = getJNIEnv();
  1649. if (env == NULL) {
  1650. errno = EINTERNAL;
  1651. return -1;
  1652. }
  1653. jobject jFS = (jobject)fs;
  1654. jobject jPath;
  1655. jthrowable jthr;
  1656. //Create an object of org.apache.hadoop.fs.Path
  1657. jthr = constructNewObjectOfPath(env, path, &jPath);
  1658. if (jthr) {
  1659. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1660. "hdfsCreateDirectory(%s): constructNewObjectOfPath", path);
  1661. return -1;
  1662. }
  1663. //Create the directory
  1664. jvalue jVal;
  1665. jVal.z = 0;
  1666. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1667. "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z",
  1668. jPath);
  1669. destroyLocalReference(env, jPath);
  1670. if (jthr) {
  1671. errno = printExceptionAndFree(env, jthr,
  1672. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1673. NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY,
  1674. "hdfsCreateDirectory(%s): FileSystem#mkdirs", path);
  1675. return -1;
  1676. }
  1677. if (!jVal.z) {
  1678. // It's unclear under exactly which conditions FileSystem#mkdirs
  1679. // is supposed to return false (as opposed to throwing an exception.)
  1680. // It seems like the current code never actually returns false.
  1681. // So we're going to translate this to EIO, since there seems to be
  1682. // nothing more specific we can do with it.
  1683. errno = EIO;
  1684. return -1;
  1685. }
  1686. return 0;
  1687. }
  1688. int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
  1689. {
  1690. // JAVA EQUIVALENT:
  1691. // fs.setReplication(new Path(path), replication);
  1692. //Get the JNIEnv* corresponding to current thread
  1693. JNIEnv* env = getJNIEnv();
  1694. if (env == NULL) {
  1695. errno = EINTERNAL;
  1696. return -1;
  1697. }
  1698. jobject jFS = (jobject)fs;
  1699. jthrowable jthr;
  1700. //Create an object of org.apache.hadoop.fs.Path
  1701. jobject jPath;
  1702. jthr = constructNewObjectOfPath(env, path, &jPath);
  1703. if (jthr) {
  1704. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1705. "hdfsSetReplication(path=%s): constructNewObjectOfPath", path);
  1706. return -1;
  1707. }
  1708. //Create the directory
  1709. jvalue jVal;
  1710. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1711. "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z",
  1712. jPath, replication);
  1713. destroyLocalReference(env, jPath);
  1714. if (jthr) {
  1715. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1716. "hdfsSetReplication(path=%s, replication=%d): "
  1717. "FileSystem#setReplication", path, replication);
  1718. return -1;
  1719. }
  1720. if (!jVal.z) {
  1721. // setReplication returns false "if file does not exist or is a
  1722. // directory." So the nearest translation to that is ENOENT.
  1723. errno = ENOENT;
  1724. return -1;
  1725. }
  1726. return 0;
  1727. }
  1728. int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
  1729. {
  1730. // JAVA EQUIVALENT:
  1731. // fs.setOwner(path, owner, group)
  1732. //Get the JNIEnv* corresponding to current thread
  1733. JNIEnv* env = getJNIEnv();
  1734. if (env == NULL) {
  1735. errno = EINTERNAL;
  1736. return -1;
  1737. }
  1738. if (owner == NULL && group == NULL) {
  1739. return 0;
  1740. }
  1741. jobject jFS = (jobject)fs;
  1742. jobject jPath = NULL;
  1743. jstring jOwner = NULL, jGroup = NULL;
  1744. jthrowable jthr;
  1745. int ret;
  1746. jthr = constructNewObjectOfPath(env, path, &jPath);
  1747. if (jthr) {
  1748. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1749. "hdfsChown(path=%s): constructNewObjectOfPath", path);
  1750. goto done;
  1751. }
  1752. jthr = newJavaStr(env, owner, &jOwner);
  1753. if (jthr) {
  1754. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1755. "hdfsChown(path=%s): newJavaStr(%s)", path, owner);
  1756. goto done;
  1757. }
  1758. jthr = newJavaStr(env, group, &jGroup);
  1759. if (jthr) {
  1760. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1761. "hdfsChown(path=%s): newJavaStr(%s)", path, group);
  1762. goto done;
  1763. }
  1764. //Create the directory
  1765. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1766. "setOwner", JMETHOD3(JPARAM(HADOOP_PATH),
  1767. JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID),
  1768. jPath, jOwner, jGroup);
  1769. if (jthr) {
  1770. ret = printExceptionAndFree(env, jthr,
  1771. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1772. NOPRINT_EXC_UNRESOLVED_LINK,
  1773. "hdfsChown(path=%s, owner=%s, group=%s): "
  1774. "FileSystem#setOwner", path, owner, group);
  1775. goto done;
  1776. }
  1777. ret = 0;
  1778. done:
  1779. destroyLocalReference(env, jPath);
  1780. destroyLocalReference(env, jOwner);
  1781. destroyLocalReference(env, jGroup);
  1782. if (ret) {
  1783. errno = ret;
  1784. return -1;
  1785. }
  1786. return 0;
  1787. }
  1788. int hdfsChmod(hdfsFS fs, const char* path, short mode)
  1789. {
  1790. int ret;
  1791. // JAVA EQUIVALENT:
  1792. // fs.setPermission(path, FsPermission)
  1793. //Get the JNIEnv* corresponding to current thread
  1794. JNIEnv* env = getJNIEnv();
  1795. if (env == NULL) {
  1796. errno = EINTERNAL;
  1797. return -1;
  1798. }
  1799. jthrowable jthr;
  1800. jobject jPath = NULL, jPermObj = NULL;
  1801. jobject jFS = (jobject)fs;
  1802. // construct jPerm = FsPermission.createImmutable(short mode);
  1803. jshort jmode = mode;
  1804. jthr = constructNewObjectOfClass(env, &jPermObj,
  1805. HADOOP_FSPERM,"(S)V",jmode);
  1806. if (jthr) {
  1807. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1808. "constructNewObjectOfClass(%s)", HADOOP_FSPERM);
  1809. return -1;
  1810. }
  1811. //Create an object of org.apache.hadoop.fs.Path
  1812. jthr = constructNewObjectOfPath(env, path, &jPath);
  1813. if (jthr) {
  1814. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1815. "hdfsChmod(%s): constructNewObjectOfPath", path);
  1816. goto done;
  1817. }
  1818. //Create the directory
  1819. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1820. "setPermission",
  1821. JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID),
  1822. jPath, jPermObj);
  1823. if (jthr) {
  1824. ret = printExceptionAndFree(env, jthr,
  1825. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1826. NOPRINT_EXC_UNRESOLVED_LINK,
  1827. "hdfsChmod(%s): FileSystem#setPermission", path);
  1828. goto done;
  1829. }
  1830. ret = 0;
  1831. done:
  1832. destroyLocalReference(env, jPath);
  1833. destroyLocalReference(env, jPermObj);
  1834. if (ret) {
  1835. errno = ret;
  1836. return -1;
  1837. }
  1838. return 0;
  1839. }
  1840. int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
  1841. {
  1842. // JAVA EQUIVALENT:
  1843. // fs.setTimes(src, mtime, atime)
  1844. jthrowable jthr;
  1845. //Get the JNIEnv* corresponding to current thread
  1846. JNIEnv* env = getJNIEnv();
  1847. if (env == NULL) {
  1848. errno = EINTERNAL;
  1849. return -1;
  1850. }
  1851. jobject jFS = (jobject)fs;
  1852. //Create an object of org.apache.hadoop.fs.Path
  1853. jobject jPath;
  1854. jthr = constructNewObjectOfPath(env, path, &jPath);
  1855. if (jthr) {
  1856. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1857. "hdfsUtime(path=%s): constructNewObjectOfPath", path);
  1858. return -1;
  1859. }
  1860. const tTime NO_CHANGE = -1;
  1861. jlong jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000);
  1862. jlong jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000);
  1863. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1864. "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID),
  1865. jPath, jmtime, jatime);
  1866. destroyLocalReference(env, jPath);
  1867. if (jthr) {
  1868. errno = printExceptionAndFree(env, jthr,
  1869. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1870. NOPRINT_EXC_UNRESOLVED_LINK,
  1871. "hdfsUtime(path=%s): FileSystem#setTimes", path);
  1872. return -1;
  1873. }
  1874. return 0;
  1875. }
  1876. /**
  1877. * Zero-copy options.
  1878. *
  1879. * We cache the EnumSet of ReadOptions which has to be passed into every
  1880. * readZero call, to avoid reconstructing it each time. This cache is cleared
  1881. * whenever an element changes.
  1882. */
  1883. struct hadoopRzOptions
  1884. {
  1885. JNIEnv *env;
  1886. int skipChecksums;
  1887. jobject byteBufferPool;
  1888. jobject cachedEnumSet;
  1889. };
  1890. struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
  1891. {
  1892. struct hadoopRzOptions *opts;
  1893. JNIEnv *env;
  1894. env = getJNIEnv();
  1895. if (!env) {
  1896. // Check to make sure the JNI environment is set up properly.
  1897. errno = EINTERNAL;
  1898. return NULL;
  1899. }
  1900. opts = calloc(1, sizeof(struct hadoopRzOptions));
  1901. if (!opts) {
  1902. errno = ENOMEM;
  1903. return NULL;
  1904. }
  1905. return opts;
  1906. }
  1907. static void hadoopRzOptionsClearCached(JNIEnv *env,
  1908. struct hadoopRzOptions *opts)
  1909. {
  1910. if (!opts->cachedEnumSet) {
  1911. return;
  1912. }
  1913. (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
  1914. opts->cachedEnumSet = NULL;
  1915. }
  1916. int hadoopRzOptionsSetSkipChecksum(
  1917. struct hadoopRzOptions *opts, int skip)
  1918. {
  1919. JNIEnv *env;
  1920. env = getJNIEnv();
  1921. if (!env) {
  1922. errno = EINTERNAL;
  1923. return -1;
  1924. }
  1925. hadoopRzOptionsClearCached(env, opts);
  1926. opts->skipChecksums = !!skip;
  1927. return 0;
  1928. }
  1929. int hadoopRzOptionsSetByteBufferPool(
  1930. struct hadoopRzOptions *opts, const char *className)
  1931. {
  1932. JNIEnv *env;
  1933. jthrowable jthr;
  1934. jobject byteBufferPool = NULL;
  1935. env = getJNIEnv();
  1936. if (!env) {
  1937. errno = EINTERNAL;
  1938. return -1;
  1939. }
  1940. if (className) {
  1941. // Note: we don't have to call hadoopRzOptionsClearCached in this
  1942. // function, since the ByteBufferPool is passed separately from the
  1943. // EnumSet of ReadOptions.
  1944. jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
  1945. if (jthr) {
  1946. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1947. "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
  1948. errno = EINVAL;
  1949. return -1;
  1950. }
  1951. }
  1952. if (opts->byteBufferPool) {
  1953. // Delete any previous ByteBufferPool we had.
  1954. (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
  1955. }
  1956. opts->byteBufferPool = byteBufferPool;
  1957. return 0;
  1958. }
  1959. void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
  1960. {
  1961. JNIEnv *env;
  1962. env = getJNIEnv();
  1963. if (!env) {
  1964. return;
  1965. }
  1966. hadoopRzOptionsClearCached(env, opts);
  1967. if (opts->byteBufferPool) {
  1968. (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
  1969. opts->byteBufferPool = NULL;
  1970. }
  1971. free(opts);
  1972. }
  1973. struct hadoopRzBuffer
  1974. {
  1975. jobject byteBuffer;
  1976. uint8_t *ptr;
  1977. int32_t length;
  1978. int direct;
  1979. };
  1980. static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
  1981. struct hadoopRzOptions *opts, jobject *enumSet)
  1982. {
  1983. jthrowable jthr = NULL;
  1984. jobject enumInst = NULL, enumSetObj = NULL;
  1985. jvalue jVal;
  1986. if (opts->cachedEnumSet) {
  1987. // If we cached the value, return it now.
  1988. *enumSet = opts->cachedEnumSet;
  1989. goto done;
  1990. }
  1991. if (opts->skipChecksums) {
  1992. jthr = fetchEnumInstance(env, READ_OPTION,
  1993. "SKIP_CHECKSUMS", &enumInst);
  1994. if (jthr) {
  1995. goto done;
  1996. }
  1997. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  1998. "java/util/EnumSet", "of",
  1999. "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
  2000. if (jthr) {
  2001. goto done;
  2002. }
  2003. enumSetObj = jVal.l;
  2004. } else {
  2005. jclass clazz = (*env)->FindClass(env, READ_OPTION);
  2006. if (!clazz) {
  2007. jthr = newRuntimeError(env, "failed "
  2008. "to find class for %s", READ_OPTION);
  2009. goto done;
  2010. }
  2011. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  2012. "java/util/EnumSet", "noneOf",
  2013. "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
  2014. enumSetObj = jVal.l;
  2015. }
  2016. // create global ref
  2017. opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
  2018. if (!opts->cachedEnumSet) {
  2019. jthr = getPendingExceptionAndClear(env);
  2020. goto done;
  2021. }
  2022. *enumSet = opts->cachedEnumSet;
  2023. jthr = NULL;
  2024. done:
  2025. (*env)->DeleteLocalRef(env, enumInst);
  2026. (*env)->DeleteLocalRef(env, enumSetObj);
  2027. return jthr;
  2028. }
  2029. static int hadoopReadZeroExtractBuffer(JNIEnv *env,
  2030. const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
  2031. {
  2032. int ret;
  2033. jthrowable jthr;
  2034. jvalue jVal;
  2035. uint8_t *directStart;
  2036. void *mallocBuf = NULL;
  2037. jint position;
  2038. jarray array = NULL;
  2039. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  2040. "java/nio/ByteBuffer", "remaining", "()I");
  2041. if (jthr) {
  2042. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2043. "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
  2044. goto done;
  2045. }
  2046. buffer->length = jVal.i;
  2047. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  2048. "java/nio/ByteBuffer", "position", "()I");
  2049. if (jthr) {
  2050. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2051. "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
  2052. goto done;
  2053. }
  2054. position = jVal.i;
  2055. directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
  2056. if (directStart) {
  2057. // Handle direct buffers.
  2058. buffer->ptr = directStart + position;
  2059. buffer->direct = 1;
  2060. ret = 0;
  2061. goto done;
  2062. }
  2063. // Handle indirect buffers.
  2064. // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
  2065. // when it fails. However, they also don't clearly say that it doesn't. It
  2066. // seems safest to clear any pending exceptions here, to prevent problems on
  2067. // various JVMs.
  2068. (*env)->ExceptionClear(env);
  2069. if (!opts->byteBufferPool) {
  2070. fputs("hadoopReadZeroExtractBuffer: we read through the "
  2071. "zero-copy path, but failed to get the address of the buffer via "
  2072. "GetDirectBufferAddress. Please make sure your JVM supports "
  2073. "GetDirectBufferAddress.\n", stderr);
  2074. ret = ENOTSUP;
  2075. goto done;
  2076. }
  2077. // Get the backing array object of this buffer.
  2078. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  2079. "java/nio/ByteBuffer", "array", "()[B");
  2080. if (jthr) {
  2081. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2082. "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
  2083. goto done;
  2084. }
  2085. array = jVal.l;
  2086. if (!array) {
  2087. fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
  2088. stderr);
  2089. ret = EIO;
  2090. goto done;
  2091. }
  2092. mallocBuf = malloc(buffer->length);
  2093. if (!mallocBuf) {
  2094. fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
  2095. buffer->length);
  2096. ret = ENOMEM;
  2097. goto done;
  2098. }
  2099. (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
  2100. jthr = (*env)->ExceptionOccurred(env);
  2101. if (jthr) {
  2102. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2103. "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
  2104. goto done;
  2105. }
  2106. buffer->ptr = mallocBuf;
  2107. buffer->direct = 0;
  2108. ret = 0;
  2109. done:
  2110. free(mallocBuf);
  2111. (*env)->DeleteLocalRef(env, array);
  2112. return ret;
  2113. }
  2114. static int translateZCRException(JNIEnv *env, jthrowable exc)
  2115. {
  2116. int ret;
  2117. char *className = NULL;
  2118. jthrowable jthr = classNameOfObject(exc, env, &className);
  2119. if (jthr) {
  2120. fputs("hadoopReadZero: failed to get class name of "
  2121. "exception from read().\n", stderr);
  2122. destroyLocalReference(env, exc);
  2123. destroyLocalReference(env, jthr);
  2124. ret = EIO;
  2125. goto done;
  2126. }
  2127. if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
  2128. ret = EPROTONOSUPPORT;
  2129. goto done;
  2130. }
  2131. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2132. "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
  2133. done:
  2134. free(className);
  2135. return ret;
  2136. }
  2137. struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
  2138. struct hadoopRzOptions *opts, int32_t maxLength)
  2139. {
  2140. JNIEnv *env;
  2141. jthrowable jthr = NULL;
  2142. jvalue jVal;
  2143. jobject enumSet = NULL, byteBuffer = NULL;
  2144. struct hadoopRzBuffer* buffer = NULL;
  2145. int ret;
  2146. env = getJNIEnv();
  2147. if (!env) {
  2148. errno = EINTERNAL;
  2149. return NULL;
  2150. }
  2151. if (file->type != INPUT) {
  2152. fputs("Cannot read from a non-InputStream object!\n", stderr);
  2153. ret = EINVAL;
  2154. goto done;
  2155. }
  2156. buffer = calloc(1, sizeof(struct hadoopRzBuffer));
  2157. if (!buffer) {
  2158. ret = ENOMEM;
  2159. goto done;
  2160. }
  2161. jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
  2162. if (jthr) {
  2163. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2164. "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
  2165. goto done;
  2166. }
  2167. jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
  2168. "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
  2169. "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
  2170. if (jthr) {
  2171. ret = translateZCRException(env, jthr);
  2172. goto done;
  2173. }
  2174. byteBuffer = jVal.l;
  2175. if (!byteBuffer) {
  2176. buffer->byteBuffer = NULL;
  2177. buffer->length = 0;
  2178. buffer->ptr = NULL;
  2179. } else {
  2180. buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
  2181. if (!buffer->byteBuffer) {
  2182. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2183. "hadoopReadZero: failed to create global ref to ByteBuffer");
  2184. goto done;
  2185. }
  2186. ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
  2187. if (ret) {
  2188. goto done;
  2189. }
  2190. }
  2191. ret = 0;
  2192. done:
  2193. (*env)->DeleteLocalRef(env, byteBuffer);
  2194. if (ret) {
  2195. if (buffer) {
  2196. if (buffer->byteBuffer) {
  2197. (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
  2198. }
  2199. free(buffer);
  2200. }
  2201. errno = ret;
  2202. return NULL;
  2203. } else {
  2204. errno = 0;
  2205. }
  2206. return buffer;
  2207. }
  2208. int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
  2209. {
  2210. return buffer->length;
  2211. }
  2212. const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
  2213. {
  2214. return buffer->ptr;
  2215. }
  2216. void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
  2217. {
  2218. jvalue jVal;
  2219. jthrowable jthr;
  2220. JNIEnv* env;
  2221. env = getJNIEnv();
  2222. if (env == NULL) {
  2223. errno = EINTERNAL;
  2224. return;
  2225. }
  2226. if (buffer->byteBuffer) {
  2227. jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
  2228. HADOOP_ISTRM, "releaseBuffer",
  2229. "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
  2230. if (jthr) {
  2231. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2232. "hadoopRzBufferFree: releaseBuffer failed: ");
  2233. // even on error, we have to delete the reference.
  2234. }
  2235. (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
  2236. }
  2237. if (!buffer->direct) {
  2238. free(buffer->ptr);
  2239. }
  2240. memset(buffer, 0, sizeof(*buffer));
  2241. free(buffer);
  2242. }
  2243. char***
  2244. hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
  2245. {
  2246. // JAVA EQUIVALENT:
  2247. // fs.getFileBlockLoctions(new Path(path), start, length);
  2248. jthrowable jthr;
  2249. jobject jPath = NULL;
  2250. jobject jFileStatus = NULL;
  2251. jvalue jFSVal, jVal;
  2252. jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL;
  2253. jstring jHost = NULL;
  2254. char*** blockHosts = NULL;
  2255. int i, j, ret;
  2256. jsize jNumFileBlocks = 0;
  2257. //Get the JNIEnv* corresponding to current thread
  2258. JNIEnv* env = getJNIEnv();
  2259. if (env == NULL) {
  2260. errno = EINTERNAL;
  2261. return NULL;
  2262. }
  2263. jobject jFS = (jobject)fs;
  2264. //Create an object of org.apache.hadoop.fs.Path
  2265. jthr = constructNewObjectOfPath(env, path, &jPath);
  2266. if (jthr) {
  2267. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2268. "hdfsGetHosts(path=%s): constructNewObjectOfPath", path);
  2269. goto done;
  2270. }
  2271. jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS,
  2272. HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)"
  2273. "Lorg/apache/hadoop/fs/FileStatus;", jPath);
  2274. if (jthr) {
  2275. ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND,
  2276. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2277. "FileSystem#getFileStatus", path, start, length);
  2278. destroyLocalReference(env, jPath);
  2279. goto done;
  2280. }
  2281. jFileStatus = jFSVal.l;
  2282. //org.apache.hadoop.fs.FileSystem#getFileBlockLocations
  2283. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  2284. HADOOP_FS, "getFileBlockLocations",
  2285. "(Lorg/apache/hadoop/fs/FileStatus;JJ)"
  2286. "[Lorg/apache/hadoop/fs/BlockLocation;",
  2287. jFileStatus, start, length);
  2288. if (jthr) {
  2289. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2290. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2291. "FileSystem#getFileBlockLocations", path, start, length);
  2292. goto done;
  2293. }
  2294. jBlockLocations = jVal.l;
  2295. //Figure out no of entries in jBlockLocations
  2296. //Allocate memory and add NULL at the end
  2297. jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations);
  2298. blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**));
  2299. if (blockHosts == NULL) {
  2300. ret = ENOMEM;
  2301. goto done;
  2302. }
  2303. if (jNumFileBlocks == 0) {
  2304. ret = 0;
  2305. goto done;
  2306. }
  2307. //Now parse each block to get hostnames
  2308. for (i = 0; i < jNumFileBlocks; ++i) {
  2309. jobject jFileBlock =
  2310. (*env)->GetObjectArrayElement(env, jBlockLocations, i);
  2311. if (!jFileBlock) {
  2312. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2313. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2314. "GetObjectArrayElement(%d)", path, start, length, i);
  2315. goto done;
  2316. }
  2317. jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock, HADOOP_BLK_LOC,
  2318. "getHosts", "()[Ljava/lang/String;");
  2319. if (jthr) {
  2320. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2321. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2322. "BlockLocation#getHosts", path, start, length);
  2323. goto done;
  2324. }
  2325. jFileBlockHosts = jVal.l;
  2326. if (!jFileBlockHosts) {
  2327. fprintf(stderr,
  2328. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2329. "BlockLocation#getHosts returned NULL", path, start, length);
  2330. ret = EINTERNAL;
  2331. goto done;
  2332. }
  2333. //Figure out no of hosts in jFileBlockHosts, and allocate the memory
  2334. jsize jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts);
  2335. blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*));
  2336. if (!blockHosts[i]) {
  2337. ret = ENOMEM;
  2338. goto done;
  2339. }
  2340. //Now parse each hostname
  2341. const char *hostName;
  2342. for (j = 0; j < jNumBlockHosts; ++j) {
  2343. jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j);
  2344. if (!jHost) {
  2345. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2346. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
  2347. "NewByteArray", path, start, length);
  2348. goto done;
  2349. }
  2350. hostName =
  2351. (const char*)((*env)->GetStringUTFChars(env, jHost, NULL));
  2352. if (!hostName) {
  2353. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2354. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", "
  2355. "j=%d out of %d): GetStringUTFChars",
  2356. path, start, length, j, jNumBlockHosts);
  2357. goto done;
  2358. }
  2359. blockHosts[i][j] = strdup(hostName);
  2360. (*env)->ReleaseStringUTFChars(env, jHost, hostName);
  2361. if (!blockHosts[i][j]) {
  2362. ret = ENOMEM;
  2363. goto done;
  2364. }
  2365. destroyLocalReference(env, jHost);
  2366. jHost = NULL;
  2367. }
  2368. destroyLocalReference(env, jFileBlockHosts);
  2369. jFileBlockHosts = NULL;
  2370. }
  2371. ret = 0;
  2372. done:
  2373. destroyLocalReference(env, jPath);
  2374. destroyLocalReference(env, jFileStatus);
  2375. destroyLocalReference(env, jBlockLocations);
  2376. destroyLocalReference(env, jFileBlockHosts);
  2377. destroyLocalReference(env, jHost);
  2378. if (ret) {
  2379. if (blockHosts) {
  2380. hdfsFreeHosts(blockHosts);
  2381. }
  2382. return NULL;
  2383. }
  2384. return blockHosts;
  2385. }
  2386. void hdfsFreeHosts(char ***blockHosts)
  2387. {
  2388. int i, j;
  2389. for (i=0; blockHosts[i]; i++) {
  2390. for (j=0; blockHosts[i][j]; j++) {
  2391. free(blockHosts[i][j]);
  2392. }
  2393. free(blockHosts[i]);
  2394. }
  2395. free(blockHosts);
  2396. }
  2397. tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
  2398. {
  2399. // JAVA EQUIVALENT:
  2400. // fs.getDefaultBlockSize();
  2401. //Get the JNIEnv* corresponding to current thread
  2402. JNIEnv* env = getJNIEnv();
  2403. if (env == NULL) {
  2404. errno = EINTERNAL;
  2405. return -1;
  2406. }
  2407. jobject jFS = (jobject)fs;
  2408. //FileSystem#getDefaultBlockSize()
  2409. jvalue jVal;
  2410. jthrowable jthr;
  2411. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2412. "getDefaultBlockSize", "()J");
  2413. if (jthr) {
  2414. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2415. "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
  2416. return -1;
  2417. }
  2418. return jVal.j;
  2419. }
  2420. tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path)
  2421. {
  2422. // JAVA EQUIVALENT:
  2423. // fs.getDefaultBlockSize(path);
  2424. jthrowable jthr;
  2425. jobject jFS = (jobject)fs;
  2426. jobject jPath;
  2427. tOffset blockSize;
  2428. JNIEnv* env = getJNIEnv();
  2429. if (env == NULL) {
  2430. errno = EINTERNAL;
  2431. return -1;
  2432. }
  2433. jthr = constructNewObjectOfPath(env, path, &jPath);
  2434. if (jthr) {
  2435. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2436. "hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath",
  2437. path);
  2438. return -1;
  2439. }
  2440. jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize);
  2441. (*env)->DeleteLocalRef(env, jPath);
  2442. if (jthr) {
  2443. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2444. "hdfsGetDefaultBlockSize(path=%s): "
  2445. "FileSystem#getDefaultBlockSize", path);
  2446. return -1;
  2447. }
  2448. return blockSize;
  2449. }
  2450. tOffset hdfsGetCapacity(hdfsFS fs)
  2451. {
  2452. // JAVA EQUIVALENT:
  2453. // FsStatus fss = fs.getStatus();
  2454. // return Fss.getCapacity();
  2455. //Get the JNIEnv* corresponding to current thread
  2456. JNIEnv* env = getJNIEnv();
  2457. if (env == NULL) {
  2458. errno = EINTERNAL;
  2459. return -1;
  2460. }
  2461. jobject jFS = (jobject)fs;
  2462. //FileSystem#getStatus
  2463. jvalue jVal;
  2464. jthrowable jthr;
  2465. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2466. "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
  2467. if (jthr) {
  2468. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2469. "hdfsGetCapacity: FileSystem#getStatus");
  2470. return -1;
  2471. }
  2472. jobject fss = (jobject)jVal.l;
  2473. jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
  2474. "getCapacity", "()J");
  2475. destroyLocalReference(env, fss);
  2476. if (jthr) {
  2477. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2478. "hdfsGetCapacity: FsStatus#getCapacity");
  2479. return -1;
  2480. }
  2481. return jVal.j;
  2482. }
  2483. tOffset hdfsGetUsed(hdfsFS fs)
  2484. {
  2485. // JAVA EQUIVALENT:
  2486. // FsStatus fss = fs.getStatus();
  2487. // return Fss.getUsed();
  2488. //Get the JNIEnv* corresponding to current thread
  2489. JNIEnv* env = getJNIEnv();
  2490. if (env == NULL) {
  2491. errno = EINTERNAL;
  2492. return -1;
  2493. }
  2494. jobject jFS = (jobject)fs;
  2495. //FileSystem#getStatus
  2496. jvalue jVal;
  2497. jthrowable jthr;
  2498. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2499. "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
  2500. if (jthr) {
  2501. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2502. "hdfsGetUsed: FileSystem#getStatus");
  2503. return -1;
  2504. }
  2505. jobject fss = (jobject)jVal.l;
  2506. jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
  2507. "getUsed", "()J");
  2508. destroyLocalReference(env, fss);
  2509. if (jthr) {
  2510. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2511. "hdfsGetUsed: FsStatus#getUsed");
  2512. return -1;
  2513. }
  2514. return jVal.j;
  2515. }
  2516. static jthrowable
  2517. getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo)
  2518. {
  2519. jvalue jVal;
  2520. jthrowable jthr;
  2521. jobject jPath = NULL;
  2522. jstring jPathName = NULL;
  2523. jstring jUserName = NULL;
  2524. jstring jGroupName = NULL;
  2525. jobject jPermission = NULL;
  2526. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2527. HADOOP_STAT, "isDir", "()Z");
  2528. if (jthr)
  2529. goto done;
  2530. fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile;
  2531. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2532. HADOOP_STAT, "getReplication", "()S");
  2533. if (jthr)
  2534. goto done;
  2535. fileInfo->mReplication = jVal.s;
  2536. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2537. HADOOP_STAT, "getBlockSize", "()J");
  2538. if (jthr)
  2539. goto done;
  2540. fileInfo->mBlockSize = jVal.j;
  2541. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2542. HADOOP_STAT, "getModificationTime", "()J");
  2543. if (jthr)
  2544. goto done;
  2545. fileInfo->mLastMod = jVal.j / 1000;
  2546. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2547. HADOOP_STAT, "getAccessTime", "()J");
  2548. if (jthr)
  2549. goto done;
  2550. fileInfo->mLastAccess = (tTime) (jVal.j / 1000);
  2551. if (fileInfo->mKind == kObjectKindFile) {
  2552. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2553. HADOOP_STAT, "getLen", "()J");
  2554. if (jthr)
  2555. goto done;
  2556. fileInfo->mSize = jVal.j;
  2557. }
  2558. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2559. "getPath", "()Lorg/apache/hadoop/fs/Path;");
  2560. if (jthr)
  2561. goto done;
  2562. jPath = jVal.l;
  2563. if (jPath == NULL) {
  2564. jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#"
  2565. "getPath returned NULL!");
  2566. goto done;
  2567. }
  2568. jthr = invokeMethod(env, &jVal, INSTANCE, jPath, HADOOP_PATH,
  2569. "toString", "()Ljava/lang/String;");
  2570. if (jthr)
  2571. goto done;
  2572. jPathName = jVal.l;
  2573. const char *cPathName =
  2574. (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL));
  2575. if (!cPathName) {
  2576. jthr = getPendingExceptionAndClear(env);
  2577. goto done;
  2578. }
  2579. fileInfo->mName = strdup(cPathName);
  2580. (*env)->ReleaseStringUTFChars(env, jPathName, cPathName);
  2581. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2582. "getOwner", "()Ljava/lang/String;");
  2583. if (jthr)
  2584. goto done;
  2585. jUserName = jVal.l;
  2586. const char* cUserName =
  2587. (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL));
  2588. if (!cUserName) {
  2589. jthr = getPendingExceptionAndClear(env);
  2590. goto done;
  2591. }
  2592. fileInfo->mOwner = strdup(cUserName);
  2593. (*env)->ReleaseStringUTFChars(env, jUserName, cUserName);
  2594. const char* cGroupName;
  2595. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2596. "getGroup", "()Ljava/lang/String;");
  2597. if (jthr)
  2598. goto done;
  2599. jGroupName = jVal.l;
  2600. cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL));
  2601. if (!cGroupName) {
  2602. jthr = getPendingExceptionAndClear(env);
  2603. goto done;
  2604. }
  2605. fileInfo->mGroup = strdup(cGroupName);
  2606. (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName);
  2607. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2608. "getPermission",
  2609. "()Lorg/apache/hadoop/fs/permission/FsPermission;");
  2610. if (jthr)
  2611. goto done;
  2612. if (jVal.l == NULL) {
  2613. jthr = newRuntimeError(env, "%s#getPermission returned NULL!",
  2614. HADOOP_STAT);
  2615. goto done;
  2616. }
  2617. jPermission = jVal.l;
  2618. jthr = invokeMethod(env, &jVal, INSTANCE, jPermission, HADOOP_FSPERM,
  2619. "toShort", "()S");
  2620. if (jthr)
  2621. goto done;
  2622. fileInfo->mPermissions = jVal.s;
  2623. jthr = NULL;
  2624. done:
  2625. if (jthr)
  2626. hdfsFreeFileInfoEntry(fileInfo);
  2627. destroyLocalReference(env, jPath);
  2628. destroyLocalReference(env, jPathName);
  2629. destroyLocalReference(env, jUserName);
  2630. destroyLocalReference(env, jGroupName);
  2631. destroyLocalReference(env, jPermission);
  2632. destroyLocalReference(env, jPath);
  2633. return jthr;
  2634. }
  2635. static jthrowable
  2636. getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo)
  2637. {
  2638. // JAVA EQUIVALENT:
  2639. // fs.isDirectory(f)
  2640. // fs.getModificationTime()
  2641. // fs.getAccessTime()
  2642. // fs.getLength(f)
  2643. // f.getPath()
  2644. // f.getOwner()
  2645. // f.getGroup()
  2646. // f.getPermission().toShort()
  2647. jobject jStat;
  2648. jvalue jVal;
  2649. jthrowable jthr;
  2650. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2651. "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"),
  2652. jPath);
  2653. if (jthr)
  2654. return jthr;
  2655. if (jVal.z == 0) {
  2656. *fileInfo = NULL;
  2657. return NULL;
  2658. }
  2659. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  2660. HADOOP_FS, "getFileStatus",
  2661. JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath);
  2662. if (jthr)
  2663. return jthr;
  2664. jStat = jVal.l;
  2665. *fileInfo = calloc(1, sizeof(hdfsFileInfo));
  2666. if (!*fileInfo) {
  2667. destroyLocalReference(env, jStat);
  2668. return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo");
  2669. }
  2670. jthr = getFileInfoFromStat(env, jStat, *fileInfo);
  2671. destroyLocalReference(env, jStat);
  2672. return jthr;
  2673. }
  2674. hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
  2675. {
  2676. // JAVA EQUIVALENT:
  2677. // Path p(path);
  2678. // Path []pathList = fs.listPaths(p)
  2679. // foreach path in pathList
  2680. // getFileInfo(path)
  2681. jthrowable jthr;
  2682. jobject jPath = NULL;
  2683. hdfsFileInfo *pathList = NULL;
  2684. jobjectArray jPathList = NULL;
  2685. jvalue jVal;
  2686. jsize jPathListSize = 0;
  2687. int ret;
  2688. //Get the JNIEnv* corresponding to current thread
  2689. JNIEnv* env = getJNIEnv();
  2690. if (env == NULL) {
  2691. errno = EINTERNAL;
  2692. return NULL;
  2693. }
  2694. jobject jFS = (jobject)fs;
  2695. //Create an object of org.apache.hadoop.fs.Path
  2696. jthr = constructNewObjectOfPath(env, path, &jPath);
  2697. if (jthr) {
  2698. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2699. "hdfsListDirectory(%s): constructNewObjectOfPath", path);
  2700. goto done;
  2701. }
  2702. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_DFS, "listStatus",
  2703. JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)),
  2704. jPath);
  2705. if (jthr) {
  2706. ret = printExceptionAndFree(env, jthr,
  2707. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2708. NOPRINT_EXC_UNRESOLVED_LINK,
  2709. "hdfsListDirectory(%s): FileSystem#listStatus", path);
  2710. goto done;
  2711. }
  2712. jPathList = jVal.l;
  2713. //Figure out the number of entries in that directory
  2714. jPathListSize = (*env)->GetArrayLength(env, jPathList);
  2715. if (jPathListSize == 0) {
  2716. ret = 0;
  2717. goto done;
  2718. }
  2719. //Allocate memory
  2720. pathList = calloc(jPathListSize, sizeof(hdfsFileInfo));
  2721. if (pathList == NULL) {
  2722. ret = ENOMEM;
  2723. goto done;
  2724. }
  2725. //Save path information in pathList
  2726. jsize i;
  2727. jobject tmpStat;
  2728. for (i=0; i < jPathListSize; ++i) {
  2729. tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i);
  2730. if (!tmpStat) {
  2731. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2732. "hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)",
  2733. path, i, jPathListSize);
  2734. goto done;
  2735. }
  2736. jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]);
  2737. destroyLocalReference(env, tmpStat);
  2738. if (jthr) {
  2739. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2740. "hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)",
  2741. path, i, jPathListSize);
  2742. goto done;
  2743. }
  2744. }
  2745. ret = 0;
  2746. done:
  2747. destroyLocalReference(env, jPath);
  2748. destroyLocalReference(env, jPathList);
  2749. if (ret) {
  2750. hdfsFreeFileInfo(pathList, jPathListSize);
  2751. errno = ret;
  2752. return NULL;
  2753. }
  2754. *numEntries = jPathListSize;
  2755. return pathList;
  2756. }
  2757. hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
  2758. {
  2759. // JAVA EQUIVALENT:
  2760. // File f(path);
  2761. // fs.isDirectory(f)
  2762. // fs.lastModified() ??
  2763. // fs.getLength(f)
  2764. // f.getPath()
  2765. //Get the JNIEnv* corresponding to current thread
  2766. JNIEnv* env = getJNIEnv();
  2767. if (env == NULL) {
  2768. errno = EINTERNAL;
  2769. return NULL;
  2770. }
  2771. jobject jFS = (jobject)fs;
  2772. //Create an object of org.apache.hadoop.fs.Path
  2773. jobject jPath;
  2774. jthrowable jthr = constructNewObjectOfPath(env, path, &jPath);
  2775. if (jthr) {
  2776. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2777. "hdfsGetPathInfo(%s): constructNewObjectOfPath", path);
  2778. return NULL;
  2779. }
  2780. hdfsFileInfo *fileInfo;
  2781. jthr = getFileInfo(env, jFS, jPath, &fileInfo);
  2782. destroyLocalReference(env, jPath);
  2783. if (jthr) {
  2784. errno = printExceptionAndFree(env, jthr,
  2785. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2786. NOPRINT_EXC_UNRESOLVED_LINK,
  2787. "hdfsGetPathInfo(%s): getFileInfo", path);
  2788. return NULL;
  2789. }
  2790. if (!fileInfo) {
  2791. errno = ENOENT;
  2792. return NULL;
  2793. }
  2794. return fileInfo;
  2795. }
  2796. static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo)
  2797. {
  2798. free(hdfsFileInfo->mName);
  2799. free(hdfsFileInfo->mOwner);
  2800. free(hdfsFileInfo->mGroup);
  2801. memset(hdfsFileInfo, 0, sizeof(hdfsFileInfo));
  2802. }
  2803. void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
  2804. {
  2805. //Free the mName, mOwner, and mGroup
  2806. int i;
  2807. for (i=0; i < numEntries; ++i) {
  2808. hdfsFreeFileInfoEntry(hdfsFileInfo + i);
  2809. }
  2810. //Free entire block
  2811. free(hdfsFileInfo);
  2812. }
  2813. /**
  2814. * vim: ts=4: sw=4: et:
  2815. */