1
0

hdfs.c 95 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "exception.h"
  19. #include "hdfs.h"
  20. #include "jni_helper.h"
  21. #include "platform.h"
  22. #include <fcntl.h>
  23. #include <inttypes.h>
  24. #include <stdio.h>
  25. #include <string.h>
  26. /* Some frequently used Java paths */
  27. #define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
  28. #define HADOOP_PATH "org/apache/hadoop/fs/Path"
  29. #define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem"
  30. #define HADOOP_FS "org/apache/hadoop/fs/FileSystem"
  31. #define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
  32. #define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation"
  33. #define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem"
  34. #define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
  35. #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
  36. #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
  37. #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
  38. #define JAVA_NET_ISA "java/net/InetSocketAddress"
  39. #define JAVA_NET_URI "java/net/URI"
  40. #define JAVA_STRING "java/lang/String"
  41. #define READ_OPTION "org/apache/hadoop/fs/ReadOption"
  42. #define JAVA_VOID "V"
  43. /* Macros for constructing method signatures */
  44. #define JPARAM(X) "L" X ";"
  45. #define JARRPARAM(X) "[L" X ";"
  46. #define JMETHOD1(X, R) "(" X ")" R
  47. #define JMETHOD2(X, Y, R) "(" X Y ")" R
  48. #define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
  49. #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
  50. // Bit fields for hdfsFile_internal flags
  51. #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
  52. tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
  53. static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
  54. /**
  55. * The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
  56. */
  57. enum hdfsStreamType
  58. {
  59. HDFS_STREAM_UNINITIALIZED = 0,
  60. HDFS_STREAM_INPUT = 1,
  61. HDFS_STREAM_OUTPUT = 2,
  62. };
  63. /**
  64. * The 'file-handle' to a file in hdfs.
  65. */
  66. struct hdfsFile_internal {
  67. void* file;
  68. enum hdfsStreamType type;
  69. int flags;
  70. };
  71. #define HDFS_EXTENDED_FILE_INFO_ENCRYPTED 0x1
  72. /**
  73. * Extended file information.
  74. */
  75. struct hdfsExtendedFileInfo {
  76. int flags;
  77. };
  78. int hdfsFileIsOpenForRead(hdfsFile file)
  79. {
  80. return (file->type == HDFS_STREAM_INPUT);
  81. }
  82. int hdfsFileGetReadStatistics(hdfsFile file,
  83. struct hdfsReadStatistics **stats)
  84. {
  85. jthrowable jthr;
  86. jobject readStats = NULL;
  87. jvalue jVal;
  88. struct hdfsReadStatistics *s = NULL;
  89. int ret;
  90. JNIEnv* env = getJNIEnv();
  91. if (env == NULL) {
  92. errno = EINTERNAL;
  93. return -1;
  94. }
  95. if (file->type != HDFS_STREAM_INPUT) {
  96. ret = EINVAL;
  97. goto done;
  98. }
  99. jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
  100. "org/apache/hadoop/hdfs/client/HdfsDataInputStream",
  101. "getReadStatistics",
  102. "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;");
  103. if (jthr) {
  104. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  105. "hdfsFileGetReadStatistics: getReadStatistics failed");
  106. goto done;
  107. }
  108. readStats = jVal.l;
  109. s = malloc(sizeof(struct hdfsReadStatistics));
  110. if (!s) {
  111. ret = ENOMEM;
  112. goto done;
  113. }
  114. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  115. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  116. "getTotalBytesRead", "()J");
  117. if (jthr) {
  118. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  119. "hdfsFileGetReadStatistics: getTotalBytesRead failed");
  120. goto done;
  121. }
  122. s->totalBytesRead = jVal.j;
  123. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  124. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  125. "getTotalLocalBytesRead", "()J");
  126. if (jthr) {
  127. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  128. "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
  129. goto done;
  130. }
  131. s->totalLocalBytesRead = jVal.j;
  132. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  133. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  134. "getTotalShortCircuitBytesRead", "()J");
  135. if (jthr) {
  136. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  137. "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
  138. goto done;
  139. }
  140. s->totalShortCircuitBytesRead = jVal.j;
  141. jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
  142. "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
  143. "getTotalZeroCopyBytesRead", "()J");
  144. if (jthr) {
  145. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  146. "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
  147. goto done;
  148. }
  149. s->totalZeroCopyBytesRead = jVal.j;
  150. *stats = s;
  151. s = NULL;
  152. ret = 0;
  153. done:
  154. destroyLocalReference(env, readStats);
  155. free(s);
  156. if (ret) {
  157. errno = ret;
  158. return -1;
  159. }
  160. return 0;
  161. }
  162. int64_t hdfsReadStatisticsGetRemoteBytesRead(
  163. const struct hdfsReadStatistics *stats)
  164. {
  165. return stats->totalBytesRead - stats->totalLocalBytesRead;
  166. }
  167. int hdfsFileClearReadStatistics(hdfsFile file)
  168. {
  169. jthrowable jthr;
  170. int ret;
  171. JNIEnv* env = getJNIEnv();
  172. if (env == NULL) {
  173. errno = EINTERNAL;
  174. return EINTERNAL;
  175. }
  176. if (file->type != HDFS_STREAM_INPUT) {
  177. ret = EINVAL;
  178. goto done;
  179. }
  180. jthr = invokeMethod(env, NULL, INSTANCE, file->file,
  181. "org/apache/hadoop/hdfs/client/HdfsDataInputStream",
  182. "clearReadStatistics", "()V");
  183. if (jthr) {
  184. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  185. "hdfsFileClearReadStatistics: clearReadStatistics failed");
  186. goto done;
  187. }
  188. ret = 0;
  189. done:
  190. if (ret) {
  191. errno = ret;
  192. return ret;
  193. }
  194. return 0;
  195. }
  196. void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
  197. {
  198. free(stats);
  199. }
  200. int hdfsFileIsOpenForWrite(hdfsFile file)
  201. {
  202. return (file->type == HDFS_STREAM_OUTPUT);
  203. }
  204. int hdfsFileUsesDirectRead(hdfsFile file)
  205. {
  206. return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
  207. }
  208. void hdfsFileDisableDirectRead(hdfsFile file)
  209. {
  210. file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
  211. }
  212. int hdfsDisableDomainSocketSecurity(void)
  213. {
  214. jthrowable jthr;
  215. JNIEnv* env = getJNIEnv();
  216. if (env == NULL) {
  217. errno = EINTERNAL;
  218. return -1;
  219. }
  220. jthr = invokeMethod(env, NULL, STATIC, NULL,
  221. "org/apache/hadoop/net/unix/DomainSocket",
  222. "disableBindPathValidation", "()V");
  223. if (jthr) {
  224. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  225. "DomainSocket#disableBindPathValidation");
  226. return -1;
  227. }
  228. return 0;
  229. }
  230. /**
  231. * hdfsJniEnv: A wrapper struct to be used as 'value'
  232. * while saving thread -> JNIEnv* mappings
  233. */
  234. typedef struct
  235. {
  236. JNIEnv* env;
  237. } hdfsJniEnv;
  238. /**
  239. * Helper function to create a org.apache.hadoop.fs.Path object.
  240. * @param env: The JNIEnv pointer.
  241. * @param path: The file-path for which to construct org.apache.hadoop.fs.Path
  242. * object.
  243. * @return Returns a jobject on success and NULL on error.
  244. */
  245. static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
  246. jobject *out)
  247. {
  248. jthrowable jthr;
  249. jstring jPathString;
  250. jobject jPath;
  251. //Construct a java.lang.String object
  252. jthr = newJavaStr(env, path, &jPathString);
  253. if (jthr)
  254. return jthr;
  255. //Construct the org.apache.hadoop.fs.Path object
  256. jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path",
  257. "(Ljava/lang/String;)V", jPathString);
  258. destroyLocalReference(env, jPathString);
  259. if (jthr)
  260. return jthr;
  261. *out = jPath;
  262. return NULL;
  263. }
  264. static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
  265. const char *key, char **val)
  266. {
  267. jthrowable jthr;
  268. jvalue jVal;
  269. jstring jkey = NULL, jRet = NULL;
  270. jthr = newJavaStr(env, key, &jkey);
  271. if (jthr)
  272. goto done;
  273. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  274. HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
  275. JPARAM(JAVA_STRING)), jkey);
  276. if (jthr)
  277. goto done;
  278. jRet = jVal.l;
  279. jthr = newCStr(env, jRet, val);
  280. done:
  281. destroyLocalReference(env, jkey);
  282. destroyLocalReference(env, jRet);
  283. return jthr;
  284. }
  285. int hdfsConfGetStr(const char *key, char **val)
  286. {
  287. JNIEnv *env;
  288. int ret;
  289. jthrowable jthr;
  290. jobject jConfiguration = NULL;
  291. env = getJNIEnv();
  292. if (env == NULL) {
  293. ret = EINTERNAL;
  294. goto done;
  295. }
  296. jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
  297. if (jthr) {
  298. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  299. "hdfsConfGetStr(%s): new Configuration", key);
  300. goto done;
  301. }
  302. jthr = hadoopConfGetStr(env, jConfiguration, key, val);
  303. if (jthr) {
  304. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  305. "hdfsConfGetStr(%s): hadoopConfGetStr", key);
  306. goto done;
  307. }
  308. ret = 0;
  309. done:
  310. destroyLocalReference(env, jConfiguration);
  311. if (ret)
  312. errno = ret;
  313. return ret;
  314. }
  315. void hdfsConfStrFree(char *val)
  316. {
  317. free(val);
  318. }
  319. static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
  320. const char *key, int32_t *val)
  321. {
  322. jthrowable jthr = NULL;
  323. jvalue jVal;
  324. jstring jkey = NULL;
  325. jthr = newJavaStr(env, key, &jkey);
  326. if (jthr)
  327. return jthr;
  328. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  329. HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
  330. jkey, (jint)(*val));
  331. destroyLocalReference(env, jkey);
  332. if (jthr)
  333. return jthr;
  334. *val = jVal.i;
  335. return NULL;
  336. }
  337. int hdfsConfGetInt(const char *key, int32_t *val)
  338. {
  339. JNIEnv *env;
  340. int ret;
  341. jobject jConfiguration = NULL;
  342. jthrowable jthr;
  343. env = getJNIEnv();
  344. if (env == NULL) {
  345. ret = EINTERNAL;
  346. goto done;
  347. }
  348. jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
  349. if (jthr) {
  350. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  351. "hdfsConfGetInt(%s): new Configuration", key);
  352. goto done;
  353. }
  354. jthr = hadoopConfGetInt(env, jConfiguration, key, val);
  355. if (jthr) {
  356. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  357. "hdfsConfGetInt(%s): hadoopConfGetInt", key);
  358. goto done;
  359. }
  360. ret = 0;
  361. done:
  362. destroyLocalReference(env, jConfiguration);
  363. if (ret)
  364. errno = ret;
  365. return ret;
  366. }
  367. struct hdfsBuilderConfOpt {
  368. struct hdfsBuilderConfOpt *next;
  369. const char *key;
  370. const char *val;
  371. };
  372. struct hdfsBuilder {
  373. int forceNewInstance;
  374. const char *nn;
  375. tPort port;
  376. const char *kerbTicketCachePath;
  377. const char *userName;
  378. struct hdfsBuilderConfOpt *opts;
  379. };
  380. struct hdfsBuilder *hdfsNewBuilder(void)
  381. {
  382. struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
  383. if (!bld) {
  384. errno = ENOMEM;
  385. return NULL;
  386. }
  387. return bld;
  388. }
  389. int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
  390. const char *val)
  391. {
  392. struct hdfsBuilderConfOpt *opt, *next;
  393. opt = calloc(1, sizeof(struct hdfsBuilderConfOpt));
  394. if (!opt)
  395. return -ENOMEM;
  396. next = bld->opts;
  397. bld->opts = opt;
  398. opt->next = next;
  399. opt->key = key;
  400. opt->val = val;
  401. return 0;
  402. }
  403. void hdfsFreeBuilder(struct hdfsBuilder *bld)
  404. {
  405. struct hdfsBuilderConfOpt *cur, *next;
  406. cur = bld->opts;
  407. for (cur = bld->opts; cur; ) {
  408. next = cur->next;
  409. free(cur);
  410. cur = next;
  411. }
  412. free(bld);
  413. }
  414. void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
  415. {
  416. bld->forceNewInstance = 1;
  417. }
  418. void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
  419. {
  420. bld->nn = nn;
  421. }
  422. void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
  423. {
  424. bld->port = port;
  425. }
  426. void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
  427. {
  428. bld->userName = userName;
  429. }
  430. void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
  431. const char *kerbTicketCachePath)
  432. {
  433. bld->kerbTicketCachePath = kerbTicketCachePath;
  434. }
  435. hdfsFS hdfsConnect(const char *host, tPort port)
  436. {
  437. struct hdfsBuilder *bld = hdfsNewBuilder();
  438. if (!bld)
  439. return NULL;
  440. hdfsBuilderSetNameNode(bld, host);
  441. hdfsBuilderSetNameNodePort(bld, port);
  442. return hdfsBuilderConnect(bld);
  443. }
  444. /** Always return a new FileSystem handle */
  445. hdfsFS hdfsConnectNewInstance(const char *host, tPort port)
  446. {
  447. struct hdfsBuilder *bld = hdfsNewBuilder();
  448. if (!bld)
  449. return NULL;
  450. hdfsBuilderSetNameNode(bld, host);
  451. hdfsBuilderSetNameNodePort(bld, port);
  452. hdfsBuilderSetForceNewInstance(bld);
  453. return hdfsBuilderConnect(bld);
  454. }
  455. hdfsFS hdfsConnectAsUser(const char *host, tPort port, const char *user)
  456. {
  457. struct hdfsBuilder *bld = hdfsNewBuilder();
  458. if (!bld)
  459. return NULL;
  460. hdfsBuilderSetNameNode(bld, host);
  461. hdfsBuilderSetNameNodePort(bld, port);
  462. hdfsBuilderSetUserName(bld, user);
  463. return hdfsBuilderConnect(bld);
  464. }
  465. /** Always return a new FileSystem handle */
  466. hdfsFS hdfsConnectAsUserNewInstance(const char *host, tPort port,
  467. const char *user)
  468. {
  469. struct hdfsBuilder *bld = hdfsNewBuilder();
  470. if (!bld)
  471. return NULL;
  472. hdfsBuilderSetNameNode(bld, host);
  473. hdfsBuilderSetNameNodePort(bld, port);
  474. hdfsBuilderSetForceNewInstance(bld);
  475. hdfsBuilderSetUserName(bld, user);
  476. return hdfsBuilderConnect(bld);
  477. }
  478. /**
  479. * Calculate the effective URI to use, given a builder configuration.
  480. *
  481. * If there is not already a URI scheme, we prepend 'hdfs://'.
  482. *
  483. * If there is not already a port specified, and a port was given to the
  484. * builder, we suffix that port. If there is a port specified but also one in
  485. * the URI, that is an error.
  486. *
  487. * @param bld The hdfs builder object
  488. * @param uri (out param) dynamically allocated string representing the
  489. * effective URI
  490. *
  491. * @return 0 on success; error code otherwise
  492. */
  493. static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri)
  494. {
  495. const char *scheme;
  496. char suffix[64];
  497. const char *lastColon;
  498. char *u;
  499. size_t uriLen;
  500. if (!bld->nn)
  501. return EINVAL;
  502. scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://";
  503. if (bld->port == 0) {
  504. suffix[0] = '\0';
  505. } else {
  506. lastColon = strrchr(bld->nn, ':');
  507. if (lastColon && (strspn(lastColon + 1, "0123456789") ==
  508. strlen(lastColon + 1))) {
  509. fprintf(stderr, "port %d was given, but URI '%s' already "
  510. "contains a port!\n", bld->port, bld->nn);
  511. return EINVAL;
  512. }
  513. snprintf(suffix, sizeof(suffix), ":%d", bld->port);
  514. }
  515. uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix);
  516. u = malloc((uriLen + 1) * (sizeof(char)));
  517. if (!u) {
  518. fprintf(stderr, "calcEffectiveURI: out of memory");
  519. return ENOMEM;
  520. }
  521. snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix);
  522. *uri = u;
  523. return 0;
  524. }
  525. static const char *maybeNull(const char *str)
  526. {
  527. return str ? str : "(NULL)";
  528. }
  529. static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
  530. char *buf, size_t bufLen)
  531. {
  532. snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
  533. "kerbTicketCachePath=%s, userName=%s",
  534. bld->forceNewInstance, maybeNull(bld->nn), bld->port,
  535. maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
  536. return buf;
  537. }
  538. hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
  539. {
  540. JNIEnv *env = 0;
  541. jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
  542. jstring jURIString = NULL, jUserString = NULL;
  543. jvalue jVal;
  544. jthrowable jthr = NULL;
  545. char *cURI = 0, buf[512];
  546. int ret;
  547. jobject jRet = NULL;
  548. struct hdfsBuilderConfOpt *opt;
  549. //Get the JNIEnv* corresponding to current thread
  550. env = getJNIEnv();
  551. if (env == NULL) {
  552. ret = EINTERNAL;
  553. goto done;
  554. }
  555. // jConfiguration = new Configuration();
  556. jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
  557. if (jthr) {
  558. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  559. "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
  560. goto done;
  561. }
  562. // set configuration values
  563. for (opt = bld->opts; opt; opt = opt->next) {
  564. jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val);
  565. if (jthr) {
  566. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  567. "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'",
  568. hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val);
  569. goto done;
  570. }
  571. }
  572. //Check what type of FileSystem the caller wants...
  573. if (bld->nn == NULL) {
  574. // Get a local filesystem.
  575. if (bld->forceNewInstance) {
  576. // fs = FileSytem#newInstanceLocal(conf);
  577. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
  578. "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
  579. JPARAM(HADOOP_LOCALFS)), jConfiguration);
  580. if (jthr) {
  581. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  582. "hdfsBuilderConnect(%s)",
  583. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  584. goto done;
  585. }
  586. jFS = jVal.l;
  587. } else {
  588. // fs = FileSytem#getLocal(conf);
  589. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal",
  590. JMETHOD1(JPARAM(HADOOP_CONF),
  591. JPARAM(HADOOP_LOCALFS)),
  592. jConfiguration);
  593. if (jthr) {
  594. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  595. "hdfsBuilderConnect(%s)",
  596. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  597. goto done;
  598. }
  599. jFS = jVal.l;
  600. }
  601. } else {
  602. if (!strcmp(bld->nn, "default")) {
  603. // jURI = FileSystem.getDefaultUri(conf)
  604. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
  605. "getDefaultUri",
  606. "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
  607. jConfiguration);
  608. if (jthr) {
  609. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  610. "hdfsBuilderConnect(%s)",
  611. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  612. goto done;
  613. }
  614. jURI = jVal.l;
  615. } else {
  616. // fs = FileSystem#get(URI, conf, ugi);
  617. ret = calcEffectiveURI(bld, &cURI);
  618. if (ret)
  619. goto done;
  620. jthr = newJavaStr(env, cURI, &jURIString);
  621. if (jthr) {
  622. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  623. "hdfsBuilderConnect(%s)",
  624. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  625. goto done;
  626. }
  627. jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
  628. "create", "(Ljava/lang/String;)Ljava/net/URI;",
  629. jURIString);
  630. if (jthr) {
  631. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  632. "hdfsBuilderConnect(%s)",
  633. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  634. goto done;
  635. }
  636. jURI = jVal.l;
  637. }
  638. if (bld->kerbTicketCachePath) {
  639. jthr = hadoopConfSetStr(env, jConfiguration,
  640. KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
  641. if (jthr) {
  642. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  643. "hdfsBuilderConnect(%s)",
  644. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  645. goto done;
  646. }
  647. }
  648. jthr = newJavaStr(env, bld->userName, &jUserString);
  649. if (jthr) {
  650. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  651. "hdfsBuilderConnect(%s)",
  652. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  653. goto done;
  654. }
  655. if (bld->forceNewInstance) {
  656. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
  657. "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI),
  658. JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
  659. JPARAM(HADOOP_FS)),
  660. jURI, jConfiguration, jUserString);
  661. if (jthr) {
  662. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  663. "hdfsBuilderConnect(%s)",
  664. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  665. goto done;
  666. }
  667. jFS = jVal.l;
  668. } else {
  669. jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
  670. JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
  671. JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
  672. jURI, jConfiguration, jUserString);
  673. if (jthr) {
  674. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  675. "hdfsBuilderConnect(%s)",
  676. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  677. goto done;
  678. }
  679. jFS = jVal.l;
  680. }
  681. }
  682. jRet = (*env)->NewGlobalRef(env, jFS);
  683. if (!jRet) {
  684. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  685. "hdfsBuilderConnect(%s)",
  686. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  687. goto done;
  688. }
  689. ret = 0;
  690. done:
  691. // Release unnecessary local references
  692. destroyLocalReference(env, jConfiguration);
  693. destroyLocalReference(env, jFS);
  694. destroyLocalReference(env, jURI);
  695. destroyLocalReference(env, jCachePath);
  696. destroyLocalReference(env, jURIString);
  697. destroyLocalReference(env, jUserString);
  698. free(cURI);
  699. hdfsFreeBuilder(bld);
  700. if (ret) {
  701. errno = ret;
  702. return NULL;
  703. }
  704. return (hdfsFS)jRet;
  705. }
  706. int hdfsDisconnect(hdfsFS fs)
  707. {
  708. // JAVA EQUIVALENT:
  709. // fs.close()
  710. //Get the JNIEnv* corresponding to current thread
  711. JNIEnv* env = getJNIEnv();
  712. int ret;
  713. jobject jFS;
  714. jthrowable jthr;
  715. if (env == NULL) {
  716. errno = EINTERNAL;
  717. return -1;
  718. }
  719. //Parameters
  720. jFS = (jobject)fs;
  721. //Sanity check
  722. if (fs == NULL) {
  723. errno = EBADF;
  724. return -1;
  725. }
  726. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  727. "close", "()V");
  728. if (jthr) {
  729. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  730. "hdfsDisconnect: FileSystem#close");
  731. } else {
  732. ret = 0;
  733. }
  734. (*env)->DeleteGlobalRef(env, jFS);
  735. if (ret) {
  736. errno = ret;
  737. return -1;
  738. }
  739. return 0;
  740. }
  741. /**
  742. * Get the default block size of a FileSystem object.
  743. *
  744. * @param env The Java env
  745. * @param jFS The FileSystem object
  746. * @param jPath The path to find the default blocksize at
  747. * @param out (out param) the default block size
  748. *
  749. * @return NULL on success; or the exception
  750. */
  751. static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
  752. jobject jPath, jlong *out)
  753. {
  754. jthrowable jthr;
  755. jvalue jVal;
  756. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  757. "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"), jPath);
  758. if (jthr)
  759. return jthr;
  760. *out = jVal.j;
  761. return NULL;
  762. }
  763. hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags,
  764. int bufferSize, short replication, tSize blockSize)
  765. {
  766. /*
  767. JAVA EQUIVALENT:
  768. File f = new File(path);
  769. FSData{Input|Output}Stream f{is|os} = fs.create(f);
  770. return f{is|os};
  771. */
  772. int accmode = flags & O_ACCMODE;
  773. jstring jStrBufferSize = NULL, jStrReplication = NULL;
  774. jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
  775. jobject jFS = (jobject)fs;
  776. jthrowable jthr;
  777. jvalue jVal;
  778. hdfsFile file = NULL;
  779. int ret;
  780. jint jBufferSize = bufferSize;
  781. jshort jReplication = replication;
  782. /* The hadoop java api/signature */
  783. const char *method = NULL;
  784. const char *signature = NULL;
  785. /* Get the JNIEnv* corresponding to current thread */
  786. JNIEnv* env = getJNIEnv();
  787. if (env == NULL) {
  788. errno = EINTERNAL;
  789. return NULL;
  790. }
  791. if (accmode == O_RDONLY || accmode == O_WRONLY) {
  792. /* yay */
  793. } else if (accmode == O_RDWR) {
  794. fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
  795. errno = ENOTSUP;
  796. return NULL;
  797. } else {
  798. fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode);
  799. errno = EINVAL;
  800. return NULL;
  801. }
  802. if ((flags & O_CREAT) && (flags & O_EXCL)) {
  803. fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
  804. }
  805. if (accmode == O_RDONLY) {
  806. method = "open";
  807. signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM));
  808. } else if (flags & O_APPEND) {
  809. method = "append";
  810. signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM));
  811. } else {
  812. method = "create";
  813. signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM));
  814. }
  815. /* Create an object of org.apache.hadoop.fs.Path */
  816. jthr = constructNewObjectOfPath(env, path, &jPath);
  817. if (jthr) {
  818. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  819. "hdfsOpenFile(%s): constructNewObjectOfPath", path);
  820. goto done;
  821. }
  822. /* Get the Configuration object from the FileSystem object */
  823. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  824. "getConf", JMETHOD1("", JPARAM(HADOOP_CONF)));
  825. if (jthr) {
  826. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  827. "hdfsOpenFile(%s): FileSystem#getConf", path);
  828. goto done;
  829. }
  830. jConfiguration = jVal.l;
  831. jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");
  832. if (!jStrBufferSize) {
  833. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
  834. goto done;
  835. }
  836. jStrReplication = (*env)->NewStringUTF(env, "dfs.replication");
  837. if (!jStrReplication) {
  838. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
  839. goto done;
  840. }
  841. if (!bufferSize) {
  842. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  843. HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
  844. jStrBufferSize, 4096);
  845. if (jthr) {
  846. ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND |
  847. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK,
  848. "hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)",
  849. path);
  850. goto done;
  851. }
  852. jBufferSize = jVal.i;
  853. }
  854. if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) {
  855. if (!replication) {
  856. jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
  857. HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
  858. jStrReplication, 1);
  859. if (jthr) {
  860. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  861. "hdfsOpenFile(%s): Configuration#getInt(dfs.replication)",
  862. path);
  863. goto done;
  864. }
  865. jReplication = (jshort)jVal.i;
  866. }
  867. }
  868. /* Create and return either the FSDataInputStream or
  869. FSDataOutputStream references jobject jStream */
  870. // READ?
  871. if (accmode == O_RDONLY) {
  872. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  873. method, signature, jPath, jBufferSize);
  874. } else if ((accmode == O_WRONLY) && (flags & O_APPEND)) {
  875. // WRITE/APPEND?
  876. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  877. method, signature, jPath);
  878. } else {
  879. // WRITE/CREATE
  880. jboolean jOverWrite = 1;
  881. jlong jBlockSize = blockSize;
  882. if (jBlockSize == 0) {
  883. jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize);
  884. if (jthr) {
  885. ret = EIO;
  886. goto done;
  887. }
  888. }
  889. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  890. method, signature, jPath, jOverWrite,
  891. jBufferSize, jReplication, jBlockSize);
  892. }
  893. if (jthr) {
  894. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  895. "hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature);
  896. goto done;
  897. }
  898. jFile = jVal.l;
  899. file = calloc(1, sizeof(struct hdfsFile_internal));
  900. if (!file) {
  901. fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path);
  902. ret = ENOMEM;
  903. goto done;
  904. }
  905. file->file = (*env)->NewGlobalRef(env, jFile);
  906. if (!file->file) {
  907. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  908. "hdfsOpenFile(%s): NewGlobalRef", path);
  909. goto done;
  910. }
  911. file->type = (((flags & O_WRONLY) == 0) ? HDFS_STREAM_INPUT :
  912. HDFS_STREAM_OUTPUT);
  913. file->flags = 0;
  914. if ((flags & O_WRONLY) == 0) {
  915. // Try a test read to see if we can do direct reads
  916. char buf;
  917. if (readDirect(fs, file, &buf, 0) == 0) {
  918. // Success - 0-byte read should return 0
  919. file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
  920. } else if (errno != ENOTSUP) {
  921. // Unexpected error. Clear it, don't set the direct flag.
  922. fprintf(stderr,
  923. "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
  924. "for direct read compatibility\n", path, errno);
  925. }
  926. }
  927. ret = 0;
  928. done:
  929. destroyLocalReference(env, jStrBufferSize);
  930. destroyLocalReference(env, jStrReplication);
  931. destroyLocalReference(env, jConfiguration);
  932. destroyLocalReference(env, jPath);
  933. destroyLocalReference(env, jFile);
  934. if (ret) {
  935. if (file) {
  936. if (file->file) {
  937. (*env)->DeleteGlobalRef(env, file->file);
  938. }
  939. free(file);
  940. }
  941. errno = ret;
  942. return NULL;
  943. }
  944. return file;
  945. }
  946. int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
  947. {
  948. jobject jFS = (jobject)fs;
  949. jthrowable jthr;
  950. jvalue jVal;
  951. jobject jPath = NULL;
  952. JNIEnv *env = getJNIEnv();
  953. if (!env) {
  954. errno = EINTERNAL;
  955. return -1;
  956. }
  957. /* Create an object of org.apache.hadoop.fs.Path */
  958. jthr = constructNewObjectOfPath(env, path, &jPath);
  959. if (jthr) {
  960. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  961. "hdfsTruncateFile(%s): constructNewObjectOfPath", path);
  962. return -1;
  963. }
  964. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  965. "truncate", JMETHOD2(JPARAM(HADOOP_PATH), "J", "Z"),
  966. jPath, newlength);
  967. destroyLocalReference(env, jPath);
  968. if (jthr) {
  969. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  970. "hdfsTruncateFile(%s): FileSystem#truncate", path);
  971. return -1;
  972. }
  973. if (jVal.z == JNI_TRUE) {
  974. return 1;
  975. }
  976. return 0;
  977. }
  978. int hdfsUnbufferFile(hdfsFile file)
  979. {
  980. int ret;
  981. jthrowable jthr;
  982. JNIEnv *env = getJNIEnv();
  983. if (!env) {
  984. ret = EINTERNAL;
  985. goto done;
  986. }
  987. if (file->type != HDFS_STREAM_INPUT) {
  988. ret = ENOTSUP;
  989. goto done;
  990. }
  991. jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
  992. "unbuffer", "()V");
  993. if (jthr) {
  994. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  995. HADOOP_ISTRM "#unbuffer failed:");
  996. goto done;
  997. }
  998. ret = 0;
  999. done:
  1000. errno = ret;
  1001. return ret;
  1002. }
  1003. int hdfsCloseFile(hdfsFS fs, hdfsFile file)
  1004. {
  1005. int ret;
  1006. // JAVA EQUIVALENT:
  1007. // file.close
  1008. //The interface whose 'close' method to be called
  1009. const char *interface;
  1010. const char *interfaceShortName;
  1011. //Caught exception
  1012. jthrowable jthr;
  1013. //Get the JNIEnv* corresponding to current thread
  1014. JNIEnv* env = getJNIEnv();
  1015. if (env == NULL) {
  1016. errno = EINTERNAL;
  1017. return -1;
  1018. }
  1019. //Sanity check
  1020. if (!file || file->type == HDFS_STREAM_UNINITIALIZED) {
  1021. errno = EBADF;
  1022. return -1;
  1023. }
  1024. interface = (file->type == HDFS_STREAM_INPUT) ?
  1025. HADOOP_ISTRM : HADOOP_OSTRM;
  1026. jthr = invokeMethod(env, NULL, INSTANCE, file->file, interface,
  1027. "close", "()V");
  1028. if (jthr) {
  1029. interfaceShortName = (file->type == HDFS_STREAM_INPUT) ?
  1030. "FSDataInputStream" : "FSDataOutputStream";
  1031. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1032. "%s#close", interfaceShortName);
  1033. } else {
  1034. ret = 0;
  1035. }
  1036. //De-allocate memory
  1037. (*env)->DeleteGlobalRef(env, file->file);
  1038. free(file);
  1039. if (ret) {
  1040. errno = ret;
  1041. return -1;
  1042. }
  1043. return 0;
  1044. }
  1045. int hdfsExists(hdfsFS fs, const char *path)
  1046. {
  1047. JNIEnv *env = getJNIEnv();
  1048. jobject jPath;
  1049. jvalue jVal;
  1050. jobject jFS = (jobject)fs;
  1051. jthrowable jthr;
  1052. if (env == NULL) {
  1053. errno = EINTERNAL;
  1054. return -1;
  1055. }
  1056. if (path == NULL) {
  1057. errno = EINVAL;
  1058. return -1;
  1059. }
  1060. jthr = constructNewObjectOfPath(env, path, &jPath);
  1061. if (jthr) {
  1062. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1063. "hdfsExists: constructNewObjectOfPath");
  1064. return -1;
  1065. }
  1066. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1067. "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
  1068. destroyLocalReference(env, jPath);
  1069. if (jthr) {
  1070. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1071. "hdfsExists: invokeMethod(%s)",
  1072. JMETHOD1(JPARAM(HADOOP_PATH), "Z"));
  1073. return -1;
  1074. }
  1075. if (jVal.z) {
  1076. return 0;
  1077. } else {
  1078. errno = ENOENT;
  1079. return -1;
  1080. }
  1081. }
  1082. // Checks input file for readiness for reading.
  1083. static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
  1084. jobject* jInputStream)
  1085. {
  1086. *jInputStream = (jobject)(f ? f->file : NULL);
  1087. //Sanity check
  1088. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  1089. errno = EBADF;
  1090. return -1;
  1091. }
  1092. //Error checking... make sure that this file is 'readable'
  1093. if (f->type != HDFS_STREAM_INPUT) {
  1094. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  1095. errno = EINVAL;
  1096. return -1;
  1097. }
  1098. return 0;
  1099. }
  1100. tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  1101. {
  1102. jobject jInputStream;
  1103. jbyteArray jbRarray;
  1104. jint noReadBytes = length;
  1105. jvalue jVal;
  1106. jthrowable jthr;
  1107. JNIEnv* env;
  1108. if (length == 0) {
  1109. return 0;
  1110. } else if (length < 0) {
  1111. errno = EINVAL;
  1112. return -1;
  1113. }
  1114. if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
  1115. return readDirect(fs, f, buffer, length);
  1116. }
  1117. // JAVA EQUIVALENT:
  1118. // byte [] bR = new byte[length];
  1119. // fis.read(bR);
  1120. //Get the JNIEnv* corresponding to current thread
  1121. env = getJNIEnv();
  1122. if (env == NULL) {
  1123. errno = EINTERNAL;
  1124. return -1;
  1125. }
  1126. //Parameters
  1127. if (readPrepare(env, fs, f, &jInputStream) == -1) {
  1128. return -1;
  1129. }
  1130. //Read the requisite bytes
  1131. jbRarray = (*env)->NewByteArray(env, length);
  1132. if (!jbRarray) {
  1133. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1134. "hdfsRead: NewByteArray");
  1135. return -1;
  1136. }
  1137. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM,
  1138. "read", "([B)I", jbRarray);
  1139. if (jthr) {
  1140. destroyLocalReference(env, jbRarray);
  1141. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1142. "hdfsRead: FSDataInputStream#read");
  1143. return -1;
  1144. }
  1145. if (jVal.i < 0) {
  1146. // EOF
  1147. destroyLocalReference(env, jbRarray);
  1148. return 0;
  1149. } else if (jVal.i == 0) {
  1150. destroyLocalReference(env, jbRarray);
  1151. errno = EINTR;
  1152. return -1;
  1153. }
  1154. (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
  1155. destroyLocalReference(env, jbRarray);
  1156. if ((*env)->ExceptionCheck(env)) {
  1157. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1158. "hdfsRead: GetByteArrayRegion");
  1159. return -1;
  1160. }
  1161. return jVal.i;
  1162. }
  1163. // Reads using the read(ByteBuffer) API, which does fewer copies
  1164. tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  1165. {
  1166. // JAVA EQUIVALENT:
  1167. // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
  1168. // fis.read(bbuffer);
  1169. jobject jInputStream;
  1170. jvalue jVal;
  1171. jthrowable jthr;
  1172. jobject bb;
  1173. //Get the JNIEnv* corresponding to current thread
  1174. JNIEnv* env = getJNIEnv();
  1175. if (env == NULL) {
  1176. errno = EINTERNAL;
  1177. return -1;
  1178. }
  1179. if (readPrepare(env, fs, f, &jInputStream) == -1) {
  1180. return -1;
  1181. }
  1182. //Read the requisite bytes
  1183. bb = (*env)->NewDirectByteBuffer(env, buffer, length);
  1184. if (bb == NULL) {
  1185. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1186. "readDirect: NewDirectByteBuffer");
  1187. return -1;
  1188. }
  1189. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  1190. HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb);
  1191. destroyLocalReference(env, bb);
  1192. if (jthr) {
  1193. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1194. "readDirect: FSDataInputStream#read");
  1195. return -1;
  1196. }
  1197. return (jVal.i < 0) ? 0 : jVal.i;
  1198. }
  1199. tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
  1200. void* buffer, tSize length)
  1201. {
  1202. JNIEnv* env;
  1203. jbyteArray jbRarray;
  1204. jvalue jVal;
  1205. jthrowable jthr;
  1206. if (length == 0) {
  1207. return 0;
  1208. } else if (length < 0) {
  1209. errno = EINVAL;
  1210. return -1;
  1211. }
  1212. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  1213. errno = EBADF;
  1214. return -1;
  1215. }
  1216. env = getJNIEnv();
  1217. if (env == NULL) {
  1218. errno = EINTERNAL;
  1219. return -1;
  1220. }
  1221. //Error checking... make sure that this file is 'readable'
  1222. if (f->type != HDFS_STREAM_INPUT) {
  1223. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  1224. errno = EINVAL;
  1225. return -1;
  1226. }
  1227. // JAVA EQUIVALENT:
  1228. // byte [] bR = new byte[length];
  1229. // fis.read(pos, bR, 0, length);
  1230. jbRarray = (*env)->NewByteArray(env, length);
  1231. if (!jbRarray) {
  1232. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1233. "hdfsPread: NewByteArray");
  1234. return -1;
  1235. }
  1236. jthr = invokeMethod(env, &jVal, INSTANCE, f->file, HADOOP_ISTRM,
  1237. "read", "(J[BII)I", position, jbRarray, 0, length);
  1238. if (jthr) {
  1239. destroyLocalReference(env, jbRarray);
  1240. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1241. "hdfsPread: FSDataInputStream#read");
  1242. return -1;
  1243. }
  1244. if (jVal.i < 0) {
  1245. // EOF
  1246. destroyLocalReference(env, jbRarray);
  1247. return 0;
  1248. } else if (jVal.i == 0) {
  1249. destroyLocalReference(env, jbRarray);
  1250. errno = EINTR;
  1251. return -1;
  1252. }
  1253. (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer);
  1254. destroyLocalReference(env, jbRarray);
  1255. if ((*env)->ExceptionCheck(env)) {
  1256. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1257. "hdfsPread: GetByteArrayRegion");
  1258. return -1;
  1259. }
  1260. return jVal.i;
  1261. }
  1262. tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
  1263. {
  1264. // JAVA EQUIVALENT
  1265. // byte b[] = str.getBytes();
  1266. // fso.write(b);
  1267. jobject jOutputStream;
  1268. jbyteArray jbWarray;
  1269. jthrowable jthr;
  1270. //Get the JNIEnv* corresponding to current thread
  1271. JNIEnv* env = getJNIEnv();
  1272. if (env == NULL) {
  1273. errno = EINTERNAL;
  1274. return -1;
  1275. }
  1276. //Sanity check
  1277. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  1278. errno = EBADF;
  1279. return -1;
  1280. }
  1281. jOutputStream = f->file;
  1282. if (length < 0) {
  1283. errno = EINVAL;
  1284. return -1;
  1285. }
  1286. //Error checking... make sure that this file is 'writable'
  1287. if (f->type != HDFS_STREAM_OUTPUT) {
  1288. fprintf(stderr, "Cannot write into a non-OutputStream object!\n");
  1289. errno = EINVAL;
  1290. return -1;
  1291. }
  1292. if (length < 0) {
  1293. errno = EINVAL;
  1294. return -1;
  1295. }
  1296. if (length == 0) {
  1297. return 0;
  1298. }
  1299. //Write the requisite bytes into the file
  1300. jbWarray = (*env)->NewByteArray(env, length);
  1301. if (!jbWarray) {
  1302. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1303. "hdfsWrite: NewByteArray");
  1304. return -1;
  1305. }
  1306. (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer);
  1307. if ((*env)->ExceptionCheck(env)) {
  1308. destroyLocalReference(env, jbWarray);
  1309. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1310. "hdfsWrite(length = %d): SetByteArrayRegion", length);
  1311. return -1;
  1312. }
  1313. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  1314. HADOOP_OSTRM, "write", "([B)V", jbWarray);
  1315. destroyLocalReference(env, jbWarray);
  1316. if (jthr) {
  1317. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1318. "hdfsWrite: FSDataOutputStream#write");
  1319. return -1;
  1320. }
  1321. // Unlike most Java streams, FSDataOutputStream never does partial writes.
  1322. // If we succeeded, all the data was written.
  1323. return length;
  1324. }
  1325. int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos)
  1326. {
  1327. // JAVA EQUIVALENT
  1328. // fis.seek(pos);
  1329. jobject jInputStream;
  1330. jthrowable jthr;
  1331. //Get the JNIEnv* corresponding to current thread
  1332. JNIEnv* env = getJNIEnv();
  1333. if (env == NULL) {
  1334. errno = EINTERNAL;
  1335. return -1;
  1336. }
  1337. //Sanity check
  1338. if (!f || f->type != HDFS_STREAM_INPUT) {
  1339. errno = EBADF;
  1340. return -1;
  1341. }
  1342. jInputStream = f->file;
  1343. jthr = invokeMethod(env, NULL, INSTANCE, jInputStream,
  1344. HADOOP_ISTRM, "seek", "(J)V", desiredPos);
  1345. if (jthr) {
  1346. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1347. "hdfsSeek(desiredPos=%" PRId64 ")"
  1348. ": FSDataInputStream#seek", desiredPos);
  1349. return -1;
  1350. }
  1351. return 0;
  1352. }
  1353. tOffset hdfsTell(hdfsFS fs, hdfsFile f)
  1354. {
  1355. // JAVA EQUIVALENT
  1356. // pos = f.getPos();
  1357. jobject jStream;
  1358. const char *interface;
  1359. jvalue jVal;
  1360. jthrowable jthr;
  1361. //Get the JNIEnv* corresponding to current thread
  1362. JNIEnv* env = getJNIEnv();
  1363. if (env == NULL) {
  1364. errno = EINTERNAL;
  1365. return -1;
  1366. }
  1367. //Sanity check
  1368. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  1369. errno = EBADF;
  1370. return -1;
  1371. }
  1372. //Parameters
  1373. jStream = f->file;
  1374. interface = (f->type == HDFS_STREAM_INPUT) ?
  1375. HADOOP_ISTRM : HADOOP_OSTRM;
  1376. jthr = invokeMethod(env, &jVal, INSTANCE, jStream,
  1377. interface, "getPos", "()J");
  1378. if (jthr) {
  1379. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1380. "hdfsTell: %s#getPos",
  1381. ((f->type == HDFS_STREAM_INPUT) ? "FSDataInputStream" :
  1382. "FSDataOutputStream"));
  1383. return -1;
  1384. }
  1385. return jVal.j;
  1386. }
  1387. int hdfsFlush(hdfsFS fs, hdfsFile f)
  1388. {
  1389. // JAVA EQUIVALENT
  1390. // fos.flush();
  1391. jthrowable jthr;
  1392. //Get the JNIEnv* corresponding to current thread
  1393. JNIEnv* env = getJNIEnv();
  1394. if (env == NULL) {
  1395. errno = EINTERNAL;
  1396. return -1;
  1397. }
  1398. //Sanity check
  1399. if (!f || f->type != HDFS_STREAM_OUTPUT) {
  1400. errno = EBADF;
  1401. return -1;
  1402. }
  1403. jthr = invokeMethod(env, NULL, INSTANCE, f->file,
  1404. HADOOP_OSTRM, "flush", "()V");
  1405. if (jthr) {
  1406. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1407. "hdfsFlush: FSDataInputStream#flush");
  1408. return -1;
  1409. }
  1410. return 0;
  1411. }
  1412. int hdfsHFlush(hdfsFS fs, hdfsFile f)
  1413. {
  1414. jobject jOutputStream;
  1415. jthrowable jthr;
  1416. //Get the JNIEnv* corresponding to current thread
  1417. JNIEnv* env = getJNIEnv();
  1418. if (env == NULL) {
  1419. errno = EINTERNAL;
  1420. return -1;
  1421. }
  1422. //Sanity check
  1423. if (!f || f->type != HDFS_STREAM_OUTPUT) {
  1424. errno = EBADF;
  1425. return -1;
  1426. }
  1427. jOutputStream = f->file;
  1428. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  1429. HADOOP_OSTRM, "hflush", "()V");
  1430. if (jthr) {
  1431. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1432. "hdfsHFlush: FSDataOutputStream#hflush");
  1433. return -1;
  1434. }
  1435. return 0;
  1436. }
  1437. int hdfsHSync(hdfsFS fs, hdfsFile f)
  1438. {
  1439. jobject jOutputStream;
  1440. jthrowable jthr;
  1441. //Get the JNIEnv* corresponding to current thread
  1442. JNIEnv* env = getJNIEnv();
  1443. if (env == NULL) {
  1444. errno = EINTERNAL;
  1445. return -1;
  1446. }
  1447. //Sanity check
  1448. if (!f || f->type != HDFS_STREAM_OUTPUT) {
  1449. errno = EBADF;
  1450. return -1;
  1451. }
  1452. jOutputStream = f->file;
  1453. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  1454. HADOOP_OSTRM, "hsync", "()V");
  1455. if (jthr) {
  1456. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1457. "hdfsHSync: FSDataOutputStream#hsync");
  1458. return -1;
  1459. }
  1460. return 0;
  1461. }
  1462. int hdfsAvailable(hdfsFS fs, hdfsFile f)
  1463. {
  1464. // JAVA EQUIVALENT
  1465. // fis.available();
  1466. jobject jInputStream;
  1467. jvalue jVal;
  1468. jthrowable jthr;
  1469. //Get the JNIEnv* corresponding to current thread
  1470. JNIEnv* env = getJNIEnv();
  1471. if (env == NULL) {
  1472. errno = EINTERNAL;
  1473. return -1;
  1474. }
  1475. //Sanity check
  1476. if (!f || f->type != HDFS_STREAM_INPUT) {
  1477. errno = EBADF;
  1478. return -1;
  1479. }
  1480. //Parameters
  1481. jInputStream = f->file;
  1482. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  1483. HADOOP_ISTRM, "available", "()I");
  1484. if (jthr) {
  1485. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1486. "hdfsAvailable: FSDataInputStream#available");
  1487. return -1;
  1488. }
  1489. return jVal.i;
  1490. }
  1491. static int hdfsCopyImpl(hdfsFS srcFS, const char *src, hdfsFS dstFS,
  1492. const char *dst, jboolean deleteSource)
  1493. {
  1494. //JAVA EQUIVALENT
  1495. // FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
  1496. // deleteSource = false, conf)
  1497. //Parameters
  1498. jobject jSrcFS = (jobject)srcFS;
  1499. jobject jDstFS = (jobject)dstFS;
  1500. jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
  1501. jthrowable jthr;
  1502. jvalue jVal;
  1503. int ret;
  1504. //Get the JNIEnv* corresponding to current thread
  1505. JNIEnv* env = getJNIEnv();
  1506. if (env == NULL) {
  1507. errno = EINTERNAL;
  1508. return -1;
  1509. }
  1510. jthr = constructNewObjectOfPath(env, src, &jSrcPath);
  1511. if (jthr) {
  1512. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1513. "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
  1514. goto done;
  1515. }
  1516. jthr = constructNewObjectOfPath(env, dst, &jDstPath);
  1517. if (jthr) {
  1518. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1519. "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
  1520. goto done;
  1521. }
  1522. //Create the org.apache.hadoop.conf.Configuration object
  1523. jthr = constructNewObjectOfClass(env, &jConfiguration,
  1524. HADOOP_CONF, "()V");
  1525. if (jthr) {
  1526. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1527. "hdfsCopyImpl: Configuration constructor");
  1528. goto done;
  1529. }
  1530. //FileUtil#copy
  1531. jthr = invokeMethod(env, &jVal, STATIC,
  1532. NULL, "org/apache/hadoop/fs/FileUtil", "copy",
  1533. "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
  1534. "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
  1535. "ZLorg/apache/hadoop/conf/Configuration;)Z",
  1536. jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
  1537. jConfiguration);
  1538. if (jthr) {
  1539. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1540. "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
  1541. "FileUtil#copy", src, dst, deleteSource);
  1542. goto done;
  1543. }
  1544. if (!jVal.z) {
  1545. ret = EIO;
  1546. goto done;
  1547. }
  1548. ret = 0;
  1549. done:
  1550. destroyLocalReference(env, jConfiguration);
  1551. destroyLocalReference(env, jSrcPath);
  1552. destroyLocalReference(env, jDstPath);
  1553. if (ret) {
  1554. errno = ret;
  1555. return -1;
  1556. }
  1557. return 0;
  1558. }
  1559. int hdfsCopy(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst)
  1560. {
  1561. return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
  1562. }
  1563. int hdfsMove(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst)
  1564. {
  1565. return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
  1566. }
  1567. int hdfsDelete(hdfsFS fs, const char *path, int recursive)
  1568. {
  1569. // JAVA EQUIVALENT:
  1570. // Path p = new Path(path);
  1571. // bool retval = fs.delete(p, recursive);
  1572. jobject jFS = (jobject)fs;
  1573. jthrowable jthr;
  1574. jobject jPath;
  1575. jvalue jVal;
  1576. jboolean jRecursive;
  1577. //Get the JNIEnv* corresponding to current thread
  1578. JNIEnv* env = getJNIEnv();
  1579. if (env == NULL) {
  1580. errno = EINTERNAL;
  1581. return -1;
  1582. }
  1583. jthr = constructNewObjectOfPath(env, path, &jPath);
  1584. if (jthr) {
  1585. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1586. "hdfsDelete(path=%s): constructNewObjectOfPath", path);
  1587. return -1;
  1588. }
  1589. jRecursive = recursive ? JNI_TRUE : JNI_FALSE;
  1590. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1591. "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z",
  1592. jPath, jRecursive);
  1593. destroyLocalReference(env, jPath);
  1594. if (jthr) {
  1595. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1596. "hdfsDelete(path=%s, recursive=%d): "
  1597. "FileSystem#delete", path, recursive);
  1598. return -1;
  1599. }
  1600. if (!jVal.z) {
  1601. errno = EIO;
  1602. return -1;
  1603. }
  1604. return 0;
  1605. }
  1606. int hdfsRename(hdfsFS fs, const char *oldPath, const char *newPath)
  1607. {
  1608. // JAVA EQUIVALENT:
  1609. // Path old = new Path(oldPath);
  1610. // Path new = new Path(newPath);
  1611. // fs.rename(old, new);
  1612. jobject jFS = (jobject)fs;
  1613. jthrowable jthr;
  1614. jobject jOldPath = NULL, jNewPath = NULL;
  1615. int ret = -1;
  1616. jvalue jVal;
  1617. //Get the JNIEnv* corresponding to current thread
  1618. JNIEnv* env = getJNIEnv();
  1619. if (env == NULL) {
  1620. errno = EINTERNAL;
  1621. return -1;
  1622. }
  1623. jthr = constructNewObjectOfPath(env, oldPath, &jOldPath );
  1624. if (jthr) {
  1625. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1626. "hdfsRename: constructNewObjectOfPath(%s)", oldPath);
  1627. goto done;
  1628. }
  1629. jthr = constructNewObjectOfPath(env, newPath, &jNewPath);
  1630. if (jthr) {
  1631. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1632. "hdfsRename: constructNewObjectOfPath(%s)", newPath);
  1633. goto done;
  1634. }
  1635. // Rename the file
  1636. // TODO: use rename2 here? (See HDFS-3592)
  1637. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "rename",
  1638. JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"),
  1639. jOldPath, jNewPath);
  1640. if (jthr) {
  1641. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1642. "hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename",
  1643. oldPath, newPath);
  1644. goto done;
  1645. }
  1646. if (!jVal.z) {
  1647. errno = EIO;
  1648. goto done;
  1649. }
  1650. ret = 0;
  1651. done:
  1652. destroyLocalReference(env, jOldPath);
  1653. destroyLocalReference(env, jNewPath);
  1654. return ret;
  1655. }
  1656. char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize)
  1657. {
  1658. // JAVA EQUIVALENT:
  1659. // Path p = fs.getWorkingDirectory();
  1660. // return p.toString()
  1661. jobject jPath = NULL;
  1662. jstring jPathString = NULL;
  1663. jobject jFS = (jobject)fs;
  1664. jvalue jVal;
  1665. jthrowable jthr;
  1666. int ret;
  1667. const char *jPathChars = NULL;
  1668. //Get the JNIEnv* corresponding to current thread
  1669. JNIEnv* env = getJNIEnv();
  1670. if (env == NULL) {
  1671. errno = EINTERNAL;
  1672. return NULL;
  1673. }
  1674. //FileSystem#getWorkingDirectory()
  1675. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  1676. HADOOP_FS, "getWorkingDirectory",
  1677. "()Lorg/apache/hadoop/fs/Path;");
  1678. if (jthr) {
  1679. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1680. "hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory");
  1681. goto done;
  1682. }
  1683. jPath = jVal.l;
  1684. if (!jPath) {
  1685. fprintf(stderr, "hdfsGetWorkingDirectory: "
  1686. "FileSystem#getWorkingDirectory returned NULL");
  1687. ret = -EIO;
  1688. goto done;
  1689. }
  1690. //Path#toString()
  1691. jthr = invokeMethod(env, &jVal, INSTANCE, jPath,
  1692. "org/apache/hadoop/fs/Path", "toString",
  1693. "()Ljava/lang/String;");
  1694. if (jthr) {
  1695. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1696. "hdfsGetWorkingDirectory: Path#toString");
  1697. goto done;
  1698. }
  1699. jPathString = jVal.l;
  1700. jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL);
  1701. if (!jPathChars) {
  1702. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1703. "hdfsGetWorkingDirectory: GetStringUTFChars");
  1704. goto done;
  1705. }
  1706. //Copy to user-provided buffer
  1707. ret = snprintf(buffer, bufferSize, "%s", jPathChars);
  1708. if (ret >= bufferSize) {
  1709. ret = ENAMETOOLONG;
  1710. goto done;
  1711. }
  1712. ret = 0;
  1713. done:
  1714. if (jPathChars) {
  1715. (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars);
  1716. }
  1717. destroyLocalReference(env, jPath);
  1718. destroyLocalReference(env, jPathString);
  1719. if (ret) {
  1720. errno = ret;
  1721. return NULL;
  1722. }
  1723. return buffer;
  1724. }
  1725. int hdfsSetWorkingDirectory(hdfsFS fs, const char *path)
  1726. {
  1727. // JAVA EQUIVALENT:
  1728. // fs.setWorkingDirectory(Path(path));
  1729. jobject jFS = (jobject)fs;
  1730. jthrowable jthr;
  1731. jobject jPath;
  1732. //Get the JNIEnv* corresponding to current thread
  1733. JNIEnv* env = getJNIEnv();
  1734. if (env == NULL) {
  1735. errno = EINTERNAL;
  1736. return -1;
  1737. }
  1738. //Create an object of org.apache.hadoop.fs.Path
  1739. jthr = constructNewObjectOfPath(env, path, &jPath);
  1740. if (jthr) {
  1741. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1742. "hdfsSetWorkingDirectory(%s): constructNewObjectOfPath",
  1743. path);
  1744. return -1;
  1745. }
  1746. //FileSystem#setWorkingDirectory()
  1747. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1748. "setWorkingDirectory",
  1749. "(Lorg/apache/hadoop/fs/Path;)V", jPath);
  1750. destroyLocalReference(env, jPath);
  1751. if (jthr) {
  1752. errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT,
  1753. "hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory",
  1754. path);
  1755. return -1;
  1756. }
  1757. return 0;
  1758. }
  1759. int hdfsCreateDirectory(hdfsFS fs, const char *path)
  1760. {
  1761. // JAVA EQUIVALENT:
  1762. // fs.mkdirs(new Path(path));
  1763. jobject jFS = (jobject)fs;
  1764. jobject jPath;
  1765. jthrowable jthr;
  1766. jvalue jVal;
  1767. //Get the JNIEnv* corresponding to current thread
  1768. JNIEnv* env = getJNIEnv();
  1769. if (env == NULL) {
  1770. errno = EINTERNAL;
  1771. return -1;
  1772. }
  1773. //Create an object of org.apache.hadoop.fs.Path
  1774. jthr = constructNewObjectOfPath(env, path, &jPath);
  1775. if (jthr) {
  1776. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1777. "hdfsCreateDirectory(%s): constructNewObjectOfPath", path);
  1778. return -1;
  1779. }
  1780. //Create the directory
  1781. jVal.z = 0;
  1782. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1783. "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z",
  1784. jPath);
  1785. destroyLocalReference(env, jPath);
  1786. if (jthr) {
  1787. errno = printExceptionAndFree(env, jthr,
  1788. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1789. NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY,
  1790. "hdfsCreateDirectory(%s): FileSystem#mkdirs", path);
  1791. return -1;
  1792. }
  1793. if (!jVal.z) {
  1794. // It's unclear under exactly which conditions FileSystem#mkdirs
  1795. // is supposed to return false (as opposed to throwing an exception.)
  1796. // It seems like the current code never actually returns false.
  1797. // So we're going to translate this to EIO, since there seems to be
  1798. // nothing more specific we can do with it.
  1799. errno = EIO;
  1800. return -1;
  1801. }
  1802. return 0;
  1803. }
  1804. int hdfsSetReplication(hdfsFS fs, const char *path, int16_t replication)
  1805. {
  1806. // JAVA EQUIVALENT:
  1807. // fs.setReplication(new Path(path), replication);
  1808. jobject jFS = (jobject)fs;
  1809. jthrowable jthr;
  1810. jobject jPath;
  1811. jvalue jVal;
  1812. //Get the JNIEnv* corresponding to current thread
  1813. JNIEnv* env = getJNIEnv();
  1814. if (env == NULL) {
  1815. errno = EINTERNAL;
  1816. return -1;
  1817. }
  1818. //Create an object of org.apache.hadoop.fs.Path
  1819. jthr = constructNewObjectOfPath(env, path, &jPath);
  1820. if (jthr) {
  1821. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1822. "hdfsSetReplication(path=%s): constructNewObjectOfPath", path);
  1823. return -1;
  1824. }
  1825. //Create the directory
  1826. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  1827. "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z",
  1828. jPath, replication);
  1829. destroyLocalReference(env, jPath);
  1830. if (jthr) {
  1831. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1832. "hdfsSetReplication(path=%s, replication=%d): "
  1833. "FileSystem#setReplication", path, replication);
  1834. return -1;
  1835. }
  1836. if (!jVal.z) {
  1837. // setReplication returns false "if file does not exist or is a
  1838. // directory." So the nearest translation to that is ENOENT.
  1839. errno = ENOENT;
  1840. return -1;
  1841. }
  1842. return 0;
  1843. }
  1844. int hdfsChown(hdfsFS fs, const char *path, const char *owner, const char *group)
  1845. {
  1846. // JAVA EQUIVALENT:
  1847. // fs.setOwner(path, owner, group)
  1848. jobject jFS = (jobject)fs;
  1849. jobject jPath = NULL;
  1850. jstring jOwner = NULL, jGroup = NULL;
  1851. jthrowable jthr;
  1852. int ret;
  1853. //Get the JNIEnv* corresponding to current thread
  1854. JNIEnv* env = getJNIEnv();
  1855. if (env == NULL) {
  1856. errno = EINTERNAL;
  1857. return -1;
  1858. }
  1859. if (owner == NULL && group == NULL) {
  1860. return 0;
  1861. }
  1862. jthr = constructNewObjectOfPath(env, path, &jPath);
  1863. if (jthr) {
  1864. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1865. "hdfsChown(path=%s): constructNewObjectOfPath", path);
  1866. goto done;
  1867. }
  1868. jthr = newJavaStr(env, owner, &jOwner);
  1869. if (jthr) {
  1870. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1871. "hdfsChown(path=%s): newJavaStr(%s)", path, owner);
  1872. goto done;
  1873. }
  1874. jthr = newJavaStr(env, group, &jGroup);
  1875. if (jthr) {
  1876. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1877. "hdfsChown(path=%s): newJavaStr(%s)", path, group);
  1878. goto done;
  1879. }
  1880. //Create the directory
  1881. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1882. "setOwner", JMETHOD3(JPARAM(HADOOP_PATH),
  1883. JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID),
  1884. jPath, jOwner, jGroup);
  1885. if (jthr) {
  1886. ret = printExceptionAndFree(env, jthr,
  1887. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1888. NOPRINT_EXC_UNRESOLVED_LINK,
  1889. "hdfsChown(path=%s, owner=%s, group=%s): "
  1890. "FileSystem#setOwner", path, owner, group);
  1891. goto done;
  1892. }
  1893. ret = 0;
  1894. done:
  1895. destroyLocalReference(env, jPath);
  1896. destroyLocalReference(env, jOwner);
  1897. destroyLocalReference(env, jGroup);
  1898. if (ret) {
  1899. errno = ret;
  1900. return -1;
  1901. }
  1902. return 0;
  1903. }
  1904. int hdfsChmod(hdfsFS fs, const char *path, short mode)
  1905. {
  1906. int ret;
  1907. // JAVA EQUIVALENT:
  1908. // fs.setPermission(path, FsPermission)
  1909. jthrowable jthr;
  1910. jobject jPath = NULL, jPermObj = NULL;
  1911. jobject jFS = (jobject)fs;
  1912. jshort jmode = mode;
  1913. //Get the JNIEnv* corresponding to current thread
  1914. JNIEnv* env = getJNIEnv();
  1915. if (env == NULL) {
  1916. errno = EINTERNAL;
  1917. return -1;
  1918. }
  1919. // construct jPerm = FsPermission.createImmutable(short mode);
  1920. jthr = constructNewObjectOfClass(env, &jPermObj,
  1921. HADOOP_FSPERM,"(S)V",jmode);
  1922. if (jthr) {
  1923. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1924. "constructNewObjectOfClass(%s)", HADOOP_FSPERM);
  1925. return -1;
  1926. }
  1927. //Create an object of org.apache.hadoop.fs.Path
  1928. jthr = constructNewObjectOfPath(env, path, &jPath);
  1929. if (jthr) {
  1930. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1931. "hdfsChmod(%s): constructNewObjectOfPath", path);
  1932. goto done;
  1933. }
  1934. //Create the directory
  1935. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1936. "setPermission",
  1937. JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID),
  1938. jPath, jPermObj);
  1939. if (jthr) {
  1940. ret = printExceptionAndFree(env, jthr,
  1941. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1942. NOPRINT_EXC_UNRESOLVED_LINK,
  1943. "hdfsChmod(%s): FileSystem#setPermission", path);
  1944. goto done;
  1945. }
  1946. ret = 0;
  1947. done:
  1948. destroyLocalReference(env, jPath);
  1949. destroyLocalReference(env, jPermObj);
  1950. if (ret) {
  1951. errno = ret;
  1952. return -1;
  1953. }
  1954. return 0;
  1955. }
  1956. int hdfsUtime(hdfsFS fs, const char *path, tTime mtime, tTime atime)
  1957. {
  1958. // JAVA EQUIVALENT:
  1959. // fs.setTimes(src, mtime, atime)
  1960. jthrowable jthr;
  1961. jobject jFS = (jobject)fs;
  1962. jobject jPath;
  1963. static const tTime NO_CHANGE = -1;
  1964. jlong jmtime, jatime;
  1965. //Get the JNIEnv* corresponding to current thread
  1966. JNIEnv* env = getJNIEnv();
  1967. if (env == NULL) {
  1968. errno = EINTERNAL;
  1969. return -1;
  1970. }
  1971. //Create an object of org.apache.hadoop.fs.Path
  1972. jthr = constructNewObjectOfPath(env, path, &jPath);
  1973. if (jthr) {
  1974. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1975. "hdfsUtime(path=%s): constructNewObjectOfPath", path);
  1976. return -1;
  1977. }
  1978. jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000);
  1979. jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000);
  1980. jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
  1981. "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID),
  1982. jPath, jmtime, jatime);
  1983. destroyLocalReference(env, jPath);
  1984. if (jthr) {
  1985. errno = printExceptionAndFree(env, jthr,
  1986. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  1987. NOPRINT_EXC_UNRESOLVED_LINK,
  1988. "hdfsUtime(path=%s): FileSystem#setTimes", path);
  1989. return -1;
  1990. }
  1991. return 0;
  1992. }
  1993. /**
  1994. * Zero-copy options.
  1995. *
  1996. * We cache the EnumSet of ReadOptions which has to be passed into every
  1997. * readZero call, to avoid reconstructing it each time. This cache is cleared
  1998. * whenever an element changes.
  1999. */
  2000. struct hadoopRzOptions
  2001. {
  2002. JNIEnv *env;
  2003. int skipChecksums;
  2004. jobject byteBufferPool;
  2005. jobject cachedEnumSet;
  2006. };
  2007. struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
  2008. {
  2009. struct hadoopRzOptions *opts;
  2010. JNIEnv *env;
  2011. env = getJNIEnv();
  2012. if (!env) {
  2013. // Check to make sure the JNI environment is set up properly.
  2014. errno = EINTERNAL;
  2015. return NULL;
  2016. }
  2017. opts = calloc(1, sizeof(struct hadoopRzOptions));
  2018. if (!opts) {
  2019. errno = ENOMEM;
  2020. return NULL;
  2021. }
  2022. return opts;
  2023. }
  2024. static void hadoopRzOptionsClearCached(JNIEnv *env,
  2025. struct hadoopRzOptions *opts)
  2026. {
  2027. if (!opts->cachedEnumSet) {
  2028. return;
  2029. }
  2030. (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
  2031. opts->cachedEnumSet = NULL;
  2032. }
  2033. int hadoopRzOptionsSetSkipChecksum(
  2034. struct hadoopRzOptions *opts, int skip)
  2035. {
  2036. JNIEnv *env;
  2037. env = getJNIEnv();
  2038. if (!env) {
  2039. errno = EINTERNAL;
  2040. return -1;
  2041. }
  2042. hadoopRzOptionsClearCached(env, opts);
  2043. opts->skipChecksums = !!skip;
  2044. return 0;
  2045. }
  2046. int hadoopRzOptionsSetByteBufferPool(
  2047. struct hadoopRzOptions *opts, const char *className)
  2048. {
  2049. JNIEnv *env;
  2050. jthrowable jthr;
  2051. jobject byteBufferPool = NULL;
  2052. env = getJNIEnv();
  2053. if (!env) {
  2054. errno = EINTERNAL;
  2055. return -1;
  2056. }
  2057. if (className) {
  2058. // Note: we don't have to call hadoopRzOptionsClearCached in this
  2059. // function, since the ByteBufferPool is passed separately from the
  2060. // EnumSet of ReadOptions.
  2061. jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
  2062. if (jthr) {
  2063. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2064. "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
  2065. errno = EINVAL;
  2066. return -1;
  2067. }
  2068. }
  2069. if (opts->byteBufferPool) {
  2070. // Delete any previous ByteBufferPool we had.
  2071. (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
  2072. }
  2073. opts->byteBufferPool = byteBufferPool;
  2074. return 0;
  2075. }
  2076. void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
  2077. {
  2078. JNIEnv *env;
  2079. env = getJNIEnv();
  2080. if (!env) {
  2081. return;
  2082. }
  2083. hadoopRzOptionsClearCached(env, opts);
  2084. if (opts->byteBufferPool) {
  2085. (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
  2086. opts->byteBufferPool = NULL;
  2087. }
  2088. free(opts);
  2089. }
  2090. struct hadoopRzBuffer
  2091. {
  2092. jobject byteBuffer;
  2093. uint8_t *ptr;
  2094. int32_t length;
  2095. int direct;
  2096. };
  2097. static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
  2098. struct hadoopRzOptions *opts, jobject *enumSet)
  2099. {
  2100. jthrowable jthr = NULL;
  2101. jobject enumInst = NULL, enumSetObj = NULL;
  2102. jvalue jVal;
  2103. if (opts->cachedEnumSet) {
  2104. // If we cached the value, return it now.
  2105. *enumSet = opts->cachedEnumSet;
  2106. goto done;
  2107. }
  2108. if (opts->skipChecksums) {
  2109. jthr = fetchEnumInstance(env, READ_OPTION,
  2110. "SKIP_CHECKSUMS", &enumInst);
  2111. if (jthr) {
  2112. goto done;
  2113. }
  2114. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  2115. "java/util/EnumSet", "of",
  2116. "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
  2117. if (jthr) {
  2118. goto done;
  2119. }
  2120. enumSetObj = jVal.l;
  2121. } else {
  2122. jclass clazz = (*env)->FindClass(env, READ_OPTION);
  2123. if (!clazz) {
  2124. jthr = newRuntimeError(env, "failed "
  2125. "to find class for %s", READ_OPTION);
  2126. goto done;
  2127. }
  2128. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  2129. "java/util/EnumSet", "noneOf",
  2130. "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
  2131. enumSetObj = jVal.l;
  2132. }
  2133. // create global ref
  2134. opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
  2135. if (!opts->cachedEnumSet) {
  2136. jthr = getPendingExceptionAndClear(env);
  2137. goto done;
  2138. }
  2139. *enumSet = opts->cachedEnumSet;
  2140. jthr = NULL;
  2141. done:
  2142. (*env)->DeleteLocalRef(env, enumInst);
  2143. (*env)->DeleteLocalRef(env, enumSetObj);
  2144. return jthr;
  2145. }
  2146. static int hadoopReadZeroExtractBuffer(JNIEnv *env,
  2147. const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
  2148. {
  2149. int ret;
  2150. jthrowable jthr;
  2151. jvalue jVal;
  2152. uint8_t *directStart;
  2153. void *mallocBuf = NULL;
  2154. jint position;
  2155. jarray array = NULL;
  2156. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  2157. "java/nio/ByteBuffer", "remaining", "()I");
  2158. if (jthr) {
  2159. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2160. "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
  2161. goto done;
  2162. }
  2163. buffer->length = jVal.i;
  2164. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  2165. "java/nio/ByteBuffer", "position", "()I");
  2166. if (jthr) {
  2167. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2168. "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
  2169. goto done;
  2170. }
  2171. position = jVal.i;
  2172. directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
  2173. if (directStart) {
  2174. // Handle direct buffers.
  2175. buffer->ptr = directStart + position;
  2176. buffer->direct = 1;
  2177. ret = 0;
  2178. goto done;
  2179. }
  2180. // Handle indirect buffers.
  2181. // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
  2182. // when it fails. However, they also don't clearly say that it doesn't. It
  2183. // seems safest to clear any pending exceptions here, to prevent problems on
  2184. // various JVMs.
  2185. (*env)->ExceptionClear(env);
  2186. if (!opts->byteBufferPool) {
  2187. fputs("hadoopReadZeroExtractBuffer: we read through the "
  2188. "zero-copy path, but failed to get the address of the buffer via "
  2189. "GetDirectBufferAddress. Please make sure your JVM supports "
  2190. "GetDirectBufferAddress.\n", stderr);
  2191. ret = ENOTSUP;
  2192. goto done;
  2193. }
  2194. // Get the backing array object of this buffer.
  2195. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  2196. "java/nio/ByteBuffer", "array", "()[B");
  2197. if (jthr) {
  2198. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2199. "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
  2200. goto done;
  2201. }
  2202. array = jVal.l;
  2203. if (!array) {
  2204. fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
  2205. stderr);
  2206. ret = EIO;
  2207. goto done;
  2208. }
  2209. mallocBuf = malloc(buffer->length);
  2210. if (!mallocBuf) {
  2211. fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
  2212. buffer->length);
  2213. ret = ENOMEM;
  2214. goto done;
  2215. }
  2216. (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
  2217. jthr = (*env)->ExceptionOccurred(env);
  2218. if (jthr) {
  2219. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2220. "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
  2221. goto done;
  2222. }
  2223. buffer->ptr = mallocBuf;
  2224. buffer->direct = 0;
  2225. ret = 0;
  2226. done:
  2227. free(mallocBuf);
  2228. (*env)->DeleteLocalRef(env, array);
  2229. return ret;
  2230. }
  2231. static int translateZCRException(JNIEnv *env, jthrowable exc)
  2232. {
  2233. int ret;
  2234. char *className = NULL;
  2235. jthrowable jthr = classNameOfObject(exc, env, &className);
  2236. if (jthr) {
  2237. fputs("hadoopReadZero: failed to get class name of "
  2238. "exception from read().\n", stderr);
  2239. destroyLocalReference(env, exc);
  2240. destroyLocalReference(env, jthr);
  2241. ret = EIO;
  2242. goto done;
  2243. }
  2244. if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
  2245. ret = EPROTONOSUPPORT;
  2246. goto done;
  2247. }
  2248. ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
  2249. "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
  2250. done:
  2251. free(className);
  2252. return ret;
  2253. }
  2254. struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
  2255. struct hadoopRzOptions *opts, int32_t maxLength)
  2256. {
  2257. JNIEnv *env;
  2258. jthrowable jthr = NULL;
  2259. jvalue jVal;
  2260. jobject enumSet = NULL, byteBuffer = NULL;
  2261. struct hadoopRzBuffer* buffer = NULL;
  2262. int ret;
  2263. env = getJNIEnv();
  2264. if (!env) {
  2265. errno = EINTERNAL;
  2266. return NULL;
  2267. }
  2268. if (file->type != HDFS_STREAM_INPUT) {
  2269. fputs("Cannot read from a non-InputStream object!\n", stderr);
  2270. ret = EINVAL;
  2271. goto done;
  2272. }
  2273. buffer = calloc(1, sizeof(struct hadoopRzBuffer));
  2274. if (!buffer) {
  2275. ret = ENOMEM;
  2276. goto done;
  2277. }
  2278. jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
  2279. if (jthr) {
  2280. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2281. "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
  2282. goto done;
  2283. }
  2284. jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
  2285. "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
  2286. "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
  2287. if (jthr) {
  2288. ret = translateZCRException(env, jthr);
  2289. goto done;
  2290. }
  2291. byteBuffer = jVal.l;
  2292. if (!byteBuffer) {
  2293. buffer->byteBuffer = NULL;
  2294. buffer->length = 0;
  2295. buffer->ptr = NULL;
  2296. } else {
  2297. buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
  2298. if (!buffer->byteBuffer) {
  2299. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2300. "hadoopReadZero: failed to create global ref to ByteBuffer");
  2301. goto done;
  2302. }
  2303. ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
  2304. if (ret) {
  2305. goto done;
  2306. }
  2307. }
  2308. ret = 0;
  2309. done:
  2310. (*env)->DeleteLocalRef(env, byteBuffer);
  2311. if (ret) {
  2312. if (buffer) {
  2313. if (buffer->byteBuffer) {
  2314. (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
  2315. }
  2316. free(buffer);
  2317. }
  2318. errno = ret;
  2319. return NULL;
  2320. } else {
  2321. errno = 0;
  2322. }
  2323. return buffer;
  2324. }
  2325. int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
  2326. {
  2327. return buffer->length;
  2328. }
  2329. const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
  2330. {
  2331. return buffer->ptr;
  2332. }
  2333. void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
  2334. {
  2335. jvalue jVal;
  2336. jthrowable jthr;
  2337. JNIEnv* env;
  2338. env = getJNIEnv();
  2339. if (env == NULL) {
  2340. errno = EINTERNAL;
  2341. return;
  2342. }
  2343. if (buffer->byteBuffer) {
  2344. jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
  2345. HADOOP_ISTRM, "releaseBuffer",
  2346. "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
  2347. if (jthr) {
  2348. printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2349. "hadoopRzBufferFree: releaseBuffer failed: ");
  2350. // even on error, we have to delete the reference.
  2351. }
  2352. (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
  2353. }
  2354. if (!buffer->direct) {
  2355. free(buffer->ptr);
  2356. }
  2357. memset(buffer, 0, sizeof(*buffer));
  2358. free(buffer);
  2359. }
  2360. char***
  2361. hdfsGetHosts(hdfsFS fs, const char *path, tOffset start, tOffset length)
  2362. {
  2363. // JAVA EQUIVALENT:
  2364. // fs.getFileBlockLoctions(new Path(path), start, length);
  2365. jobject jFS = (jobject)fs;
  2366. jthrowable jthr;
  2367. jobject jPath = NULL;
  2368. jobject jFileStatus = NULL;
  2369. jvalue jFSVal, jVal;
  2370. jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL;
  2371. jstring jHost = NULL;
  2372. char*** blockHosts = NULL;
  2373. int i, j, ret;
  2374. jsize jNumFileBlocks = 0;
  2375. jobject jFileBlock;
  2376. jsize jNumBlockHosts;
  2377. const char *hostName;
  2378. //Get the JNIEnv* corresponding to current thread
  2379. JNIEnv* env = getJNIEnv();
  2380. if (env == NULL) {
  2381. errno = EINTERNAL;
  2382. return NULL;
  2383. }
  2384. //Create an object of org.apache.hadoop.fs.Path
  2385. jthr = constructNewObjectOfPath(env, path, &jPath);
  2386. if (jthr) {
  2387. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2388. "hdfsGetHosts(path=%s): constructNewObjectOfPath", path);
  2389. goto done;
  2390. }
  2391. jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS,
  2392. HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)"
  2393. "Lorg/apache/hadoop/fs/FileStatus;", jPath);
  2394. if (jthr) {
  2395. ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND,
  2396. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2397. "FileSystem#getFileStatus", path, start, length);
  2398. destroyLocalReference(env, jPath);
  2399. goto done;
  2400. }
  2401. jFileStatus = jFSVal.l;
  2402. //org.apache.hadoop.fs.FileSystem#getFileBlockLocations
  2403. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  2404. HADOOP_FS, "getFileBlockLocations",
  2405. "(Lorg/apache/hadoop/fs/FileStatus;JJ)"
  2406. "[Lorg/apache/hadoop/fs/BlockLocation;",
  2407. jFileStatus, start, length);
  2408. if (jthr) {
  2409. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2410. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2411. "FileSystem#getFileBlockLocations", path, start, length);
  2412. goto done;
  2413. }
  2414. jBlockLocations = jVal.l;
  2415. //Figure out no of entries in jBlockLocations
  2416. //Allocate memory and add NULL at the end
  2417. jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations);
  2418. blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**));
  2419. if (blockHosts == NULL) {
  2420. ret = ENOMEM;
  2421. goto done;
  2422. }
  2423. if (jNumFileBlocks == 0) {
  2424. ret = 0;
  2425. goto done;
  2426. }
  2427. //Now parse each block to get hostnames
  2428. for (i = 0; i < jNumFileBlocks; ++i) {
  2429. jFileBlock =
  2430. (*env)->GetObjectArrayElement(env, jBlockLocations, i);
  2431. if (!jFileBlock) {
  2432. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2433. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2434. "GetObjectArrayElement(%d)", path, start, length, i);
  2435. goto done;
  2436. }
  2437. jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock, HADOOP_BLK_LOC,
  2438. "getHosts", "()[Ljava/lang/String;");
  2439. if (jthr) {
  2440. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2441. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2442. "BlockLocation#getHosts", path, start, length);
  2443. goto done;
  2444. }
  2445. jFileBlockHosts = jVal.l;
  2446. if (!jFileBlockHosts) {
  2447. fprintf(stderr,
  2448. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
  2449. "BlockLocation#getHosts returned NULL", path, start, length);
  2450. ret = EINTERNAL;
  2451. goto done;
  2452. }
  2453. //Figure out no of hosts in jFileBlockHosts, and allocate the memory
  2454. jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts);
  2455. blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*));
  2456. if (!blockHosts[i]) {
  2457. ret = ENOMEM;
  2458. goto done;
  2459. }
  2460. //Now parse each hostname
  2461. for (j = 0; j < jNumBlockHosts; ++j) {
  2462. jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j);
  2463. if (!jHost) {
  2464. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2465. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
  2466. "NewByteArray", path, start, length);
  2467. goto done;
  2468. }
  2469. hostName =
  2470. (const char*)((*env)->GetStringUTFChars(env, jHost, NULL));
  2471. if (!hostName) {
  2472. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2473. "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", "
  2474. "j=%d out of %d): GetStringUTFChars",
  2475. path, start, length, j, jNumBlockHosts);
  2476. goto done;
  2477. }
  2478. blockHosts[i][j] = strdup(hostName);
  2479. (*env)->ReleaseStringUTFChars(env, jHost, hostName);
  2480. if (!blockHosts[i][j]) {
  2481. ret = ENOMEM;
  2482. goto done;
  2483. }
  2484. destroyLocalReference(env, jHost);
  2485. jHost = NULL;
  2486. }
  2487. destroyLocalReference(env, jFileBlockHosts);
  2488. jFileBlockHosts = NULL;
  2489. }
  2490. ret = 0;
  2491. done:
  2492. destroyLocalReference(env, jPath);
  2493. destroyLocalReference(env, jFileStatus);
  2494. destroyLocalReference(env, jBlockLocations);
  2495. destroyLocalReference(env, jFileBlockHosts);
  2496. destroyLocalReference(env, jHost);
  2497. if (ret) {
  2498. if (blockHosts) {
  2499. hdfsFreeHosts(blockHosts);
  2500. }
  2501. return NULL;
  2502. }
  2503. return blockHosts;
  2504. }
  2505. void hdfsFreeHosts(char ***blockHosts)
  2506. {
  2507. int i, j;
  2508. for (i=0; blockHosts[i]; i++) {
  2509. for (j=0; blockHosts[i][j]; j++) {
  2510. free(blockHosts[i][j]);
  2511. }
  2512. free(blockHosts[i]);
  2513. }
  2514. free(blockHosts);
  2515. }
  2516. tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
  2517. {
  2518. // JAVA EQUIVALENT:
  2519. // fs.getDefaultBlockSize();
  2520. jobject jFS = (jobject)fs;
  2521. jvalue jVal;
  2522. jthrowable jthr;
  2523. //Get the JNIEnv* corresponding to current thread
  2524. JNIEnv* env = getJNIEnv();
  2525. if (env == NULL) {
  2526. errno = EINTERNAL;
  2527. return -1;
  2528. }
  2529. //FileSystem#getDefaultBlockSize()
  2530. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2531. "getDefaultBlockSize", "()J");
  2532. if (jthr) {
  2533. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2534. "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
  2535. return -1;
  2536. }
  2537. return jVal.j;
  2538. }
  2539. tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path)
  2540. {
  2541. // JAVA EQUIVALENT:
  2542. // fs.getDefaultBlockSize(path);
  2543. jthrowable jthr;
  2544. jobject jFS = (jobject)fs;
  2545. jobject jPath;
  2546. tOffset blockSize;
  2547. JNIEnv* env = getJNIEnv();
  2548. if (env == NULL) {
  2549. errno = EINTERNAL;
  2550. return -1;
  2551. }
  2552. jthr = constructNewObjectOfPath(env, path, &jPath);
  2553. if (jthr) {
  2554. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2555. "hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath",
  2556. path);
  2557. return -1;
  2558. }
  2559. jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize);
  2560. (*env)->DeleteLocalRef(env, jPath);
  2561. if (jthr) {
  2562. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2563. "hdfsGetDefaultBlockSize(path=%s): "
  2564. "FileSystem#getDefaultBlockSize", path);
  2565. return -1;
  2566. }
  2567. return blockSize;
  2568. }
  2569. tOffset hdfsGetCapacity(hdfsFS fs)
  2570. {
  2571. // JAVA EQUIVALENT:
  2572. // FsStatus fss = fs.getStatus();
  2573. // return Fss.getCapacity();
  2574. jobject jFS = (jobject)fs;
  2575. jvalue jVal;
  2576. jthrowable jthr;
  2577. jobject fss;
  2578. //Get the JNIEnv* corresponding to current thread
  2579. JNIEnv* env = getJNIEnv();
  2580. if (env == NULL) {
  2581. errno = EINTERNAL;
  2582. return -1;
  2583. }
  2584. //FileSystem#getStatus
  2585. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2586. "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
  2587. if (jthr) {
  2588. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2589. "hdfsGetCapacity: FileSystem#getStatus");
  2590. return -1;
  2591. }
  2592. fss = (jobject)jVal.l;
  2593. jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
  2594. "getCapacity", "()J");
  2595. destroyLocalReference(env, fss);
  2596. if (jthr) {
  2597. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2598. "hdfsGetCapacity: FsStatus#getCapacity");
  2599. return -1;
  2600. }
  2601. return jVal.j;
  2602. }
  2603. tOffset hdfsGetUsed(hdfsFS fs)
  2604. {
  2605. // JAVA EQUIVALENT:
  2606. // FsStatus fss = fs.getStatus();
  2607. // return Fss.getUsed();
  2608. jobject jFS = (jobject)fs;
  2609. jvalue jVal;
  2610. jthrowable jthr;
  2611. jobject fss;
  2612. //Get the JNIEnv* corresponding to current thread
  2613. JNIEnv* env = getJNIEnv();
  2614. if (env == NULL) {
  2615. errno = EINTERNAL;
  2616. return -1;
  2617. }
  2618. //FileSystem#getStatus
  2619. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2620. "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
  2621. if (jthr) {
  2622. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2623. "hdfsGetUsed: FileSystem#getStatus");
  2624. return -1;
  2625. }
  2626. fss = (jobject)jVal.l;
  2627. jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
  2628. "getUsed", "()J");
  2629. destroyLocalReference(env, fss);
  2630. if (jthr) {
  2631. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2632. "hdfsGetUsed: FsStatus#getUsed");
  2633. return -1;
  2634. }
  2635. return jVal.j;
  2636. }
  2637. /**
  2638. * We cannot add new fields to the hdfsFileInfo structure because it would break
  2639. * binary compatibility. The reason is because we return an array
  2640. * of hdfsFileInfo structures from hdfsListDirectory. So changing the size of
  2641. * those structures would break all programs that relied on finding the second
  2642. * element in the array at <base_offset> + sizeof(struct hdfsFileInfo).
  2643. *
  2644. * So instead, we add the new fields to the hdfsExtendedFileInfo structure.
  2645. * This structure is contained in the mOwner string found inside the
  2646. * hdfsFileInfo. Specifically, the format of mOwner is:
  2647. *
  2648. * [owner-string] [null byte] [padding] [hdfsExtendedFileInfo structure]
  2649. *
  2650. * The padding is added so that the hdfsExtendedFileInfo structure starts on an
  2651. * 8-byte boundary.
  2652. *
  2653. * @param str The string to locate the extended info in.
  2654. * @return The offset of the hdfsExtendedFileInfo structure.
  2655. */
  2656. static size_t getExtendedFileInfoOffset(const char *str)
  2657. {
  2658. int num_64_bit_words = ((strlen(str) + 1) + 7) / 8;
  2659. return num_64_bit_words * 8;
  2660. }
  2661. static struct hdfsExtendedFileInfo *getExtendedFileInfo(hdfsFileInfo *fileInfo)
  2662. {
  2663. char *owner = fileInfo->mOwner;
  2664. return (struct hdfsExtendedFileInfo *)(owner +
  2665. getExtendedFileInfoOffset(owner));
  2666. }
  2667. static jthrowable
  2668. getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo)
  2669. {
  2670. jvalue jVal;
  2671. jthrowable jthr;
  2672. jobject jPath = NULL;
  2673. jstring jPathName = NULL;
  2674. jstring jUserName = NULL;
  2675. jstring jGroupName = NULL;
  2676. jobject jPermission = NULL;
  2677. const char *cPathName;
  2678. const char *cUserName;
  2679. const char *cGroupName;
  2680. struct hdfsExtendedFileInfo *extInfo;
  2681. size_t extOffset;
  2682. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2683. HADOOP_STAT, "isDir", "()Z");
  2684. if (jthr)
  2685. goto done;
  2686. fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile;
  2687. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2688. HADOOP_STAT, "getReplication", "()S");
  2689. if (jthr)
  2690. goto done;
  2691. fileInfo->mReplication = jVal.s;
  2692. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2693. HADOOP_STAT, "getBlockSize", "()J");
  2694. if (jthr)
  2695. goto done;
  2696. fileInfo->mBlockSize = jVal.j;
  2697. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2698. HADOOP_STAT, "getModificationTime", "()J");
  2699. if (jthr)
  2700. goto done;
  2701. fileInfo->mLastMod = jVal.j / 1000;
  2702. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2703. HADOOP_STAT, "getAccessTime", "()J");
  2704. if (jthr)
  2705. goto done;
  2706. fileInfo->mLastAccess = (tTime) (jVal.j / 1000);
  2707. if (fileInfo->mKind == kObjectKindFile) {
  2708. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2709. HADOOP_STAT, "getLen", "()J");
  2710. if (jthr)
  2711. goto done;
  2712. fileInfo->mSize = jVal.j;
  2713. }
  2714. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2715. "getPath", "()Lorg/apache/hadoop/fs/Path;");
  2716. if (jthr)
  2717. goto done;
  2718. jPath = jVal.l;
  2719. if (jPath == NULL) {
  2720. jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#"
  2721. "getPath returned NULL!");
  2722. goto done;
  2723. }
  2724. jthr = invokeMethod(env, &jVal, INSTANCE, jPath, HADOOP_PATH,
  2725. "toString", "()Ljava/lang/String;");
  2726. if (jthr)
  2727. goto done;
  2728. jPathName = jVal.l;
  2729. cPathName =
  2730. (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL));
  2731. if (!cPathName) {
  2732. jthr = getPendingExceptionAndClear(env);
  2733. goto done;
  2734. }
  2735. fileInfo->mName = strdup(cPathName);
  2736. (*env)->ReleaseStringUTFChars(env, jPathName, cPathName);
  2737. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2738. "getOwner", "()Ljava/lang/String;");
  2739. if (jthr)
  2740. goto done;
  2741. jUserName = jVal.l;
  2742. cUserName =
  2743. (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL));
  2744. if (!cUserName) {
  2745. jthr = getPendingExceptionAndClear(env);
  2746. goto done;
  2747. }
  2748. extOffset = getExtendedFileInfoOffset(cUserName);
  2749. fileInfo->mOwner = malloc(extOffset + sizeof(struct hdfsExtendedFileInfo));
  2750. if (!fileInfo->mOwner) {
  2751. jthr = newRuntimeError(env, "getFileInfo: OOM allocating mOwner");
  2752. goto done;
  2753. }
  2754. strcpy(fileInfo->mOwner, cUserName);
  2755. (*env)->ReleaseStringUTFChars(env, jUserName, cUserName);
  2756. extInfo = getExtendedFileInfo(fileInfo);
  2757. memset(extInfo, 0, sizeof(*extInfo));
  2758. jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
  2759. HADOOP_STAT, "isEncrypted", "()Z");
  2760. if (jthr) {
  2761. goto done;
  2762. }
  2763. if (jVal.z == JNI_TRUE) {
  2764. extInfo->flags |= HDFS_EXTENDED_FILE_INFO_ENCRYPTED;
  2765. }
  2766. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2767. "getGroup", "()Ljava/lang/String;");
  2768. if (jthr)
  2769. goto done;
  2770. jGroupName = jVal.l;
  2771. cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL));
  2772. if (!cGroupName) {
  2773. jthr = getPendingExceptionAndClear(env);
  2774. goto done;
  2775. }
  2776. fileInfo->mGroup = strdup(cGroupName);
  2777. (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName);
  2778. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
  2779. "getPermission",
  2780. "()Lorg/apache/hadoop/fs/permission/FsPermission;");
  2781. if (jthr)
  2782. goto done;
  2783. if (jVal.l == NULL) {
  2784. jthr = newRuntimeError(env, "%s#getPermission returned NULL!",
  2785. HADOOP_STAT);
  2786. goto done;
  2787. }
  2788. jPermission = jVal.l;
  2789. jthr = invokeMethod(env, &jVal, INSTANCE, jPermission, HADOOP_FSPERM,
  2790. "toShort", "()S");
  2791. if (jthr)
  2792. goto done;
  2793. fileInfo->mPermissions = jVal.s;
  2794. jthr = NULL;
  2795. done:
  2796. if (jthr)
  2797. hdfsFreeFileInfoEntry(fileInfo);
  2798. destroyLocalReference(env, jPath);
  2799. destroyLocalReference(env, jPathName);
  2800. destroyLocalReference(env, jUserName);
  2801. destroyLocalReference(env, jGroupName);
  2802. destroyLocalReference(env, jPermission);
  2803. destroyLocalReference(env, jPath);
  2804. return jthr;
  2805. }
  2806. static jthrowable
  2807. getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo)
  2808. {
  2809. // JAVA EQUIVALENT:
  2810. // fs.isDirectory(f)
  2811. // fs.getModificationTime()
  2812. // fs.getAccessTime()
  2813. // fs.getLength(f)
  2814. // f.getPath()
  2815. // f.getOwner()
  2816. // f.getGroup()
  2817. // f.getPermission().toShort()
  2818. jobject jStat;
  2819. jvalue jVal;
  2820. jthrowable jthr;
  2821. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
  2822. "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"),
  2823. jPath);
  2824. if (jthr)
  2825. return jthr;
  2826. if (jVal.z == 0) {
  2827. *fileInfo = NULL;
  2828. return NULL;
  2829. }
  2830. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  2831. HADOOP_FS, "getFileStatus",
  2832. JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath);
  2833. if (jthr)
  2834. return jthr;
  2835. jStat = jVal.l;
  2836. *fileInfo = calloc(1, sizeof(hdfsFileInfo));
  2837. if (!*fileInfo) {
  2838. destroyLocalReference(env, jStat);
  2839. return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo");
  2840. }
  2841. jthr = getFileInfoFromStat(env, jStat, *fileInfo);
  2842. destroyLocalReference(env, jStat);
  2843. return jthr;
  2844. }
  2845. hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char *path, int *numEntries)
  2846. {
  2847. // JAVA EQUIVALENT:
  2848. // Path p(path);
  2849. // Path []pathList = fs.listPaths(p)
  2850. // foreach path in pathList
  2851. // getFileInfo(path)
  2852. jobject jFS = (jobject)fs;
  2853. jthrowable jthr;
  2854. jobject jPath = NULL;
  2855. hdfsFileInfo *pathList = NULL;
  2856. jobjectArray jPathList = NULL;
  2857. jvalue jVal;
  2858. jsize jPathListSize = 0;
  2859. int ret;
  2860. jsize i;
  2861. jobject tmpStat;
  2862. //Get the JNIEnv* corresponding to current thread
  2863. JNIEnv* env = getJNIEnv();
  2864. if (env == NULL) {
  2865. errno = EINTERNAL;
  2866. return NULL;
  2867. }
  2868. //Create an object of org.apache.hadoop.fs.Path
  2869. jthr = constructNewObjectOfPath(env, path, &jPath);
  2870. if (jthr) {
  2871. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2872. "hdfsListDirectory(%s): constructNewObjectOfPath", path);
  2873. goto done;
  2874. }
  2875. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_DFS, "listStatus",
  2876. JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)),
  2877. jPath);
  2878. if (jthr) {
  2879. ret = printExceptionAndFree(env, jthr,
  2880. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2881. NOPRINT_EXC_UNRESOLVED_LINK,
  2882. "hdfsListDirectory(%s): FileSystem#listStatus", path);
  2883. goto done;
  2884. }
  2885. jPathList = jVal.l;
  2886. //Figure out the number of entries in that directory
  2887. jPathListSize = (*env)->GetArrayLength(env, jPathList);
  2888. if (jPathListSize == 0) {
  2889. ret = 0;
  2890. goto done;
  2891. }
  2892. //Allocate memory
  2893. pathList = calloc(jPathListSize, sizeof(hdfsFileInfo));
  2894. if (pathList == NULL) {
  2895. ret = ENOMEM;
  2896. goto done;
  2897. }
  2898. //Save path information in pathList
  2899. for (i=0; i < jPathListSize; ++i) {
  2900. tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i);
  2901. if (!tmpStat) {
  2902. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2903. "hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)",
  2904. path, i, jPathListSize);
  2905. goto done;
  2906. }
  2907. jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]);
  2908. destroyLocalReference(env, tmpStat);
  2909. if (jthr) {
  2910. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2911. "hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)",
  2912. path, i, jPathListSize);
  2913. goto done;
  2914. }
  2915. }
  2916. ret = 0;
  2917. done:
  2918. destroyLocalReference(env, jPath);
  2919. destroyLocalReference(env, jPathList);
  2920. if (ret) {
  2921. hdfsFreeFileInfo(pathList, jPathListSize);
  2922. errno = ret;
  2923. return NULL;
  2924. }
  2925. *numEntries = jPathListSize;
  2926. errno = 0;
  2927. return pathList;
  2928. }
  2929. hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char *path)
  2930. {
  2931. // JAVA EQUIVALENT:
  2932. // File f(path);
  2933. // fs.isDirectory(f)
  2934. // fs.lastModified() ??
  2935. // fs.getLength(f)
  2936. // f.getPath()
  2937. jobject jFS = (jobject)fs;
  2938. jobject jPath;
  2939. jthrowable jthr;
  2940. hdfsFileInfo *fileInfo;
  2941. //Get the JNIEnv* corresponding to current thread
  2942. JNIEnv* env = getJNIEnv();
  2943. if (env == NULL) {
  2944. errno = EINTERNAL;
  2945. return NULL;
  2946. }
  2947. //Create an object of org.apache.hadoop.fs.Path
  2948. jthr = constructNewObjectOfPath(env, path, &jPath);
  2949. if (jthr) {
  2950. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2951. "hdfsGetPathInfo(%s): constructNewObjectOfPath", path);
  2952. return NULL;
  2953. }
  2954. jthr = getFileInfo(env, jFS, jPath, &fileInfo);
  2955. destroyLocalReference(env, jPath);
  2956. if (jthr) {
  2957. errno = printExceptionAndFree(env, jthr,
  2958. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2959. NOPRINT_EXC_UNRESOLVED_LINK,
  2960. "hdfsGetPathInfo(%s): getFileInfo", path);
  2961. return NULL;
  2962. }
  2963. if (!fileInfo) {
  2964. errno = ENOENT;
  2965. return NULL;
  2966. }
  2967. return fileInfo;
  2968. }
  2969. static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo)
  2970. {
  2971. free(hdfsFileInfo->mName);
  2972. free(hdfsFileInfo->mOwner);
  2973. free(hdfsFileInfo->mGroup);
  2974. memset(hdfsFileInfo, 0, sizeof(*hdfsFileInfo));
  2975. }
  2976. void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
  2977. {
  2978. //Free the mName, mOwner, and mGroup
  2979. int i;
  2980. for (i=0; i < numEntries; ++i) {
  2981. hdfsFreeFileInfoEntry(hdfsFileInfo + i);
  2982. }
  2983. //Free entire block
  2984. free(hdfsFileInfo);
  2985. }
  2986. int hdfsFileIsEncrypted(hdfsFileInfo *fileInfo)
  2987. {
  2988. struct hdfsExtendedFileInfo *extInfo;
  2989. extInfo = getExtendedFileInfo(fileInfo);
  2990. return !!(extInfo->flags & HDFS_EXTENDED_FILE_INFO_ENCRYPTED);
  2991. }
  2992. /**
  2993. * vim: ts=4: sw=4: et:
  2994. */