/* hdfs.c — libhdfs: C bindings for the Hadoop FileSystem API (via JNI). */
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. #include "exception.h"
  19. #include "hdfs/hdfs.h"
  20. #include "jclasses.h"
  21. #include "jni_helper.h"
  22. #include "platform.h"
  23. #include <fcntl.h>
  24. #include <inttypes.h>
  25. #include <stdio.h>
  26. #include <string.h>
  27. #define JAVA_VOID "V"
  28. /* Macros for constructing method signatures */
  29. #define JPARAM(X) "L" X ";"
  30. #define JARRPARAM(X) "[L" X ";"
  31. #define JMETHOD1(X, R) "(" X ")" R
  32. #define JMETHOD2(X, Y, R) "(" X Y ")" R
  33. #define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
  34. #define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
  35. // StreamCapability flags taken from o.a.h.fs.StreamCapabilities
  36. #define IS_READ_BYTE_BUFFER_CAPABILITY "in:readbytebuffer"
  37. #define IS_PREAD_BYTE_BUFFER_CAPABILITY "in:preadbytebuffer"
  38. // Bit fields for hdfsFile_internal flags
  39. #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
  40. #define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
/**
 * Reads bytes using the read(ByteBuffer) API. By using Java
 * DirectByteBuffers we can avoid copying the bytes onto the Java heap.
 * Instead the data will be directly copied from kernel space to the C heap.
 */
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
/**
 * Reads bytes using the read(long, ByteBuffer) API. By using Java
 * DirectByteBuffers we can avoid copying the bytes onto the Java heap.
 * Instead the data will be directly copied from kernel space to the C heap.
 */
tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
        tSize length);
/**
 * Positioned read that fills the whole buffer via the direct-byte-buffer
 * path (see preadDirect above for the zero-copy rationale).
 */
int preadFullyDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
        tSize length);
/* Forward declaration: releases the memory owned by one hdfsFileInfo entry. */
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
/**
 * The C equivalent of org.apache.hadoop.fs.FSData(Input|Output)Stream.
 * Tags which kind of Java stream an hdfsFile handle wraps.
 */
enum hdfsStreamType
{
    HDFS_STREAM_UNINITIALIZED = 0, /* handle not yet bound to a stream */
    HDFS_STREAM_INPUT = 1,         /* wraps an input stream (readable) */
    HDFS_STREAM_OUTPUT = 2,        /* wraps an output stream (writable) */
};
/**
 * The 'file-handle' to a file in hdfs.
 */
struct hdfsFile_internal {
    void* file;               /* JNI reference to the underlying Java stream */
    enum hdfsStreamType type; /* input, output, or uninitialized */
    int flags;                /* HDFS_FILE_SUPPORTS_DIRECT_(P)READ bits */
};
/* Flag bit: the file is stored in an HDFS encryption zone. */
#define HDFS_EXTENDED_FILE_INFO_ENCRYPTED 0x1
/**
 * Extended file information.
 */
struct hdfsExtendedFileInfo {
    int flags; /* bitwise OR of HDFS_EXTENDED_FILE_INFO_* flags */
};
  81. int hdfsFileIsOpenForRead(hdfsFile file)
  82. {
  83. return (file->type == HDFS_STREAM_INPUT);
  84. }
  85. int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics **metrics)
  86. {
  87. jthrowable jthr;
  88. jobject hedgedReadMetrics = NULL;
  89. jvalue jVal;
  90. struct hdfsHedgedReadMetrics *m = NULL;
  91. int ret;
  92. jobject jFS = (jobject)fs;
  93. JNIEnv* env = getJNIEnv();
  94. if (env == NULL) {
  95. errno = EINTERNAL;
  96. return -1;
  97. }
  98. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  99. JC_DISTRIBUTED_FILE_SYSTEM, "getHedgedReadMetrics",
  100. "()Lorg/apache/hadoop/hdfs/DFSHedgedReadMetrics;");
  101. if (jthr) {
  102. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  103. "hdfsGetHedgedReadMetrics: getHedgedReadMetrics failed");
  104. goto done;
  105. }
  106. hedgedReadMetrics = jVal.l;
  107. m = malloc(sizeof(struct hdfsHedgedReadMetrics));
  108. if (!m) {
  109. ret = ENOMEM;
  110. goto done;
  111. }
  112. jthr = invokeMethod(env, &jVal, INSTANCE, hedgedReadMetrics,
  113. JC_DFS_HEDGED_READ_METRICS, "getHedgedReadOps", "()J");
  114. if (jthr) {
  115. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  116. "hdfsGetHedgedReadStatistics: getHedgedReadOps failed");
  117. goto done;
  118. }
  119. m->hedgedReadOps = jVal.j;
  120. jthr = invokeMethod(env, &jVal, INSTANCE, hedgedReadMetrics,
  121. JC_DFS_HEDGED_READ_METRICS, "getHedgedReadWins", "()J");
  122. if (jthr) {
  123. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  124. "hdfsGetHedgedReadStatistics: getHedgedReadWins failed");
  125. goto done;
  126. }
  127. m->hedgedReadOpsWin = jVal.j;
  128. jthr = invokeMethod(env, &jVal, INSTANCE, hedgedReadMetrics,
  129. JC_DFS_HEDGED_READ_METRICS, "getHedgedReadOpsInCurThread", "()J");
  130. if (jthr) {
  131. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  132. "hdfsGetHedgedReadStatistics: getHedgedReadOpsInCurThread failed");
  133. goto done;
  134. }
  135. m->hedgedReadOpsInCurThread = jVal.j;
  136. *metrics = m;
  137. m = NULL;
  138. ret = 0;
  139. done:
  140. destroyLocalReference(env, hedgedReadMetrics);
  141. free(m);
  142. if (ret) {
  143. errno = ret;
  144. return -1;
  145. }
  146. return 0;
  147. }
  148. void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics *metrics)
  149. {
  150. free(metrics);
  151. }
/**
 * Fetch the read statistics for an open input stream.
 *
 * @param file  The hdfs file handle; must be open for reading
 *              (type == HDFS_STREAM_INPUT), otherwise fails with EINVAL.
 * @param stats (out) On success, *stats points to a malloc'd
 *              hdfsReadStatistics; the caller releases it with
 *              hdfsFileFreeReadStatistics.
 * @return 0 on success; -1 on error with errno set.
 */
int hdfsFileGetReadStatistics(hdfsFile file,
        struct hdfsReadStatistics **stats)
{
    jthrowable jthr;
    jobject readStats = NULL;
    jvalue jVal;
    struct hdfsReadStatistics *s = NULL;
    int ret;
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }
    if (file->type != HDFS_STREAM_INPUT) {
        ret = EINVAL;
        goto done;
    }
    /* HdfsDataInputStream#getReadStatistics() */
    jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
            JC_HDFS_DATA_INPUT_STREAM, "getReadStatistics",
            "()Lorg/apache/hadoop/hdfs/ReadStatistics;");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getReadStatistics failed");
        goto done;
    }
    readStats = jVal.l;
    s = malloc(sizeof(struct hdfsReadStatistics));
    if (!s) {
        ret = ENOMEM;
        goto done;
    }
    /* Pull each counter off the Java ReadStatistics object in turn. */
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
            JC_READ_STATISTICS, "getTotalBytesRead", "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalBytesRead failed");
        goto done;
    }
    s->totalBytesRead = jVal.j;
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
            JC_READ_STATISTICS, "getTotalLocalBytesRead", "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
        goto done;
    }
    s->totalLocalBytesRead = jVal.j;
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
            JC_READ_STATISTICS, "getTotalShortCircuitBytesRead",
            "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
        goto done;
    }
    s->totalShortCircuitBytesRead = jVal.j;
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
            JC_READ_STATISTICS, "getTotalZeroCopyBytesRead",
            "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
        goto done;
    }
    s->totalZeroCopyBytesRead = jVal.j;
    *stats = s;
    s = NULL; /* ownership transferred to the caller */
    ret = 0;
done:
    destroyLocalReference(env, readStats);
    free(s); /* only non-NULL on the error paths */
    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}
  229. int64_t hdfsReadStatisticsGetRemoteBytesRead(
  230. const struct hdfsReadStatistics *stats)
  231. {
  232. return stats->totalBytesRead - stats->totalLocalBytesRead;
  233. }
/**
 * Clear the read statistics of an open input stream.
 *
 * @param file The hdfs file handle; must be open for reading, else EINVAL.
 * @return 0 on success; otherwise the (positive) errno value, which is also
 *         stored in errno.  Note: unlike most libhdfs calls, this does NOT
 *         return -1 on error.
 */
int hdfsFileClearReadStatistics(hdfsFile file)
{
    jthrowable jthr;
    int ret;
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return EINTERNAL;
    }
    if (file->type != HDFS_STREAM_INPUT) {
        ret = EINVAL;
        goto done;
    }
    /* HdfsDataInputStream#clearReadStatistics() */
    jthr = invokeMethod(env, NULL, INSTANCE, file->file,
            JC_HDFS_DATA_INPUT_STREAM, "clearReadStatistics",
            "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileClearReadStatistics: clearReadStatistics failed");
        goto done;
    }
    ret = 0;
done:
    if (ret) {
        errno = ret;
        return ret;
    }
    return 0;
}
  263. void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
  264. {
  265. free(stats);
  266. }
  267. int hdfsFileIsOpenForWrite(hdfsFile file)
  268. {
  269. return (file->type == HDFS_STREAM_OUTPUT);
  270. }
  271. int hdfsFileUsesDirectRead(hdfsFile file)
  272. {
  273. return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) != 0;
  274. }
  275. void hdfsFileDisableDirectRead(hdfsFile file)
  276. {
  277. file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
  278. }
  279. int hdfsFileUsesDirectPread(hdfsFile file)
  280. {
  281. return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) != 0;
  282. }
  283. void hdfsFileDisableDirectPread(hdfsFile file)
  284. {
  285. file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD;
  286. }
  287. int hdfsDisableDomainSocketSecurity(void)
  288. {
  289. jthrowable jthr;
  290. JNIEnv* env = getJNIEnv();
  291. if (env == NULL) {
  292. errno = EINTERNAL;
  293. return -1;
  294. }
  295. jthr = invokeMethod(env, NULL, STATIC, NULL, JC_DOMAIN_SOCKET,
  296. "disableBindPathValidation", "()V");
  297. if (jthr) {
  298. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  299. "DomainSocket#disableBindPathValidation");
  300. return -1;
  301. }
  302. return 0;
  303. }
/**
 * hdfsJniEnv: A wrapper struct to be used as 'value'
 * while saving thread -> JNIEnv* mappings
 */
typedef struct
{
    JNIEnv* env; /* the JNI environment attached to one thread */
} hdfsJniEnv;
  312. /**
  313. * Helper function to create a org.apache.hadoop.fs.Path object.
  314. * @param env: The JNIEnv pointer.
  315. * @param path: The file-path for which to construct org.apache.hadoop.fs.Path
  316. * object.
  317. * @return Returns a jobject on success and NULL on error.
  318. */
  319. static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
  320. jobject *out)
  321. {
  322. jthrowable jthr;
  323. jstring jPathString;
  324. jobject jPath;
  325. //Construct a java.lang.String object
  326. jthr = newJavaStr(env, path, &jPathString);
  327. if (jthr)
  328. return jthr;
  329. //Construct the org.apache.hadoop.fs.Path object
  330. jthr = constructNewObjectOfCachedClass(env, &jPath, JC_PATH,
  331. "(Ljava/lang/String;)V", jPathString);
  332. destroyLocalReference(env, jPathString);
  333. if (jthr)
  334. return jthr;
  335. *out = jPath;
  336. return NULL;
  337. }
/**
 * Look up a string value on a Configuration object via Configuration#get.
 *
 * @param env            The JNIEnv pointer.
 * @param jConfiguration The org.apache.hadoop.conf.Configuration object.
 * @param key            The configuration key to look up.
 * @param val            (out) Receives the value as a C string via newCStr;
 *                       caller frees it (see hdfsConfStrFree).
 * @return NULL on success; the pending Java exception on failure.
 */
static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
        const char *key, char **val)
{
    jthrowable jthr;
    jvalue jVal;
    jstring jkey = NULL, jRet = NULL;
    jthr = newJavaStr(env, key, &jkey);
    if (jthr)
        goto done;
    /* Configuration#get(String) -> String */
    jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
            JC_CONFIGURATION, "get", JMETHOD1(JPARAM(JAVA_STRING),
            JPARAM(JAVA_STRING)), jkey);
    if (jthr)
        goto done;
    jRet = jVal.l;
    jthr = newCStr(env, jRet, val);
done:
    destroyLocalReference(env, jkey);
    destroyLocalReference(env, jRet);
    return jthr;
}
  359. int hdfsConfGetStr(const char *key, char **val)
  360. {
  361. JNIEnv *env;
  362. int ret;
  363. jthrowable jthr;
  364. jobject jConfiguration = NULL;
  365. env = getJNIEnv();
  366. if (env == NULL) {
  367. ret = EINTERNAL;
  368. goto done;
  369. }
  370. jthr = constructNewObjectOfCachedClass(env, &jConfiguration,
  371. JC_CONFIGURATION, "()V");
  372. if (jthr) {
  373. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  374. "hdfsConfGetStr(%s): new Configuration", key);
  375. goto done;
  376. }
  377. jthr = hadoopConfGetStr(env, jConfiguration, key, val);
  378. if (jthr) {
  379. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  380. "hdfsConfGetStr(%s): hadoopConfGetStr", key);
  381. goto done;
  382. }
  383. ret = 0;
  384. done:
  385. destroyLocalReference(env, jConfiguration);
  386. if (ret)
  387. errno = ret;
  388. return ret;
  389. }
  390. void hdfsConfStrFree(char *val)
  391. {
  392. free(val);
  393. }
/**
 * Look up an int value on a Configuration object via Configuration#getInt.
 *
 * @param env            The JNIEnv pointer.
 * @param jConfiguration The org.apache.hadoop.conf.Configuration object.
 * @param key            The configuration key to look up.
 * @param val            In/out: on entry *val supplies the default passed to
 *                       getInt; on success it is overwritten with the result.
 * @return NULL on success; the pending Java exception on failure.
 */
static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
        const char *key, int32_t *val)
{
    jthrowable jthr = NULL;
    jvalue jVal;
    jstring jkey = NULL;
    jthr = newJavaStr(env, key, &jkey);
    if (jthr)
        return jthr;
    /* Configuration#getInt(String, int) -> int */
    jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
            JC_CONFIGURATION, "getInt",
            JMETHOD2(JPARAM(JAVA_STRING), "I", "I"), jkey, (jint)(*val));
    destroyLocalReference(env, jkey);
    if (jthr)
        return jthr;
    *val = jVal.i;
    return NULL;
}
/**
 * Read a Hadoop configuration int through a freshly constructed
 * Configuration object.
 *
 * @param key The configuration key.
 * @param val In/out: *val supplies the default and receives the result
 *            (see hadoopConfGetInt).
 * @return 0 on success; nonzero errno value on failure (errno is also set).
 */
int hdfsConfGetInt(const char *key, int32_t *val)
{
    JNIEnv *env;
    int ret;
    jobject jConfiguration = NULL;
    jthrowable jthr;
    env = getJNIEnv();
    if (env == NULL) {
        ret = EINTERNAL;
        goto done;
    }
    jthr = constructNewObjectOfCachedClass(env, &jConfiguration,
            JC_CONFIGURATION, "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsConfGetInt(%s): new Configuration", key);
        goto done;
    }
    jthr = hadoopConfGetInt(env, jConfiguration, key, val);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsConfGetInt(%s): hadoopConfGetInt", key);
        goto done;
    }
    ret = 0;
done:
    destroyLocalReference(env, jConfiguration);
    if (ret)
        errno = ret;
    return ret;
}
/**
 * One key/value configuration override attached to an hdfsBuilder.
 * key and val are borrowed pointers (not copied) — the caller must keep
 * them alive for the builder's lifetime.
 */
struct hdfsBuilderConfOpt {
    struct hdfsBuilderConfOpt *next; /* next node in the singly-linked list */
    const char *key;                 /* configuration key (not owned) */
    const char *val;                 /* configuration value (not owned) */
};
/**
 * Accumulates the connection parameters set by the hdfsBuilderSet* calls.
 * All string members are borrowed pointers (not copied).
 */
struct hdfsBuilder {
    int forceNewInstance;            /* nonzero: request a new FS instance */
    const char *nn;                  /* namenode host, or NULL */
    tPort port;                      /* namenode port */
    const char *kerbTicketCachePath; /* Kerberos ticket cache path, or NULL */
    const char *userName;            /* user to connect as, or NULL */
    struct hdfsBuilderConfOpt *opts; /* list of configuration overrides */
};
  456. struct hdfsBuilder *hdfsNewBuilder(void)
  457. {
  458. struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
  459. if (!bld) {
  460. errno = ENOMEM;
  461. return NULL;
  462. }
  463. return bld;
  464. }
  465. int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
  466. const char *val)
  467. {
  468. struct hdfsBuilderConfOpt *opt, *next;
  469. opt = calloc(1, sizeof(struct hdfsBuilderConfOpt));
  470. if (!opt)
  471. return -ENOMEM;
  472. next = bld->opts;
  473. bld->opts = opt;
  474. opt->next = next;
  475. opt->key = key;
  476. opt->val = val;
  477. return 0;
  478. }
  479. void hdfsFreeBuilder(struct hdfsBuilder *bld)
  480. {
  481. struct hdfsBuilderConfOpt *cur, *next;
  482. cur = bld->opts;
  483. for (cur = bld->opts; cur; ) {
  484. next = cur->next;
  485. free(cur);
  486. cur = next;
  487. }
  488. free(bld);
  489. }
/**
 * Force hdfsBuilderConnect to obtain a brand-new FileSystem instance
 * (FileSystem#newInstance / #newInstanceLocal) instead of a
 * possibly-shared one (FileSystem#get / #getLocal).
 */
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
    bld->forceNewInstance = 1;
}
/**
 * Set the namenode host (or full URI) to connect to. The string is
 * borrowed, not copied; it must outlive the builder.
 */
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
    bld->nn = nn;
}
/**
 * Set the namenode port. A port of 0 means "unspecified"; see
 * calcEffectiveURI for how a nonzero port is merged into the URI.
 */
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
    bld->port = port;
}
/**
 * Set the user name to connect as. Borrowed pointer, not copied; it must
 * outlive the builder.
 */
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
    bld->userName = userName;
}
/**
 * Set the path to the Kerberos ticket cache to use when connecting.
 * Borrowed pointer, not copied; it must outlive the builder.
 */
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
        const char *kerbTicketCachePath)
{
    bld->kerbTicketCachePath = kerbTicketCachePath;
}
  511. hdfsFS hdfsConnect(const char *host, tPort port)
  512. {
  513. struct hdfsBuilder *bld = hdfsNewBuilder();
  514. if (!bld)
  515. return NULL;
  516. hdfsBuilderSetNameNode(bld, host);
  517. hdfsBuilderSetNameNodePort(bld, port);
  518. return hdfsBuilderConnect(bld);
  519. }
  520. /** Always return a new FileSystem handle */
  521. hdfsFS hdfsConnectNewInstance(const char *host, tPort port)
  522. {
  523. struct hdfsBuilder *bld = hdfsNewBuilder();
  524. if (!bld)
  525. return NULL;
  526. hdfsBuilderSetNameNode(bld, host);
  527. hdfsBuilderSetNameNodePort(bld, port);
  528. hdfsBuilderSetForceNewInstance(bld);
  529. return hdfsBuilderConnect(bld);
  530. }
  531. hdfsFS hdfsConnectAsUser(const char *host, tPort port, const char *user)
  532. {
  533. struct hdfsBuilder *bld = hdfsNewBuilder();
  534. if (!bld)
  535. return NULL;
  536. hdfsBuilderSetNameNode(bld, host);
  537. hdfsBuilderSetNameNodePort(bld, port);
  538. hdfsBuilderSetUserName(bld, user);
  539. return hdfsBuilderConnect(bld);
  540. }
  541. /** Always return a new FileSystem handle */
  542. hdfsFS hdfsConnectAsUserNewInstance(const char *host, tPort port,
  543. const char *user)
  544. {
  545. struct hdfsBuilder *bld = hdfsNewBuilder();
  546. if (!bld)
  547. return NULL;
  548. hdfsBuilderSetNameNode(bld, host);
  549. hdfsBuilderSetNameNodePort(bld, port);
  550. hdfsBuilderSetForceNewInstance(bld);
  551. hdfsBuilderSetUserName(bld, user);
  552. return hdfsBuilderConnect(bld);
  553. }
  554. /**
  555. * Calculate the effective URI to use, given a builder configuration.
  556. *
  557. * If there is not already a URI scheme, we prepend 'hdfs://'.
  558. *
  559. * If there is not already a port specified, and a port was given to the
  560. * builder, we suffix that port. If there is a port specified but also one in
  561. * the URI, that is an error.
  562. *
  563. * @param bld The hdfs builder object
  564. * @param uri (out param) dynamically allocated string representing the
  565. * effective URI
  566. *
  567. * @return 0 on success; error code otherwise
  568. */
  569. static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri)
  570. {
  571. const char *scheme;
  572. char suffix[64];
  573. const char *lastColon;
  574. char *u;
  575. size_t uriLen;
  576. if (!bld->nn)
  577. return EINVAL;
  578. scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://";
  579. if (bld->port == 0) {
  580. suffix[0] = '\0';
  581. } else {
  582. lastColon = strrchr(bld->nn, ':');
  583. if (lastColon && (strspn(lastColon + 1, "0123456789") ==
  584. strlen(lastColon + 1))) {
  585. fprintf(stderr, "port %d was given, but URI '%s' already "
  586. "contains a port!\n", bld->port, bld->nn);
  587. return EINVAL;
  588. }
  589. snprintf(suffix, sizeof(suffix), ":%d", bld->port);
  590. }
  591. uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix);
  592. u = malloc((uriLen + 1) * (sizeof(char)));
  593. if (!u) {
  594. fprintf(stderr, "calcEffectiveURI: out of memory");
  595. return ENOMEM;
  596. }
  597. snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix);
  598. *uri = u;
  599. return 0;
  600. }
  601. static const char *maybeNull(const char *str)
  602. {
  603. return str ? str : "(NULL)";
  604. }
  605. static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
  606. char *buf, size_t bufLen)
  607. {
  608. snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
  609. "kerbTicketCachePath=%s, userName=%s",
  610. bld->forceNewInstance, maybeNull(bld->nn), bld->port,
  611. maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
  612. return buf;
  613. }
  614. hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
  615. {
  616. JNIEnv *env = 0;
  617. jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
  618. jstring jURIString = NULL, jUserString = NULL;
  619. jvalue jVal;
  620. jthrowable jthr = NULL;
  621. char *cURI = 0, buf[512];
  622. int ret;
  623. jobject jRet = NULL;
  624. struct hdfsBuilderConfOpt *opt;
  625. //Get the JNIEnv* corresponding to current thread
  626. env = getJNIEnv();
  627. if (env == NULL) {
  628. ret = EINTERNAL;
  629. goto done;
  630. }
  631. // jConfiguration = new Configuration();
  632. jthr = constructNewObjectOfCachedClass(env, &jConfiguration,
  633. JC_CONFIGURATION, "()V");
  634. if (jthr) {
  635. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  636. "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
  637. goto done;
  638. }
  639. // set configuration values
  640. for (opt = bld->opts; opt; opt = opt->next) {
  641. jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val);
  642. if (jthr) {
  643. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  644. "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'",
  645. hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val);
  646. goto done;
  647. }
  648. }
  649. //Check what type of FileSystem the caller wants...
  650. if (bld->nn == NULL) {
  651. // Get a local filesystem.
  652. if (bld->forceNewInstance) {
  653. // fs = FileSytem#newInstanceLocal(conf);
  654. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  655. JC_FILE_SYSTEM, "newInstanceLocal",
  656. JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)),
  657. jConfiguration);
  658. if (jthr) {
  659. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  660. "hdfsBuilderConnect(%s)",
  661. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  662. goto done;
  663. }
  664. jFS = jVal.l;
  665. } else {
  666. // fs = FileSytem#getLocal(conf);
  667. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  668. JC_FILE_SYSTEM, "getLocal",
  669. JMETHOD1(JPARAM(HADOOP_CONF), JPARAM(HADOOP_LOCALFS)),
  670. jConfiguration);
  671. if (jthr) {
  672. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  673. "hdfsBuilderConnect(%s)",
  674. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  675. goto done;
  676. }
  677. jFS = jVal.l;
  678. }
  679. } else {
  680. if (!strcmp(bld->nn, "default")) {
  681. // jURI = FileSystem.getDefaultUri(conf)
  682. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  683. JC_FILE_SYSTEM, "getDefaultUri",
  684. "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
  685. jConfiguration);
  686. if (jthr) {
  687. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  688. "hdfsBuilderConnect(%s)",
  689. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  690. goto done;
  691. }
  692. jURI = jVal.l;
  693. } else {
  694. // fs = FileSystem#get(URI, conf, ugi);
  695. ret = calcEffectiveURI(bld, &cURI);
  696. if (ret)
  697. goto done;
  698. jthr = newJavaStr(env, cURI, &jURIString);
  699. if (jthr) {
  700. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  701. "hdfsBuilderConnect(%s)",
  702. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  703. goto done;
  704. }
  705. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  706. JC_URI, "create",
  707. "(Ljava/lang/String;)Ljava/net/URI;", jURIString);
  708. if (jthr) {
  709. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  710. "hdfsBuilderConnect(%s)",
  711. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  712. goto done;
  713. }
  714. jURI = jVal.l;
  715. }
  716. if (bld->kerbTicketCachePath) {
  717. jthr = hadoopConfSetStr(env, jConfiguration,
  718. KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
  719. if (jthr) {
  720. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  721. "hdfsBuilderConnect(%s)",
  722. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  723. goto done;
  724. }
  725. }
  726. jthr = newJavaStr(env, bld->userName, &jUserString);
  727. if (jthr) {
  728. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  729. "hdfsBuilderConnect(%s)",
  730. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  731. goto done;
  732. }
  733. if (bld->forceNewInstance) {
  734. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  735. JC_FILE_SYSTEM, "newInstance",
  736. JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
  737. JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI,
  738. jConfiguration, jUserString);
  739. if (jthr) {
  740. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  741. "hdfsBuilderConnect(%s)",
  742. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  743. goto done;
  744. }
  745. jFS = jVal.l;
  746. } else {
  747. jthr = invokeMethod(env, &jVal, STATIC, NULL,
  748. JC_FILE_SYSTEM, "get",
  749. JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
  750. JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI,
  751. jConfiguration, jUserString);
  752. if (jthr) {
  753. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  754. "hdfsBuilderConnect(%s)",
  755. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  756. goto done;
  757. }
  758. jFS = jVal.l;
  759. }
  760. }
  761. jRet = (*env)->NewGlobalRef(env, jFS);
  762. if (!jRet) {
  763. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  764. "hdfsBuilderConnect(%s)",
  765. hdfsBuilderToStr(bld, buf, sizeof(buf)));
  766. goto done;
  767. }
  768. ret = 0;
  769. done:
  770. // Release unnecessary local references
  771. destroyLocalReference(env, jConfiguration);
  772. destroyLocalReference(env, jFS);
  773. destroyLocalReference(env, jURI);
  774. destroyLocalReference(env, jCachePath);
  775. destroyLocalReference(env, jURIString);
  776. destroyLocalReference(env, jUserString);
  777. free(cURI);
  778. hdfsFreeBuilder(bld);
  779. if (ret) {
  780. errno = ret;
  781. return NULL;
  782. }
  783. return (hdfsFS)jRet;
  784. }
/**
 * Close a FileSystem handle obtained from one of the hdfsConnect* calls
 * and release its JNI global reference. The handle is invalid after this
 * call even if FileSystem#close threw.
 *
 * @param fs the filesystem handle; EBADF if NULL
 * @return 0 on success; -1 with errno set on error
 */
int hdfsDisconnect(hdfsFS fs)
{
    // JAVA EQUIVALENT:
    //   fs.close()

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    int ret;
    jobject jFS;
    jthrowable jthr;

    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Parameters
    jFS = (jobject)fs;

    //Sanity check
    if (fs == NULL) {
        errno = EBADF;
        return -1;
    }

    jthr = invokeMethod(env, NULL, INSTANCE, jFS, JC_FILE_SYSTEM,
            "close", "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsDisconnect: FileSystem#close");
    } else {
        ret = 0;
    }
    // The global ref is released unconditionally, so the handle is dead
    // even when close() failed above.
    (*env)->DeleteGlobalRef(env, jFS);
    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}
  820. /**
  821. * Get the default block size of a FileSystem object.
  822. *
  823. * @param env The Java env
  824. * @param jFS The FileSystem object
  825. * @param jPath The path to find the default blocksize at
  826. * @param out (out param) the default block size
  827. *
  828. * @return NULL on success; or the exception
  829. */
  830. static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
  831. jobject jPath, jlong *out)
  832. {
  833. jthrowable jthr;
  834. jvalue jVal;
  835. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  836. "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH),
  837. "J"), jPath);
  838. if (jthr)
  839. return jthr;
  840. *out = jVal.j;
  841. return NULL;
  842. }
  843. hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags,
  844. int bufferSize, short replication, tSize blockSize)
  845. {
  846. struct hdfsStreamBuilder *bld = hdfsStreamBuilderAlloc(fs, path, flags);
  847. if (bufferSize != 0) {
  848. hdfsStreamBuilderSetBufferSize(bld, bufferSize);
  849. }
  850. if (replication != 0) {
  851. hdfsStreamBuilderSetReplication(bld, replication);
  852. }
  853. if (blockSize != 0) {
  854. hdfsStreamBuilderSetDefaultBlockSize(bld, blockSize);
  855. }
  856. return hdfsStreamBuilderBuild(bld);
  857. }
/**
 * Accumulates the parameters for opening a file; created by
 * hdfsStreamBuilderAlloc and consumed by hdfsStreamBuilderBuild.
 * The path is stored inline at the end of the allocation.
 */
struct hdfsStreamBuilder {
    hdfsFS fs;                // filesystem handle to open against
    int flags;                // open(2)-style flags (O_RDONLY, O_WRONLY, ...)
    int32_t bufferSize;       // 0 => use the io.file.buffer.size default
    int16_t replication;      // 0 => use the dfs.replication default
    int64_t defaultBlockSize; // 0 => use the filesystem's default block size
    char path[1];             // inline storage; real length fixed at alloc time
};
  866. struct hdfsStreamBuilder *hdfsStreamBuilderAlloc(hdfsFS fs,
  867. const char *path, int flags)
  868. {
  869. size_t path_len = strlen(path);
  870. struct hdfsStreamBuilder *bld;
  871. // Check for overflow in path_len
  872. if (path_len > SIZE_MAX - sizeof(struct hdfsStreamBuilder)) {
  873. errno = EOVERFLOW;
  874. return NULL;
  875. }
  876. // sizeof(hdfsStreamBuilder->path) includes one byte for the string
  877. // terminator
  878. bld = malloc(sizeof(struct hdfsStreamBuilder) + path_len);
  879. if (!bld) {
  880. errno = ENOMEM;
  881. return NULL;
  882. }
  883. bld->fs = fs;
  884. bld->flags = flags;
  885. bld->bufferSize = 0;
  886. bld->replication = 0;
  887. bld->defaultBlockSize = 0;
  888. memcpy(bld->path, path, path_len);
  889. bld->path[path_len] = '\0';
  890. return bld;
  891. }
/**
 * Release a stream builder without opening the file. A single free()
 * suffices because the builder and its inline path are one allocation.
 */
void hdfsStreamBuilderFree(struct hdfsStreamBuilder *bld)
{
    free(bld);
}
  896. int hdfsStreamBuilderSetBufferSize(struct hdfsStreamBuilder *bld,
  897. int32_t bufferSize)
  898. {
  899. if ((bld->flags & O_ACCMODE) != O_WRONLY) {
  900. errno = EINVAL;
  901. return -1;
  902. }
  903. bld->bufferSize = bufferSize;
  904. return 0;
  905. }
  906. int hdfsStreamBuilderSetReplication(struct hdfsStreamBuilder *bld,
  907. int16_t replication)
  908. {
  909. if ((bld->flags & O_ACCMODE) != O_WRONLY) {
  910. errno = EINVAL;
  911. return -1;
  912. }
  913. bld->replication = replication;
  914. return 0;
  915. }
  916. int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
  917. int64_t defaultBlockSize)
  918. {
  919. if ((bld->flags & O_ACCMODE) != O_WRONLY) {
  920. errno = EINVAL;
  921. return -1;
  922. }
  923. bld->defaultBlockSize = defaultBlockSize;
  924. return 0;
  925. }
  926. /**
  927. * Delegates to FsDataInputStream#hasCapability(String). Used to check if a
  928. * given input stream supports certain methods, such as
  929. * ByteBufferReadable#read(ByteBuffer).
  930. *
  931. * @param jFile the FsDataInputStream to call hasCapability on
  932. * @param capability the name of the capability to query; for a full list of
  933. * possible values see StreamCapabilities
  934. *
  935. * @return true if the given jFile has the given capability, false otherwise
  936. *
  937. * @see org.apache.hadoop.fs.StreamCapabilities
  938. */
static int hdfsHasStreamCapability(jobject jFile,
        const char *capability) {
    int ret = 0;
    jthrowable jthr = NULL;
    jvalue jVal;
    jstring jCapabilityString = NULL;

    /* Get the JNIEnv* corresponding to current thread */
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return 0;
    }

    // capability -> java.lang.String
    jthr = newJavaStr(env, capability, &jCapabilityString);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsHasStreamCapability(%s): newJavaStr", capability);
        goto done;
    }
    // boolean result = stream.hasCapability(capability);
    jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
            JC_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z",
            jCapabilityString);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability",
                capability);
        goto done;
    }

done:
    // NOTE(review): on the error paths jthr was already consumed by
    // printExceptionAndFree before this runs -- confirm
    // destroyLocalReference tolerates that (it is passed NULL on the
    // success path, so it is at least expected to be NULL-safe).
    destroyLocalReference(env, jthr);
    destroyLocalReference(env, jCapabilityString);
    if (ret) {
        errno = ret;
        return 0;
    }
    // jVal.z is only read here, after a successful invokeMethod above.
    if (jVal.z == JNI_TRUE) {
        return 1;
    }
    return 0;
}
  978. /**
  979. * Sets the flags of the given hdfsFile based on the capabilities of the
  980. * underlying stream.
  981. *
  982. * @param file file->flags will be updated based on the capabilities of jFile
  983. * @param jFile the underlying stream to check for capabilities
  984. */
  985. static void setFileFlagCapabilities(hdfsFile file, jobject jFile) {
  986. // Check the StreamCapabilities of jFile to see if we can do direct
  987. // reads
  988. if (hdfsHasStreamCapability(jFile, IS_READ_BYTE_BUFFER_CAPABILITY)) {
  989. file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
  990. }
  991. // Check the StreamCapabilities of jFile to see if we can do direct
  992. // preads
  993. if (hdfsHasStreamCapability(jFile, IS_PREAD_BYTE_BUFFER_CAPABILITY)) {
  994. file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
  995. }
  996. }
/**
 * Shared implementation behind hdfsOpenFile / hdfsStreamBuilderBuild.
 *
 * Opens path for reading (O_RDONLY), appending (O_WRONLY|O_APPEND), or
 * creation (O_WRONLY); O_RDWR is rejected with ENOTSUP. Zero-valued
 * bufferSize/replication/blockSize are replaced with the corresponding
 * configured defaults.
 *
 * @return the opened file, or NULL with errno set on failure
 */
static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
        int32_t bufferSize, int16_t replication, int64_t blockSize)
{
    /*
      JAVA EQUIVALENT:
       File f = new File(path);
       FSData{Input|Output}Stream f{is|os} = fs.create(f);
       return f{is|os};
    */
    int accmode = flags & O_ACCMODE;
    jstring jStrBufferSize = NULL, jStrReplication = NULL;
    jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
    jobject jFS = (jobject)fs;
    jthrowable jthr;
    jvalue jVal;
    hdfsFile file = NULL;
    int ret;
    jint jBufferSize = bufferSize;
    jshort jReplication = replication;

    /* The hadoop java api/signature */
    const char *method = NULL;
    const char *signature = NULL;

    /* Get the JNIEnv* corresponding to current thread */
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }

    // Only pure read or pure write is supported.
    if (accmode == O_RDONLY || accmode == O_WRONLY) {
        /* yay */
    } else if (accmode == O_RDWR) {
        fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
        errno = ENOTSUP;
        return NULL;
    } else {
        fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n",
                accmode);
        errno = EINVAL;
        return NULL;
    }

    // NOTE(review): the "O_CREATE" spelling below is in the original
    // runtime message; left untouched here.
    if ((flags & O_CREAT) && (flags & O_EXCL)) {
        fprintf(stderr,
                "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
    }

    // Pick the FileSystem method and JNI signature to invoke.
    if (accmode == O_RDONLY) {
        method = "open";
        signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_FSDISTRM));
    } else if (flags & O_APPEND) {
        method = "append";
        signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSDOSTRM));
    } else {
        method = "create";
        signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_FSDOSTRM));
    }

    /* Create an object of org.apache.hadoop.fs.Path */
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): constructNewObjectOfPath", path);
        goto done;
    }

    /* Get the Configuration object from the FileSystem object */
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
            "getConf", JMETHOD1("", JPARAM(HADOOP_CONF)));
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): FileSystem#getConf", path);
        goto done;
    }
    jConfiguration = jVal.l;

    jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");
    if (!jStrBufferSize) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
        goto done;
    }
    jStrReplication = (*env)->NewStringUTF(env, "dfs.replication");
    if (!jStrReplication) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
        goto done;
    }

    // bufferSize == 0 => look up io.file.buffer.size (default 4096).
    if (!bufferSize) {
        jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
                JC_CONFIGURATION, "getInt",
                "(Ljava/lang/String;I)I", jStrBufferSize, 4096);
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND |
                NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK,
                "hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)",
                path);
            goto done;
        }
        jBufferSize = jVal.i;
    }

    // replication only matters for create (write, not append):
    // 0 => look up dfs.replication (default 1).
    if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) {
        if (!replication) {
            jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
                    JC_CONFIGURATION, "getInt",
                    "(Ljava/lang/String;I)I", jStrReplication, 1);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsOpenFile(%s): Configuration#getInt(dfs.replication)",
                    path);
                goto done;
            }
            jReplication = (jshort)jVal.i;
        }
    }

    /* Create and return either the FSDataInputStream or
       FSDataOutputStream references jobject jStream */

    // READ?
    if (accmode == O_RDONLY) {
        jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
                method, signature, jPath, jBufferSize);
    } else if ((accmode == O_WRONLY) && (flags & O_APPEND)) {
        // WRITE/APPEND?
        jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
                method, signature, jPath);
    } else {
        // WRITE/CREATE
        jboolean jOverWrite = 1;
        jlong jBlockSize = blockSize;

        if (jBlockSize == 0) {
            jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize);
            if (jthr) {
                // NOTE(review): this exception is neither printed nor
                // freed, unlike every other error path here -- confirm
                // whether the local throwable ref leaks; ret is also a
                // blanket EIO rather than a translated errno.
                ret = EIO;
                goto done;
            }
        }
        jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
                method, signature, jPath, jOverWrite, jBufferSize,
                jReplication, jBlockSize);
    }
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature);
        goto done;
    }
    jFile = jVal.l;

    file = calloc(1, sizeof(struct hdfsFile_internal));
    if (!file) {
        fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path);
        ret = ENOMEM;
        goto done;
    }
    // Pin the stream object past this JNI frame.
    file->file = (*env)->NewGlobalRef(env, jFile);
    if (!file->file) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hdfsOpenFile(%s): NewGlobalRef", path);
        goto done;
    }
    file->type = (((flags & O_WRONLY) == 0) ? HDFS_STREAM_INPUT :
        HDFS_STREAM_OUTPUT);
    file->flags = 0;

    // Input streams: probe for direct read/pread support.
    if ((flags & O_WRONLY) == 0) {
        setFileFlagCapabilities(file, jFile);
    }
    ret = 0;

done:
    destroyLocalReference(env, jStrBufferSize);
    destroyLocalReference(env, jStrReplication);
    destroyLocalReference(env, jConfiguration);
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jFile);
    if (ret) {
        if (file) {
            if (file->file) {
                (*env)->DeleteGlobalRef(env, file->file);
            }
            free(file);
        }
        errno = ret;
        return NULL;
    }
    return file;
}
  1172. hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld)
  1173. {
  1174. hdfsFile file = hdfsOpenFileImpl(bld->fs, bld->path, bld->flags,
  1175. bld->bufferSize, bld->replication, bld->defaultBlockSize);
  1176. int prevErrno = errno;
  1177. hdfsStreamBuilderFree(bld);
  1178. errno = prevErrno;
  1179. return file;
  1180. }
/**
 * A wrapper around o.a.h.fs.FutureDataInputStreamBuilder and the file name
 * associated with the builder.
 */
struct hdfsOpenFileBuilder {
    jobject jBuilder;  // JNI global ref to the FutureDataInputStreamBuilder
    const char *path;  // borrowed path string, used in error messages
};
/**
 * A wrapper around a java.util.concurrent.Future (created by calling
 * FutureDataInputStreamBuilder#build) and the file name associated with the
 * builder.
 */
struct hdfsOpenFileFuture {
    jobject jFuture;   // JNI global ref to the j.u.c.Future
    const char *path;  // borrowed path string, used in error messages
};
  1198. hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
  1199. const char *path) {
  1200. int ret = 0;
  1201. jthrowable jthr;
  1202. jvalue jVal;
  1203. jobject jFS = (jobject) fs;
  1204. jobject jPath = NULL;
  1205. jobject jBuilder = NULL;
  1206. JNIEnv *env = getJNIEnv();
  1207. if (!env) {
  1208. errno = EINTERNAL;
  1209. return NULL;
  1210. }
  1211. hdfsOpenFileBuilder *builder;
  1212. builder = calloc(1, sizeof(hdfsOpenFileBuilder));
  1213. if (!builder) {
  1214. fprintf(stderr, "hdfsOpenFileBuilderAlloc(%s): OOM when creating "
  1215. "hdfsOpenFileBuilder\n", path);
  1216. errno = ENOMEM;
  1217. goto done;
  1218. }
  1219. builder->path = path;
  1220. jthr = constructNewObjectOfPath(env, path, &jPath);
  1221. if (jthr) {
  1222. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1223. "hdfsOpenFileBuilderAlloc(%s): constructNewObjectOfPath",
  1224. path);
  1225. goto done;
  1226. }
  1227. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  1228. "openFile", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FDISB)),
  1229. jPath);
  1230. if (jthr) {
  1231. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1232. "hdfsOpenFileBuilderAlloc(%s): %s#openFile(Path) failed",
  1233. HADOOP_FS, path);
  1234. goto done;
  1235. }
  1236. jBuilder = jVal.l;
  1237. builder->jBuilder = (*env)->NewGlobalRef(env, jBuilder);
  1238. if (!builder->jBuilder) {
  1239. printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1240. "hdfsOpenFileBuilderAlloc(%s): NewGlobalRef(%s) failed", path,
  1241. HADOOP_FDISB);
  1242. ret = EINVAL;
  1243. goto done;
  1244. }
  1245. done:
  1246. destroyLocalReference(env, jPath);
  1247. destroyLocalReference(env, jBuilder);
  1248. if (ret) {
  1249. if (builder) {
  1250. if (builder->jBuilder) {
  1251. (*env)->DeleteGlobalRef(env, builder->jBuilder);
  1252. }
  1253. free(builder);
  1254. }
  1255. errno = ret;
  1256. return NULL;
  1257. }
  1258. return builder;
  1259. }
/**
 * Used internally by hdfsOpenFileBuilderWithOption to switch between
 * FSBuilder#must (mandatory option) and FSBuilder#opt (best-effort option).
 */
typedef enum { must, opt } openFileBuilderOptionType;
/**
 * Shared implementation of hdfsOpenFileBuilderMust and hdfsOpenFileBuilderOpt
 * that switches between each method depending on the value of
 * openFileBuilderOptionType.
 *
 * On success builder->jBuilder is replaced by a new global ref to the
 * FSBuilder returned by must()/opt(), and the old global ref is deleted.
 * On failure the builder is left unchanged (and is NOT freed) and NULL is
 * returned with errno set.
 */
static hdfsOpenFileBuilder *hdfsOpenFileBuilderWithOption(
        hdfsOpenFileBuilder *builder, const char *key,
        const char *value, openFileBuilderOptionType optionType) {
    int ret = 0;
    jthrowable jthr;
    jvalue jVal;
    jobject localJBuilder = NULL;
    jobject globalJBuilder;
    jstring jKeyString = NULL;
    jstring jValueString = NULL;

    // If the builder was not previously created by a prior call to
    // hdfsOpenFileBuilderAlloc then exit
    if (builder == NULL || builder->jBuilder == NULL) {
        errno = EINVAL;
        return NULL;
    }
    JNIEnv *env = getJNIEnv();
    if (!env) {
        errno = EINTERNAL;
        return NULL;
    }
    jthr = newJavaStr(env, key, &jKeyString);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
                builder->path, key);
        goto done;
    }
    jthr = newJavaStr(env, value, &jValueString);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
                builder->path, value);
        goto done;
    }

    // Map the enum onto the Java method name to call.
    const char *optionTypeMethodName;
    switch (optionType) {
        case must:
            optionTypeMethodName = "must";
            break;
        case opt:
            optionTypeMethodName = "opt";
            break;
        default:
            ret = EINTERNAL;
            goto done;
    }
    // jBuilder = jBuilder.must(key, value)  /  jBuilder.opt(key, value)
    jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
            JC_FUTURE_DATA_IS_BUILDER, optionTypeMethodName,
            JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING),
                    JPARAM(HADOOP_FS_BLDR)), jKeyString,
            jValueString);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsOpenFileBuilderWithOption(%s): %s#%s(%s, %s) failed",
                builder->path, HADOOP_FS_BLDR, optionTypeMethodName, key,
                value);
        goto done;
    }
    localJBuilder = jVal.l;
    // Swap the stored global ref to track the builder returned by
    // must()/opt(); the previous global ref is deleted only after the new
    // one has been created successfully.
    globalJBuilder = (*env)->NewGlobalRef(env, localJBuilder);
    if (!globalJBuilder) {
        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hdfsOpenFileBuilderWithOption(%s): NewGlobalRef(%s) failed",
                builder->path, HADOOP_FDISB);
        ret = EINVAL;
        goto done;
    }
    (*env)->DeleteGlobalRef(env, builder->jBuilder);
    builder->jBuilder = globalJBuilder;

done:
    destroyLocalReference(env, jKeyString);
    destroyLocalReference(env, jValueString);
    destroyLocalReference(env, localJBuilder);
    if (ret) {
        errno = ret;
        return NULL;
    }
    return builder;
}
  1350. hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
  1351. const char *key, const char *value) {
  1352. openFileBuilderOptionType optionType;
  1353. optionType = must;
  1354. return hdfsOpenFileBuilderWithOption(builder, key, value, optionType);
  1355. }
  1356. hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
  1357. const char *key, const char *value) {
  1358. openFileBuilderOptionType optionType;
  1359. optionType = opt;
  1360. return hdfsOpenFileBuilderWithOption(builder, key, value, optionType);
  1361. }
  1362. hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder) {
  1363. int ret = 0;
  1364. jthrowable jthr;
  1365. jvalue jVal;
  1366. jobject jFuture = NULL;
  1367. // If the builder was not previously created by a prior call to
  1368. // hdfsOpenFileBuilderAlloc then exit
  1369. if (builder == NULL || builder->jBuilder == NULL) {
  1370. ret = EINVAL;
  1371. return NULL;
  1372. }
  1373. JNIEnv *env = getJNIEnv();
  1374. if (!env) {
  1375. errno = EINTERNAL;
  1376. return NULL;
  1377. }
  1378. hdfsOpenFileFuture *future;
  1379. future = calloc(1, sizeof(hdfsOpenFileFuture));
  1380. if (!future) {
  1381. fprintf(stderr, "hdfsOpenFileBuilderBuild: OOM when creating "
  1382. "hdfsOpenFileFuture\n");
  1383. errno = ENOMEM;
  1384. goto done;
  1385. }
  1386. future->path = builder->path;
  1387. jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
  1388. JC_FUTURE_DATA_IS_BUILDER, "build",
  1389. JMETHOD1("", JPARAM(JAVA_CFUTURE)));
  1390. if (jthr) {
  1391. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1392. "hdfsOpenFileBuilderBuild(%s): %s#build() failed",
  1393. builder->path, HADOOP_FDISB);
  1394. goto done;
  1395. }
  1396. jFuture = jVal.l;
  1397. future->jFuture = (*env)->NewGlobalRef(env, jFuture);
  1398. if (!future->jFuture) {
  1399. printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1400. "hdfsOpenFileBuilderBuild(%s): NewGlobalRef(%s) failed",
  1401. builder->path, JAVA_CFUTURE);
  1402. ret = EINVAL;
  1403. goto done;
  1404. }
  1405. done:
  1406. destroyLocalReference(env, jFuture);
  1407. if (ret) {
  1408. if (future) {
  1409. if (future->jFuture) {
  1410. (*env)->DeleteGlobalRef(env, future->jFuture);
  1411. }
  1412. free(future);
  1413. }
  1414. hdfsOpenFileBuilderFree(builder);
  1415. errno = ret;
  1416. return NULL;
  1417. }
  1418. hdfsOpenFileBuilderFree(builder);
  1419. return future;
  1420. }
  1421. void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
  1422. JNIEnv *env;
  1423. env = getJNIEnv();
  1424. if (!env) {
  1425. return;
  1426. }
  1427. if (builder->jBuilder) {
  1428. (*env)->DeleteGlobalRef(env, builder->jBuilder);
  1429. builder->jBuilder = NULL;
  1430. }
  1431. free(builder);
  1432. }
/**
 * Shared implementation of hdfsOpenFileFutureGet and
 * hdfsOpenFileFutureGetWithTimeout. If a TimeUnit is supplied it calls
 * Future#get(long, TimeUnit); otherwise it calls the untimed Future#get().
 */
  1438. static hdfsFile fileFutureGetWithTimeout(hdfsOpenFileFuture *future,
  1439. int64_t timeout, jobject jTimeUnit) {
  1440. int ret = 0;
  1441. jthrowable jthr;
  1442. jvalue jVal;
  1443. hdfsFile file = NULL;
  1444. jobject jFile = NULL;
  1445. JNIEnv *env = getJNIEnv();
  1446. if (!env) {
  1447. ret = EINTERNAL;
  1448. return NULL;
  1449. }
  1450. if (!jTimeUnit) {
  1451. jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
  1452. JC_CFUTURE, "get", JMETHOD1("", JPARAM(JAVA_OBJECT)));
  1453. } else {
  1454. jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
  1455. JC_CFUTURE, "get", JMETHOD2("J",
  1456. JPARAM(JAVA_TIMEUNIT), JPARAM(JAVA_OBJECT)), timeout,
  1457. jTimeUnit);
  1458. }
  1459. if (jthr) {
  1460. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1461. "hdfsOpenFileFutureGet(%s): %s#get failed", future->path,
  1462. JAVA_CFUTURE);
  1463. goto done;
  1464. }
  1465. file = calloc(1, sizeof(struct hdfsFile_internal));
  1466. if (!file) {
  1467. fprintf(stderr, "hdfsOpenFileFutureGet(%s): OOM when creating "
  1468. "hdfsFile\n", future->path);
  1469. ret = ENOMEM;
  1470. goto done;
  1471. }
  1472. jFile = jVal.l;
  1473. file->file = (*env)->NewGlobalRef(env, jFile);
  1474. if (!file->file) {
  1475. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1476. "hdfsOpenFileFutureGet(%s): NewGlobalRef(jFile) failed",
  1477. future->path);
  1478. goto done;
  1479. }
  1480. file->type = HDFS_STREAM_INPUT;
  1481. file->flags = 0;
  1482. setFileFlagCapabilities(file, jFile);
  1483. done:
  1484. destroyLocalReference(env, jTimeUnit);
  1485. destroyLocalReference(env, jFile);
  1486. if (ret) {
  1487. if (file) {
  1488. if (file->file) {
  1489. (*env)->DeleteGlobalRef(env, file->file);
  1490. }
  1491. free(file);
  1492. }
  1493. errno = ret;
  1494. return NULL;
  1495. }
  1496. return file;
  1497. }
  1498. hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
  1499. return fileFutureGetWithTimeout(future, -1, NULL);
  1500. }
  1501. hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
  1502. int64_t timeout, javaConcurrentTimeUnit timeUnit) {
  1503. int ret = 0;
  1504. jthrowable jthr;
  1505. jobject jTimeUnit = NULL;
  1506. JNIEnv *env = getJNIEnv();
  1507. if (!env) {
  1508. ret = EINTERNAL;
  1509. return NULL;
  1510. }
  1511. const char *timeUnitEnumName;
  1512. switch (timeUnit) {
  1513. case jNanoseconds:
  1514. timeUnitEnumName = "NANOSECONDS";
  1515. break;
  1516. case jMicroseconds:
  1517. timeUnitEnumName = "MICROSECONDS";
  1518. break;
  1519. case jMilliseconds:
  1520. timeUnitEnumName = "MILLISECONDS";
  1521. break;
  1522. case jSeconds:
  1523. timeUnitEnumName = "SECONDS";
  1524. break;
  1525. case jMinutes:
  1526. timeUnitEnumName = "MINUTES";
  1527. break;
  1528. case jHours:
  1529. timeUnitEnumName = "HOURS";
  1530. break;
  1531. case jDays:
  1532. timeUnitEnumName = "DAYS";
  1533. break;
  1534. default:
  1535. ret = EINTERNAL;
  1536. goto done;
  1537. }
  1538. jthr = fetchEnumInstance(env, JAVA_TIMEUNIT, timeUnitEnumName, &jTimeUnit);
  1539. if (jthr) {
  1540. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1541. "hdfsOpenFileFutureGet(%s): %s#get failed", future->path,
  1542. JAVA_CFUTURE);
  1543. goto done;
  1544. }
  1545. return fileFutureGetWithTimeout(future, timeout, jTimeUnit);
  1546. done:
  1547. if (ret) {
  1548. errno = ret;
  1549. }
  1550. return NULL;
  1551. }
  1552. int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
  1553. int mayInterruptIfRunning) {
  1554. int ret = 0;
  1555. jthrowable jthr;
  1556. jvalue jVal;
  1557. jboolean jMayInterruptIfRunning;
  1558. JNIEnv *env = getJNIEnv();
  1559. if (!env) {
  1560. ret = EINTERNAL;
  1561. return -1;
  1562. }
  1563. jMayInterruptIfRunning = mayInterruptIfRunning ? JNI_TRUE : JNI_FALSE;
  1564. jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture, JC_CFUTURE,
  1565. "cancel", JMETHOD1("Z", "Z"), jMayInterruptIfRunning);
  1566. if (jthr) {
  1567. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1568. "hdfsOpenFileFutureCancel(%s): %s#cancel failed", future->path,
  1569. JAVA_CFUTURE);
  1570. goto done;
  1571. }
  1572. done:
  1573. if (ret) {
  1574. errno = ret;
  1575. return -1;
  1576. }
  1577. if (!jVal.z) {
  1578. return -1;
  1579. }
  1580. return 0;
  1581. }
  1582. void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
  1583. JNIEnv *env;
  1584. env = getJNIEnv();
  1585. if (!env) {
  1586. return;
  1587. }
  1588. if (future->jFuture) {
  1589. (*env)->DeleteGlobalRef(env, future->jFuture);
  1590. future->jFuture = NULL;
  1591. }
  1592. free(future);
  1593. }
  1594. int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
  1595. {
  1596. jobject jFS = (jobject)fs;
  1597. jthrowable jthr;
  1598. jvalue jVal;
  1599. jobject jPath = NULL;
  1600. JNIEnv *env = getJNIEnv();
  1601. if (!env) {
  1602. errno = EINTERNAL;
  1603. return -1;
  1604. }
  1605. /* Create an object of org.apache.hadoop.fs.Path */
  1606. jthr = constructNewObjectOfPath(env, path, &jPath);
  1607. if (jthr) {
  1608. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1609. "hdfsTruncateFile(%s): constructNewObjectOfPath", path);
  1610. return -1;
  1611. }
  1612. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  1613. "truncate", JMETHOD2(JPARAM(HADOOP_PATH), "J", "Z"),
  1614. jPath, newlength);
  1615. destroyLocalReference(env, jPath);
  1616. if (jthr) {
  1617. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1618. "hdfsTruncateFile(%s): FileSystem#truncate", path);
  1619. return -1;
  1620. }
  1621. if (jVal.z == JNI_TRUE) {
  1622. return 1;
  1623. }
  1624. return 0;
  1625. }
  1626. int hdfsUnbufferFile(hdfsFile file)
  1627. {
  1628. int ret;
  1629. jthrowable jthr;
  1630. JNIEnv *env = getJNIEnv();
  1631. if (!env) {
  1632. ret = EINTERNAL;
  1633. goto done;
  1634. }
  1635. if (file->type != HDFS_STREAM_INPUT) {
  1636. ret = ENOTSUP;
  1637. goto done;
  1638. }
  1639. jthr = invokeMethod(env, NULL, INSTANCE, file->file,
  1640. JC_FS_DATA_INPUT_STREAM, "unbuffer", "()V");
  1641. if (jthr) {
  1642. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1643. HADOOP_FSDISTRM "#unbuffer failed:");
  1644. goto done;
  1645. }
  1646. ret = 0;
  1647. done:
  1648. errno = ret;
  1649. return ret;
  1650. }
  1651. int hdfsCloseFile(hdfsFS fs, hdfsFile file)
  1652. {
  1653. int ret;
  1654. // JAVA EQUIVALENT:
  1655. // file.close
  1656. //The interface whose 'close' method to be called
  1657. CachedJavaClass cachedJavaClass;
  1658. const char *interfaceShortName;
  1659. //Caught exception
  1660. jthrowable jthr;
  1661. //Get the JNIEnv* corresponding to current thread
  1662. JNIEnv* env = getJNIEnv();
  1663. if (env == NULL) {
  1664. errno = EINTERNAL;
  1665. return -1;
  1666. }
  1667. //Sanity check
  1668. if (!file || file->type == HDFS_STREAM_UNINITIALIZED) {
  1669. errno = EBADF;
  1670. return -1;
  1671. }
  1672. if (file->type == HDFS_STREAM_INPUT) {
  1673. cachedJavaClass = JC_FS_DATA_INPUT_STREAM;
  1674. } else {
  1675. cachedJavaClass = JC_FS_DATA_OUTPUT_STREAM;
  1676. }
  1677. jthr = invokeMethod(env, NULL, INSTANCE, file->file,
  1678. cachedJavaClass, "close", "()V");
  1679. if (jthr) {
  1680. interfaceShortName = (file->type == HDFS_STREAM_INPUT) ?
  1681. "FSDataInputStream" : "FSDataOutputStream";
  1682. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1683. "%s#close", interfaceShortName);
  1684. } else {
  1685. ret = 0;
  1686. }
  1687. //De-allocate memory
  1688. (*env)->DeleteGlobalRef(env, file->file);
  1689. free(file);
  1690. if (ret) {
  1691. errno = ret;
  1692. return -1;
  1693. }
  1694. return 0;
  1695. }
  1696. int hdfsExists(hdfsFS fs, const char *path)
  1697. {
  1698. JNIEnv *env = getJNIEnv();
  1699. jobject jPath;
  1700. jvalue jVal;
  1701. jobject jFS = (jobject)fs;
  1702. jthrowable jthr;
  1703. if (env == NULL) {
  1704. errno = EINTERNAL;
  1705. return -1;
  1706. }
  1707. if (path == NULL) {
  1708. errno = EINVAL;
  1709. return -1;
  1710. }
  1711. jthr = constructNewObjectOfPath(env, path, &jPath);
  1712. if (jthr) {
  1713. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1714. "hdfsExists: constructNewObjectOfPath");
  1715. return -1;
  1716. }
  1717. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  1718. "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
  1719. destroyLocalReference(env, jPath);
  1720. if (jthr) {
  1721. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1722. "hdfsExists: invokeMethod(%s)",
  1723. JMETHOD1(JPARAM(HADOOP_PATH), "Z"));
  1724. return -1;
  1725. }
  1726. if (jVal.z) {
  1727. return 0;
  1728. } else {
  1729. errno = ENOENT;
  1730. return -1;
  1731. }
  1732. }
  1733. // Checks input file for readiness for reading.
  1734. static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
  1735. jobject* jInputStream)
  1736. {
  1737. *jInputStream = (jobject)(f ? f->file : NULL);
  1738. //Sanity check
  1739. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  1740. errno = EBADF;
  1741. return -1;
  1742. }
  1743. //Error checking... make sure that this file is 'readable'
  1744. if (f->type != HDFS_STREAM_INPUT) {
  1745. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  1746. errno = EINVAL;
  1747. return -1;
  1748. }
  1749. return 0;
  1750. }
  1751. /**
  1752. * If the underlying stream supports the ByteBufferReadable interface then
  1753. * this method will transparently use read(ByteBuffer). This can help
  1754. * improve performance as it avoids unnecessarily copying data on to the Java
  1755. * heap. Instead the data will be directly copied from kernel space to the C
  1756. * heap.
  1757. */
  1758. tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  1759. {
  1760. jobject jInputStream;
  1761. jbyteArray jbRarray;
  1762. jvalue jVal;
  1763. jthrowable jthr;
  1764. JNIEnv* env;
  1765. if (length == 0) {
  1766. return 0;
  1767. } else if (length < 0) {
  1768. errno = EINVAL;
  1769. return -1;
  1770. }
  1771. if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
  1772. return readDirect(fs, f, buffer, length);
  1773. }
  1774. // JAVA EQUIVALENT:
  1775. // byte [] bR = new byte[length];
  1776. // fis.read(bR);
  1777. //Get the JNIEnv* corresponding to current thread
  1778. env = getJNIEnv();
  1779. if (env == NULL) {
  1780. errno = EINTERNAL;
  1781. return -1;
  1782. }
  1783. //Parameters
  1784. if (readPrepare(env, fs, f, &jInputStream) == -1) {
  1785. return -1;
  1786. }
  1787. //Read the requisite bytes
  1788. jbRarray = (*env)->NewByteArray(env, length);
  1789. if (!jbRarray) {
  1790. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1791. "hdfsRead: NewByteArray");
  1792. return -1;
  1793. }
  1794. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  1795. JC_FS_DATA_INPUT_STREAM, "read", "([B)I", jbRarray);
  1796. if (jthr) {
  1797. destroyLocalReference(env, jbRarray);
  1798. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1799. "hdfsRead: FSDataInputStream#read");
  1800. return -1;
  1801. }
  1802. if (jVal.i < 0) {
  1803. // EOF
  1804. destroyLocalReference(env, jbRarray);
  1805. return 0;
  1806. } else if (jVal.i == 0) {
  1807. destroyLocalReference(env, jbRarray);
  1808. errno = EINTR;
  1809. return -1;
  1810. }
  1811. // We only copy the portion of the jbRarray that was actually filled by
  1812. // the call to FsDataInputStream#read; #read is not guaranteed to fill the
  1813. // entire buffer, instead it returns the number of bytes read into the
  1814. // buffer; we use the return value as the input in GetByteArrayRegion to
  1815. // ensure don't copy more bytes than necessary
  1816. (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer);
  1817. destroyLocalReference(env, jbRarray);
  1818. if ((*env)->ExceptionCheck(env)) {
  1819. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1820. "hdfsRead: GetByteArrayRegion");
  1821. return -1;
  1822. }
  1823. return jVal.i;
  1824. }
  1825. tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
  1826. {
  1827. // JAVA EQUIVALENT:
  1828. // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
  1829. // fis.read(buf);
  1830. jobject jInputStream;
  1831. jvalue jVal;
  1832. jthrowable jthr;
  1833. jobject bb;
  1834. //Get the JNIEnv* corresponding to current thread
  1835. JNIEnv* env = getJNIEnv();
  1836. if (env == NULL) {
  1837. errno = EINTERNAL;
  1838. return -1;
  1839. }
  1840. if (readPrepare(env, fs, f, &jInputStream) == -1) {
  1841. return -1;
  1842. }
  1843. //Read the requisite bytes
  1844. bb = (*env)->NewDirectByteBuffer(env, buffer, length);
  1845. if (bb == NULL) {
  1846. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1847. "readDirect: NewDirectByteBuffer");
  1848. return -1;
  1849. }
  1850. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  1851. JC_FS_DATA_INPUT_STREAM, "read",
  1852. "(Ljava/nio/ByteBuffer;)I", bb);
  1853. destroyLocalReference(env, bb);
  1854. if (jthr) {
  1855. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1856. "readDirect: FSDataInputStream#read");
  1857. return -1;
  1858. }
  1859. // Reached EOF, return 0
  1860. if (jVal.i < 0) {
  1861. return 0;
  1862. }
  1863. // 0 bytes read, return error
  1864. if (jVal.i == 0) {
  1865. errno = EINTR;
  1866. return -1;
  1867. }
  1868. return jVal.i;
  1869. }
  1870. /**
  1871. * If the underlying stream supports the ByteBufferPositionedReadable
  1872. * interface then this method will transparently use read(long, ByteBuffer).
  1873. * This can help improve performance as it avoids unnecessarily copying data
  1874. * on to the Java heap. Instead the data will be directly copied from kernel
  1875. * space to the C heap.
  1876. */
  1877. tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
  1878. void* buffer, tSize length)
  1879. {
  1880. JNIEnv* env;
  1881. jbyteArray jbRarray;
  1882. jvalue jVal;
  1883. jthrowable jthr;
  1884. if (length == 0) {
  1885. return 0;
  1886. } else if (length < 0) {
  1887. errno = EINVAL;
  1888. return -1;
  1889. }
  1890. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  1891. errno = EBADF;
  1892. return -1;
  1893. }
  1894. if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
  1895. return preadDirect(fs, f, position, buffer, length);
  1896. }
  1897. env = getJNIEnv();
  1898. if (env == NULL) {
  1899. errno = EINTERNAL;
  1900. return -1;
  1901. }
  1902. //Error checking... make sure that this file is 'readable'
  1903. if (f->type != HDFS_STREAM_INPUT) {
  1904. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  1905. errno = EINVAL;
  1906. return -1;
  1907. }
  1908. // JAVA EQUIVALENT:
  1909. // byte [] bR = new byte[length];
  1910. // fis.read(pos, bR, 0, length);
  1911. jbRarray = (*env)->NewByteArray(env, length);
  1912. if (!jbRarray) {
  1913. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1914. "hdfsPread: NewByteArray");
  1915. return -1;
  1916. }
  1917. jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
  1918. JC_FS_DATA_INPUT_STREAM, "read", "(J[BII)I", position,
  1919. jbRarray, 0, length);
  1920. if (jthr) {
  1921. destroyLocalReference(env, jbRarray);
  1922. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1923. "hdfsPread: FSDataInputStream#read");
  1924. return -1;
  1925. }
  1926. if (jVal.i < 0) {
  1927. // EOF
  1928. destroyLocalReference(env, jbRarray);
  1929. return 0;
  1930. } else if (jVal.i == 0) {
  1931. destroyLocalReference(env, jbRarray);
  1932. errno = EINTR;
  1933. return -1;
  1934. }
  1935. (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer);
  1936. destroyLocalReference(env, jbRarray);
  1937. if ((*env)->ExceptionCheck(env)) {
  1938. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1939. "hdfsPread: GetByteArrayRegion");
  1940. return -1;
  1941. }
  1942. return jVal.i;
  1943. }
  1944. tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
  1945. tSize length)
  1946. {
  1947. // JAVA EQUIVALENT:
  1948. // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
  1949. // fis.read(position, buf);
  1950. jvalue jVal;
  1951. jthrowable jthr;
  1952. jobject bb;
  1953. //Get the JNIEnv* corresponding to current thread
  1954. JNIEnv* env = getJNIEnv();
  1955. if (env == NULL) {
  1956. errno = EINTERNAL;
  1957. return -1;
  1958. }
  1959. //Error checking... make sure that this file is 'readable'
  1960. if (f->type != HDFS_STREAM_INPUT) {
  1961. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  1962. errno = EINVAL;
  1963. return -1;
  1964. }
  1965. //Read the requisite bytes
  1966. bb = (*env)->NewDirectByteBuffer(env, buffer, length);
  1967. if (bb == NULL) {
  1968. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  1969. "readDirect: NewDirectByteBuffer");
  1970. return -1;
  1971. }
  1972. jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
  1973. JC_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I",
  1974. position, bb);
  1975. destroyLocalReference(env, bb);
  1976. if (jthr) {
  1977. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  1978. "preadDirect: FSDataInputStream#read");
  1979. return -1;
  1980. }
  1981. // Reached EOF, return 0
  1982. if (jVal.i < 0) {
  1983. return 0;
  1984. }
  1985. // 0 bytes read, return error
  1986. if (jVal.i == 0) {
  1987. errno = EINTR;
  1988. return -1;
  1989. }
  1990. return jVal.i;
  1991. }
  1992. /**
  1993. * Like hdfsPread, if the underlying stream supports the
  1994. * ByteBufferPositionedReadable interface then this method will transparently
  1995. * use readFully(long, ByteBuffer).
  1996. */
  1997. int hdfsPreadFully(hdfsFS fs, hdfsFile f, tOffset position,
  1998. void* buffer, tSize length) {
  1999. JNIEnv* env;
  2000. jbyteArray jbRarray;
  2001. jthrowable jthr;
  2002. if (length == 0) {
  2003. return 0;
  2004. } else if (length < 0) {
  2005. errno = EINVAL;
  2006. return -1;
  2007. }
  2008. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  2009. errno = EBADF;
  2010. return -1;
  2011. }
  2012. if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
  2013. return preadFullyDirect(fs, f, position, buffer, length);
  2014. }
  2015. env = getJNIEnv();
  2016. if (env == NULL) {
  2017. errno = EINTERNAL;
  2018. return -1;
  2019. }
  2020. //Error checking... make sure that this file is 'readable'
  2021. if (f->type != HDFS_STREAM_INPUT) {
  2022. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  2023. errno = EINVAL;
  2024. return -1;
  2025. }
  2026. // JAVA EQUIVALENT:
  2027. // byte [] bR = new byte[length];
  2028. // fis.read(pos, bR, 0, length);
  2029. jbRarray = (*env)->NewByteArray(env, length);
  2030. if (!jbRarray) {
  2031. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2032. "hdfsPread: NewByteArray");
  2033. return -1;
  2034. }
  2035. jthr = invokeMethod(env, NULL, INSTANCE, f->file,
  2036. JC_FS_DATA_INPUT_STREAM, "readFully", "(J[BII)V",
  2037. position, jbRarray, 0, length);
  2038. if (jthr) {
  2039. destroyLocalReference(env, jbRarray);
  2040. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2041. "hdfsPread: FSDataInputStream#read");
  2042. return -1;
  2043. }
  2044. (*env)->GetByteArrayRegion(env, jbRarray, 0, length, buffer);
  2045. destroyLocalReference(env, jbRarray);
  2046. if ((*env)->ExceptionCheck(env)) {
  2047. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2048. "hdfsPread: GetByteArrayRegion");
  2049. return -1;
  2050. }
  2051. return 0;
  2052. }
  2053. int preadFullyDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
  2054. tSize length)
  2055. {
  2056. // JAVA EQUIVALENT:
  2057. // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
  2058. // fis.read(position, buf);
  2059. jthrowable jthr;
  2060. jobject bb;
  2061. //Get the JNIEnv* corresponding to current thread
  2062. JNIEnv* env = getJNIEnv();
  2063. if (env == NULL) {
  2064. errno = EINTERNAL;
  2065. return -1;
  2066. }
  2067. //Error checking... make sure that this file is 'readable'
  2068. if (f->type != HDFS_STREAM_INPUT) {
  2069. fprintf(stderr, "Cannot read from a non-InputStream object!\n");
  2070. errno = EINVAL;
  2071. return -1;
  2072. }
  2073. //Read the requisite bytes
  2074. bb = (*env)->NewDirectByteBuffer(env, buffer, length);
  2075. if (bb == NULL) {
  2076. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2077. "readDirect: NewDirectByteBuffer");
  2078. return -1;
  2079. }
  2080. jthr = invokeMethod(env, NULL, INSTANCE, f->file,
  2081. JC_FS_DATA_INPUT_STREAM, "readFully",
  2082. "(JLjava/nio/ByteBuffer;)V", position, bb);
  2083. destroyLocalReference(env, bb);
  2084. if (jthr) {
  2085. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2086. "preadDirect: FSDataInputStream#read");
  2087. return -1;
  2088. }
  2089. return 0;
  2090. }
  2091. tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
  2092. {
  2093. // JAVA EQUIVALENT
  2094. // byte b[] = str.getBytes();
  2095. // fso.write(b);
  2096. jobject jOutputStream;
  2097. jbyteArray jbWarray;
  2098. jthrowable jthr;
  2099. //Get the JNIEnv* corresponding to current thread
  2100. JNIEnv* env = getJNIEnv();
  2101. if (env == NULL) {
  2102. errno = EINTERNAL;
  2103. return -1;
  2104. }
  2105. //Sanity check
  2106. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  2107. errno = EBADF;
  2108. return -1;
  2109. }
  2110. jOutputStream = f->file;
  2111. if (length < 0) {
  2112. errno = EINVAL;
  2113. return -1;
  2114. }
  2115. //Error checking... make sure that this file is 'writable'
  2116. if (f->type != HDFS_STREAM_OUTPUT) {
  2117. fprintf(stderr, "Cannot write into a non-OutputStream object!\n");
  2118. errno = EINVAL;
  2119. return -1;
  2120. }
  2121. if (length < 0) {
  2122. errno = EINVAL;
  2123. return -1;
  2124. }
  2125. if (length == 0) {
  2126. return 0;
  2127. }
  2128. //Write the requisite bytes into the file
  2129. jbWarray = (*env)->NewByteArray(env, length);
  2130. if (!jbWarray) {
  2131. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2132. "hdfsWrite: NewByteArray");
  2133. return -1;
  2134. }
  2135. (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer);
  2136. if ((*env)->ExceptionCheck(env)) {
  2137. destroyLocalReference(env, jbWarray);
  2138. errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2139. "hdfsWrite(length = %d): SetByteArrayRegion", length);
  2140. return -1;
  2141. }
  2142. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  2143. JC_FS_DATA_OUTPUT_STREAM, "write", "([B)V",
  2144. jbWarray);
  2145. destroyLocalReference(env, jbWarray);
  2146. if (jthr) {
  2147. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2148. "hdfsWrite: FSDataOutputStream#write");
  2149. return -1;
  2150. }
  2151. // Unlike most Java streams, FSDataOutputStream never does partial writes.
  2152. // If we succeeded, all the data was written.
  2153. return length;
  2154. }
  2155. int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos)
  2156. {
  2157. // JAVA EQUIVALENT
  2158. // fis.seek(pos);
  2159. jobject jInputStream;
  2160. jthrowable jthr;
  2161. //Get the JNIEnv* corresponding to current thread
  2162. JNIEnv* env = getJNIEnv();
  2163. if (env == NULL) {
  2164. errno = EINTERNAL;
  2165. return -1;
  2166. }
  2167. //Sanity check
  2168. if (!f || f->type != HDFS_STREAM_INPUT) {
  2169. errno = EBADF;
  2170. return -1;
  2171. }
  2172. jInputStream = f->file;
  2173. jthr = invokeMethod(env, NULL, INSTANCE, jInputStream,
  2174. JC_FS_DATA_INPUT_STREAM, "seek", "(J)V", desiredPos);
  2175. if (jthr) {
  2176. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2177. "hdfsSeek(desiredPos=%" PRId64 ")"
  2178. ": FSDataInputStream#seek", desiredPos);
  2179. return -1;
  2180. }
  2181. return 0;
  2182. }
  2183. tOffset hdfsTell(hdfsFS fs, hdfsFile f)
  2184. {
  2185. // JAVA EQUIVALENT
  2186. // pos = f.getPos();
  2187. jobject jStream;
  2188. CachedJavaClass cachedJavaClass;
  2189. jvalue jVal;
  2190. jthrowable jthr;
  2191. //Get the JNIEnv* corresponding to current thread
  2192. JNIEnv* env = getJNIEnv();
  2193. if (env == NULL) {
  2194. errno = EINTERNAL;
  2195. return -1;
  2196. }
  2197. //Sanity check
  2198. if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
  2199. errno = EBADF;
  2200. return -1;
  2201. }
  2202. //Parameters
  2203. jStream = f->file;
  2204. if (f->type == HDFS_STREAM_INPUT) {
  2205. cachedJavaClass = JC_FS_DATA_INPUT_STREAM;
  2206. } else {
  2207. cachedJavaClass = JC_FS_DATA_OUTPUT_STREAM;
  2208. }
  2209. jthr = invokeMethod(env, &jVal, INSTANCE, jStream,
  2210. cachedJavaClass, "getPos", "()J");
  2211. if (jthr) {
  2212. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2213. "hdfsTell: %s#getPos",
  2214. ((f->type == HDFS_STREAM_INPUT) ? "FSDataInputStream" :
  2215. "FSDataOutputStream"));
  2216. return -1;
  2217. }
  2218. return jVal.j;
  2219. }
  2220. int hdfsFlush(hdfsFS fs, hdfsFile f)
  2221. {
  2222. // JAVA EQUIVALENT
  2223. // fos.flush();
  2224. jthrowable jthr;
  2225. //Get the JNIEnv* corresponding to current thread
  2226. JNIEnv* env = getJNIEnv();
  2227. if (env == NULL) {
  2228. errno = EINTERNAL;
  2229. return -1;
  2230. }
  2231. //Sanity check
  2232. if (!f || f->type != HDFS_STREAM_OUTPUT) {
  2233. errno = EBADF;
  2234. return -1;
  2235. }
  2236. jthr = invokeMethod(env, NULL, INSTANCE, f->file,
  2237. JC_FS_DATA_OUTPUT_STREAM, "flush", "()V");
  2238. if (jthr) {
  2239. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2240. "hdfsFlush: FSDataInputStream#flush");
  2241. return -1;
  2242. }
  2243. return 0;
  2244. }
  2245. int hdfsHFlush(hdfsFS fs, hdfsFile f)
  2246. {
  2247. jobject jOutputStream;
  2248. jthrowable jthr;
  2249. //Get the JNIEnv* corresponding to current thread
  2250. JNIEnv* env = getJNIEnv();
  2251. if (env == NULL) {
  2252. errno = EINTERNAL;
  2253. return -1;
  2254. }
  2255. //Sanity check
  2256. if (!f || f->type != HDFS_STREAM_OUTPUT) {
  2257. errno = EBADF;
  2258. return -1;
  2259. }
  2260. jOutputStream = f->file;
  2261. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  2262. JC_FS_DATA_OUTPUT_STREAM, "hflush", "()V");
  2263. if (jthr) {
  2264. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2265. "hdfsHFlush: FSDataOutputStream#hflush");
  2266. return -1;
  2267. }
  2268. return 0;
  2269. }
  2270. int hdfsHSync(hdfsFS fs, hdfsFile f)
  2271. {
  2272. jobject jOutputStream;
  2273. jthrowable jthr;
  2274. //Get the JNIEnv* corresponding to current thread
  2275. JNIEnv* env = getJNIEnv();
  2276. if (env == NULL) {
  2277. errno = EINTERNAL;
  2278. return -1;
  2279. }
  2280. //Sanity check
  2281. if (!f || f->type != HDFS_STREAM_OUTPUT) {
  2282. errno = EBADF;
  2283. return -1;
  2284. }
  2285. jOutputStream = f->file;
  2286. jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
  2287. JC_FS_DATA_OUTPUT_STREAM, "hsync", "()V");
  2288. if (jthr) {
  2289. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2290. "hdfsHSync: FSDataOutputStream#hsync");
  2291. return -1;
  2292. }
  2293. return 0;
  2294. }
  2295. int hdfsAvailable(hdfsFS fs, hdfsFile f)
  2296. {
  2297. // JAVA EQUIVALENT
  2298. // fis.available();
  2299. jobject jInputStream;
  2300. jvalue jVal;
  2301. jthrowable jthr;
  2302. //Get the JNIEnv* corresponding to current thread
  2303. JNIEnv* env = getJNIEnv();
  2304. if (env == NULL) {
  2305. errno = EINTERNAL;
  2306. return -1;
  2307. }
  2308. //Sanity check
  2309. if (!f || f->type != HDFS_STREAM_INPUT) {
  2310. errno = EBADF;
  2311. return -1;
  2312. }
  2313. //Parameters
  2314. jInputStream = f->file;
  2315. jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
  2316. JC_FS_DATA_INPUT_STREAM, "available", "()I");
  2317. if (jthr) {
  2318. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2319. "hdfsAvailable: FSDataInputStream#available");
  2320. return -1;
  2321. }
  2322. return jVal.i;
  2323. }
  2324. static int hdfsCopyImpl(hdfsFS srcFS, const char *src, hdfsFS dstFS,
  2325. const char *dst, jboolean deleteSource)
  2326. {
  2327. //JAVA EQUIVALENT
  2328. // FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
  2329. // deleteSource = false, conf)
  2330. //Parameters
  2331. jobject jSrcFS = (jobject)srcFS;
  2332. jobject jDstFS = (jobject)dstFS;
  2333. jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
  2334. jthrowable jthr;
  2335. jvalue jVal;
  2336. int ret;
  2337. //Get the JNIEnv* corresponding to current thread
  2338. JNIEnv* env = getJNIEnv();
  2339. if (env == NULL) {
  2340. errno = EINTERNAL;
  2341. return -1;
  2342. }
  2343. jthr = constructNewObjectOfPath(env, src, &jSrcPath);
  2344. if (jthr) {
  2345. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2346. "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
  2347. goto done;
  2348. }
  2349. jthr = constructNewObjectOfPath(env, dst, &jDstPath);
  2350. if (jthr) {
  2351. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2352. "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
  2353. goto done;
  2354. }
  2355. //Create the org.apache.hadoop.conf.Configuration object
  2356. jthr = constructNewObjectOfCachedClass(env, &jConfiguration,
  2357. JC_CONFIGURATION, "()V");
  2358. if (jthr) {
  2359. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2360. "hdfsCopyImpl: Configuration constructor");
  2361. goto done;
  2362. }
  2363. //FileUtil#copy
  2364. jthr = invokeMethod(env, &jVal, STATIC, NULL, JC_FILE_UTIL,
  2365. "copy",
  2366. "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
  2367. "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
  2368. "ZLorg/apache/hadoop/conf/Configuration;)Z",
  2369. jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
  2370. jConfiguration);
  2371. if (jthr) {
  2372. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2373. "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
  2374. "FileUtil#copy", src, dst, deleteSource);
  2375. goto done;
  2376. }
  2377. if (!jVal.z) {
  2378. ret = EIO;
  2379. goto done;
  2380. }
  2381. ret = 0;
  2382. done:
  2383. destroyLocalReference(env, jConfiguration);
  2384. destroyLocalReference(env, jSrcPath);
  2385. destroyLocalReference(env, jDstPath);
  2386. if (ret) {
  2387. errno = ret;
  2388. return -1;
  2389. }
  2390. return 0;
  2391. }
  2392. int hdfsCopy(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst)
  2393. {
  2394. return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
  2395. }
  2396. int hdfsMove(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst)
  2397. {
  2398. return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
  2399. }
  2400. int hdfsDelete(hdfsFS fs, const char *path, int recursive)
  2401. {
  2402. // JAVA EQUIVALENT:
  2403. // Path p = new Path(path);
  2404. // bool retval = fs.delete(p, recursive);
  2405. jobject jFS = (jobject)fs;
  2406. jthrowable jthr;
  2407. jobject jPath;
  2408. jvalue jVal;
  2409. jboolean jRecursive;
  2410. //Get the JNIEnv* corresponding to current thread
  2411. JNIEnv* env = getJNIEnv();
  2412. if (env == NULL) {
  2413. errno = EINTERNAL;
  2414. return -1;
  2415. }
  2416. jthr = constructNewObjectOfPath(env, path, &jPath);
  2417. if (jthr) {
  2418. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2419. "hdfsDelete(path=%s): constructNewObjectOfPath", path);
  2420. return -1;
  2421. }
  2422. jRecursive = recursive ? JNI_TRUE : JNI_FALSE;
  2423. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  2424. "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z", jPath,
  2425. jRecursive);
  2426. destroyLocalReference(env, jPath);
  2427. if (jthr) {
  2428. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2429. "hdfsDelete(path=%s, recursive=%d): "
  2430. "FileSystem#delete", path, recursive);
  2431. return -1;
  2432. }
  2433. if (!jVal.z) {
  2434. errno = EIO;
  2435. return -1;
  2436. }
  2437. return 0;
  2438. }
  2439. int hdfsRename(hdfsFS fs, const char *oldPath, const char *newPath)
  2440. {
  2441. // JAVA EQUIVALENT:
  2442. // Path old = new Path(oldPath);
  2443. // Path new = new Path(newPath);
  2444. // fs.rename(old, new);
  2445. jobject jFS = (jobject)fs;
  2446. jthrowable jthr;
  2447. jobject jOldPath = NULL, jNewPath = NULL;
  2448. int ret = -1;
  2449. jvalue jVal;
  2450. //Get the JNIEnv* corresponding to current thread
  2451. JNIEnv* env = getJNIEnv();
  2452. if (env == NULL) {
  2453. errno = EINTERNAL;
  2454. return -1;
  2455. }
  2456. jthr = constructNewObjectOfPath(env, oldPath, &jOldPath );
  2457. if (jthr) {
  2458. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2459. "hdfsRename: constructNewObjectOfPath(%s)", oldPath);
  2460. goto done;
  2461. }
  2462. jthr = constructNewObjectOfPath(env, newPath, &jNewPath);
  2463. if (jthr) {
  2464. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2465. "hdfsRename: constructNewObjectOfPath(%s)", newPath);
  2466. goto done;
  2467. }
  2468. // Rename the file
  2469. // TODO: use rename2 here? (See HDFS-3592)
  2470. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  2471. "rename", JMETHOD2(JPARAM(HADOOP_PATH), JPARAM
  2472. (HADOOP_PATH), "Z"), jOldPath, jNewPath);
  2473. if (jthr) {
  2474. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2475. "hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename",
  2476. oldPath, newPath);
  2477. goto done;
  2478. }
  2479. if (!jVal.z) {
  2480. errno = EIO;
  2481. goto done;
  2482. }
  2483. ret = 0;
  2484. done:
  2485. destroyLocalReference(env, jOldPath);
  2486. destroyLocalReference(env, jNewPath);
  2487. return ret;
  2488. }
  2489. char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize)
  2490. {
  2491. // JAVA EQUIVALENT:
  2492. // Path p = fs.getWorkingDirectory();
  2493. // return p.toString()
  2494. jobject jPath = NULL;
  2495. jstring jPathString = NULL;
  2496. jobject jFS = (jobject)fs;
  2497. jvalue jVal;
  2498. jthrowable jthr;
  2499. int ret;
  2500. const char *jPathChars = NULL;
  2501. //Get the JNIEnv* corresponding to current thread
  2502. JNIEnv* env = getJNIEnv();
  2503. if (env == NULL) {
  2504. errno = EINTERNAL;
  2505. return NULL;
  2506. }
  2507. //FileSystem#getWorkingDirectory()
  2508. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  2509. "getWorkingDirectory", "()Lorg/apache/hadoop/fs/Path;");
  2510. if (jthr) {
  2511. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2512. "hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory");
  2513. goto done;
  2514. }
  2515. jPath = jVal.l;
  2516. if (!jPath) {
  2517. fprintf(stderr, "hdfsGetWorkingDirectory: "
  2518. "FileSystem#getWorkingDirectory returned NULL");
  2519. ret = -EIO;
  2520. goto done;
  2521. }
  2522. //Path#toString()
  2523. jthr = invokeMethod(env, &jVal, INSTANCE, jPath, JC_PATH, "toString",
  2524. "()Ljava/lang/String;");
  2525. if (jthr) {
  2526. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2527. "hdfsGetWorkingDirectory: Path#toString");
  2528. goto done;
  2529. }
  2530. jPathString = jVal.l;
  2531. jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL);
  2532. if (!jPathChars) {
  2533. ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
  2534. "hdfsGetWorkingDirectory: GetStringUTFChars");
  2535. goto done;
  2536. }
  2537. //Copy to user-provided buffer
  2538. ret = snprintf(buffer, bufferSize, "%s", jPathChars);
  2539. if (ret >= bufferSize) {
  2540. ret = ENAMETOOLONG;
  2541. goto done;
  2542. }
  2543. ret = 0;
  2544. done:
  2545. if (jPathChars) {
  2546. (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars);
  2547. }
  2548. destroyLocalReference(env, jPath);
  2549. destroyLocalReference(env, jPathString);
  2550. if (ret) {
  2551. errno = ret;
  2552. return NULL;
  2553. }
  2554. return buffer;
  2555. }
  2556. int hdfsSetWorkingDirectory(hdfsFS fs, const char *path)
  2557. {
  2558. // JAVA EQUIVALENT:
  2559. // fs.setWorkingDirectory(Path(path));
  2560. jobject jFS = (jobject)fs;
  2561. jthrowable jthr;
  2562. jobject jPath;
  2563. //Get the JNIEnv* corresponding to current thread
  2564. JNIEnv* env = getJNIEnv();
  2565. if (env == NULL) {
  2566. errno = EINTERNAL;
  2567. return -1;
  2568. }
  2569. //Create an object of org.apache.hadoop.fs.Path
  2570. jthr = constructNewObjectOfPath(env, path, &jPath);
  2571. if (jthr) {
  2572. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2573. "hdfsSetWorkingDirectory(%s): constructNewObjectOfPath",
  2574. path);
  2575. return -1;
  2576. }
  2577. //FileSystem#setWorkingDirectory()
  2578. jthr = invokeMethod(env, NULL, INSTANCE, jFS, JC_FILE_SYSTEM,
  2579. "setWorkingDirectory", "(Lorg/apache/hadoop/fs/Path;)V",
  2580. jPath);
  2581. destroyLocalReference(env, jPath);
  2582. if (jthr) {
  2583. errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT,
  2584. "hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory",
  2585. path);
  2586. return -1;
  2587. }
  2588. return 0;
  2589. }
  2590. int hdfsCreateDirectory(hdfsFS fs, const char *path)
  2591. {
  2592. // JAVA EQUIVALENT:
  2593. // fs.mkdirs(new Path(path));
  2594. jobject jFS = (jobject)fs;
  2595. jobject jPath;
  2596. jthrowable jthr;
  2597. jvalue jVal;
  2598. //Get the JNIEnv* corresponding to current thread
  2599. JNIEnv* env = getJNIEnv();
  2600. if (env == NULL) {
  2601. errno = EINTERNAL;
  2602. return -1;
  2603. }
  2604. //Create an object of org.apache.hadoop.fs.Path
  2605. jthr = constructNewObjectOfPath(env, path, &jPath);
  2606. if (jthr) {
  2607. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2608. "hdfsCreateDirectory(%s): constructNewObjectOfPath", path);
  2609. return -1;
  2610. }
  2611. //Create the directory
  2612. jVal.z = 0;
  2613. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  2614. "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z", jPath);
  2615. destroyLocalReference(env, jPath);
  2616. if (jthr) {
  2617. errno = printExceptionAndFree(env, jthr,
  2618. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2619. NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY,
  2620. "hdfsCreateDirectory(%s): FileSystem#mkdirs", path);
  2621. return -1;
  2622. }
  2623. if (!jVal.z) {
  2624. // It's unclear under exactly which conditions FileSystem#mkdirs
  2625. // is supposed to return false (as opposed to throwing an exception.)
  2626. // It seems like the current code never actually returns false.
  2627. // So we're going to translate this to EIO, since there seems to be
  2628. // nothing more specific we can do with it.
  2629. errno = EIO;
  2630. return -1;
  2631. }
  2632. return 0;
  2633. }
  2634. int hdfsSetReplication(hdfsFS fs, const char *path, int16_t replication)
  2635. {
  2636. // JAVA EQUIVALENT:
  2637. // fs.setReplication(new Path(path), replication);
  2638. jobject jFS = (jobject)fs;
  2639. jthrowable jthr;
  2640. jobject jPath;
  2641. jvalue jVal;
  2642. //Get the JNIEnv* corresponding to current thread
  2643. JNIEnv* env = getJNIEnv();
  2644. if (env == NULL) {
  2645. errno = EINTERNAL;
  2646. return -1;
  2647. }
  2648. //Create an object of org.apache.hadoop.fs.Path
  2649. jthr = constructNewObjectOfPath(env, path, &jPath);
  2650. if (jthr) {
  2651. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2652. "hdfsSetReplication(path=%s): constructNewObjectOfPath", path);
  2653. return -1;
  2654. }
  2655. //Create the directory
  2656. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  2657. "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z",
  2658. jPath, replication);
  2659. destroyLocalReference(env, jPath);
  2660. if (jthr) {
  2661. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2662. "hdfsSetReplication(path=%s, replication=%d): "
  2663. "FileSystem#setReplication", path, replication);
  2664. return -1;
  2665. }
  2666. if (!jVal.z) {
  2667. // setReplication returns false "if file does not exist or is a
  2668. // directory." So the nearest translation to that is ENOENT.
  2669. errno = ENOENT;
  2670. return -1;
  2671. }
  2672. return 0;
  2673. }
  2674. int hdfsChown(hdfsFS fs, const char *path, const char *owner, const char *group)
  2675. {
  2676. // JAVA EQUIVALENT:
  2677. // fs.setOwner(path, owner, group)
  2678. jobject jFS = (jobject)fs;
  2679. jobject jPath = NULL;
  2680. jstring jOwner = NULL, jGroup = NULL;
  2681. jthrowable jthr;
  2682. int ret;
  2683. //Get the JNIEnv* corresponding to current thread
  2684. JNIEnv* env = getJNIEnv();
  2685. if (env == NULL) {
  2686. errno = EINTERNAL;
  2687. return -1;
  2688. }
  2689. if (owner == NULL && group == NULL) {
  2690. return 0;
  2691. }
  2692. jthr = constructNewObjectOfPath(env, path, &jPath);
  2693. if (jthr) {
  2694. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2695. "hdfsChown(path=%s): constructNewObjectOfPath", path);
  2696. goto done;
  2697. }
  2698. jthr = newJavaStr(env, owner, &jOwner);
  2699. if (jthr) {
  2700. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2701. "hdfsChown(path=%s): newJavaStr(%s)", path, owner);
  2702. goto done;
  2703. }
  2704. jthr = newJavaStr(env, group, &jGroup);
  2705. if (jthr) {
  2706. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2707. "hdfsChown(path=%s): newJavaStr(%s)", path, group);
  2708. goto done;
  2709. }
  2710. //Create the directory
  2711. jthr = invokeMethod(env, NULL, INSTANCE, jFS, JC_FILE_SYSTEM,
  2712. "setOwner", JMETHOD3(JPARAM(HADOOP_PATH),
  2713. JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID),
  2714. jPath, jOwner, jGroup);
  2715. if (jthr) {
  2716. ret = printExceptionAndFree(env, jthr,
  2717. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2718. NOPRINT_EXC_UNRESOLVED_LINK,
  2719. "hdfsChown(path=%s, owner=%s, group=%s): "
  2720. "FileSystem#setOwner", path, owner, group);
  2721. goto done;
  2722. }
  2723. ret = 0;
  2724. done:
  2725. destroyLocalReference(env, jPath);
  2726. destroyLocalReference(env, jOwner);
  2727. destroyLocalReference(env, jGroup);
  2728. if (ret) {
  2729. errno = ret;
  2730. return -1;
  2731. }
  2732. return 0;
  2733. }
  2734. int hdfsChmod(hdfsFS fs, const char *path, short mode)
  2735. {
  2736. int ret;
  2737. // JAVA EQUIVALENT:
  2738. // fs.setPermission(path, FsPermission)
  2739. jthrowable jthr;
  2740. jobject jPath = NULL, jPermObj = NULL;
  2741. jobject jFS = (jobject)fs;
  2742. jshort jmode = mode;
  2743. //Get the JNIEnv* corresponding to current thread
  2744. JNIEnv* env = getJNIEnv();
  2745. if (env == NULL) {
  2746. errno = EINTERNAL;
  2747. return -1;
  2748. }
  2749. // construct jPerm = FsPermission.createImmutable(short mode);
  2750. jthr = constructNewObjectOfCachedClass(env, &jPermObj, JC_FS_PERMISSION,
  2751. "(S)V",jmode);
  2752. if (jthr) {
  2753. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2754. "constructNewObjectOfCachedClass(%s)", HADOOP_FSPERM);
  2755. goto done;
  2756. }
  2757. //Create an object of org.apache.hadoop.fs.Path
  2758. jthr = constructNewObjectOfPath(env, path, &jPath);
  2759. if (jthr) {
  2760. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2761. "hdfsChmod(%s): constructNewObjectOfPath", path);
  2762. goto done;
  2763. }
  2764. //Create the directory
  2765. jthr = invokeMethod(env, NULL, INSTANCE, jFS, JC_FILE_SYSTEM,
  2766. "setPermission", JMETHOD2(JPARAM(HADOOP_PATH),
  2767. JPARAM(HADOOP_FSPERM), JAVA_VOID), jPath, jPermObj);
  2768. if (jthr) {
  2769. ret = printExceptionAndFree(env, jthr,
  2770. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2771. NOPRINT_EXC_UNRESOLVED_LINK,
  2772. "hdfsChmod(%s): FileSystem#setPermission", path);
  2773. goto done;
  2774. }
  2775. ret = 0;
  2776. done:
  2777. destroyLocalReference(env, jPath);
  2778. destroyLocalReference(env, jPermObj);
  2779. if (ret) {
  2780. errno = ret;
  2781. return -1;
  2782. }
  2783. return 0;
  2784. }
  2785. int hdfsUtime(hdfsFS fs, const char *path, tTime mtime, tTime atime)
  2786. {
  2787. // JAVA EQUIVALENT:
  2788. // fs.setTimes(src, mtime, atime)
  2789. jthrowable jthr;
  2790. jobject jFS = (jobject)fs;
  2791. jobject jPath;
  2792. static const tTime NO_CHANGE = -1;
  2793. jlong jmtime, jatime;
  2794. //Get the JNIEnv* corresponding to current thread
  2795. JNIEnv* env = getJNIEnv();
  2796. if (env == NULL) {
  2797. errno = EINTERNAL;
  2798. return -1;
  2799. }
  2800. //Create an object of org.apache.hadoop.fs.Path
  2801. jthr = constructNewObjectOfPath(env, path, &jPath);
  2802. if (jthr) {
  2803. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  2804. "hdfsUtime(path=%s): constructNewObjectOfPath", path);
  2805. return -1;
  2806. }
  2807. jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000);
  2808. jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000);
  2809. jthr = invokeMethod(env, NULL, INSTANCE, jFS, JC_FILE_SYSTEM,
  2810. "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J",
  2811. JAVA_VOID), jPath, jmtime, jatime);
  2812. destroyLocalReference(env, jPath);
  2813. if (jthr) {
  2814. errno = printExceptionAndFree(env, jthr,
  2815. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  2816. NOPRINT_EXC_UNRESOLVED_LINK,
  2817. "hdfsUtime(path=%s): FileSystem#setTimes", path);
  2818. return -1;
  2819. }
  2820. return 0;
  2821. }
/**
 * Zero-copy options.
 *
 * We cache the EnumSet of ReadOptions which has to be passed into every
 * readZero call, to avoid reconstructing it each time. This cache is cleared
 * whenever an element changes.
 */
struct hadoopRzOptions
{
    JNIEnv *env;            // JNI environment; not read by the accessors visible here -- TODO confirm use
    int skipChecksums;      // nonzero to include SKIP_CHECKSUMS in the ReadOption EnumSet
    jobject byteBufferPool; // global ref to a ByteBufferPool instance, or NULL if unset
    jobject cachedEnumSet;  // global ref to the cached EnumSet<ReadOption>, or NULL
};
  2836. struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
  2837. {
  2838. struct hadoopRzOptions *opts;
  2839. JNIEnv *env;
  2840. env = getJNIEnv();
  2841. if (!env) {
  2842. // Check to make sure the JNI environment is set up properly.
  2843. errno = EINTERNAL;
  2844. return NULL;
  2845. }
  2846. opts = calloc(1, sizeof(struct hadoopRzOptions));
  2847. if (!opts) {
  2848. errno = ENOMEM;
  2849. return NULL;
  2850. }
  2851. return opts;
  2852. }
  2853. static void hadoopRzOptionsClearCached(JNIEnv *env,
  2854. struct hadoopRzOptions *opts)
  2855. {
  2856. if (!opts->cachedEnumSet) {
  2857. return;
  2858. }
  2859. (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
  2860. opts->cachedEnumSet = NULL;
  2861. }
  2862. int hadoopRzOptionsSetSkipChecksum(
  2863. struct hadoopRzOptions *opts, int skip)
  2864. {
  2865. JNIEnv *env;
  2866. env = getJNIEnv();
  2867. if (!env) {
  2868. errno = EINTERNAL;
  2869. return -1;
  2870. }
  2871. hadoopRzOptionsClearCached(env, opts);
  2872. opts->skipChecksums = !!skip;
  2873. return 0;
  2874. }
/**
 * Set (or clear) the ByteBufferPool used by zero-copy reads.
 *
 * @param opts      The options structure to modify.
 * @param className Fully-qualified name of a class with a public no-arg
 *                  constructor, or NULL to remove the current pool.
 * @return 0 on success; -1 with errno set (EINTERNAL if the JNI
 *         environment is unavailable; EINVAL if the class cannot be
 *         instantiated or a global reference cannot be created).
 */
int hadoopRzOptionsSetByteBufferPool(
            struct hadoopRzOptions *opts, const char *className)
{
    JNIEnv *env;
    jthrowable jthr;
    jobject byteBufferPool = NULL;
    jobject globalByteBufferPool = NULL;
    int ret;
    env = getJNIEnv();
    if (!env) {
        errno = EINTERNAL;
        return -1;
    }
    if (className) {
        // Note: we don't have to call hadoopRzOptionsClearCached in this
        // function, since the ByteBufferPool is passed separately from the
        // EnumSet of ReadOptions.
        jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
            ret = EINVAL;
            goto done;
        }
        // Only set opts->byteBufferPool if creating a global reference is
        // successful
        globalByteBufferPool = (*env)->NewGlobalRef(env, byteBufferPool);
        if (!globalByteBufferPool) {
            printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hadoopRzOptionsSetByteBufferPool(className=%s): ",
                className);
            ret = EINVAL;
            goto done;
        }
        // Delete any previous ByteBufferPool we had before setting a new one.
        if (opts->byteBufferPool) {
            (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
        }
        opts->byteBufferPool = globalByteBufferPool;
    } else if (opts->byteBufferPool) {
        // If the specified className is NULL, delete any previous
        // ByteBufferPool we had.
        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
        opts->byteBufferPool = NULL;
    }
    ret = 0;
done:
    // The local reference is no longer needed once a global ref exists.
    destroyLocalReference(env, byteBufferPool);
    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}
  2929. void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
  2930. {
  2931. JNIEnv *env;
  2932. env = getJNIEnv();
  2933. if (!env) {
  2934. return;
  2935. }
  2936. hadoopRzOptionsClearCached(env, opts);
  2937. if (opts->byteBufferPool) {
  2938. (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
  2939. opts->byteBufferPool = NULL;
  2940. }
  2941. free(opts);
  2942. }
/**
 * A buffer returned by hadoopReadZero.
 */
struct hadoopRzBuffer
{
    jobject byteBuffer; // global ref to the Java ByteBuffer, or NULL at EOF
    uint8_t *ptr;       // start of readable data (direct memory or a malloc'd copy)
    int32_t length;     // number of readable bytes at ptr
    int direct;         // nonzero when ptr points into direct ByteBuffer memory
};
/**
 * Build (or fetch from cache) the EnumSet<ReadOption> described by opts.
 *
 * The returned reference is a global ref owned by opts (cached in
 * opts->cachedEnumSet); the caller must not delete it.
 *
 * @param enumSet Out parameter receiving the EnumSet reference.
 * @return NULL on success, or the Java exception that occurred.
 */
static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
        struct hadoopRzOptions *opts, jobject *enumSet)
{
    jthrowable jthr = NULL;
    jobject enumInst = NULL, enumSetObj = NULL;
    jvalue jVal;
    if (opts->cachedEnumSet) {
        // If we cached the value, return it now.
        *enumSet = opts->cachedEnumSet;
        goto done;
    }
    if (opts->skipChecksums) {
        // EnumSet.of(ReadOption.SKIP_CHECKSUMS)
        jthr = fetchEnumInstance(env, HADOOP_RO,
            "SKIP_CHECKSUMS", &enumInst);
        if (jthr) {
            goto done;
        }
        jthr = invokeMethod(env, &jVal, STATIC, NULL, JC_ENUM_SET,
            "of", "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
        if (jthr) {
            goto done;
        }
        enumSetObj = jVal.l;
    } else {
        // EnumSet.noneOf(ReadOption.class)
        jclass clazz = (*env)->FindClass(env, HADOOP_RO);
        if (!clazz) {
            jthr = getPendingExceptionAndClear(env);
            goto done;
        }
        jthr = invokeMethod(env, &jVal, STATIC, NULL, JC_ENUM_SET,
            "noneOf", "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
        if (jthr) {
            goto done;
        }
        enumSetObj = jVal.l;
    }
    // create global ref
    opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
    if (!opts->cachedEnumSet) {
        jthr = getPendingExceptionAndClear(env);
        goto done;
    }
    *enumSet = opts->cachedEnumSet;
    jthr = NULL;
done:
    // Local refs are safe to drop whether or not the global ref was taken.
    (*env)->DeleteLocalRef(env, enumInst);
    (*env)->DeleteLocalRef(env, enumSetObj);
    return jthr;
}
  2999. static int hadoopReadZeroExtractBuffer(JNIEnv *env,
  3000. const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
  3001. {
  3002. int ret;
  3003. jthrowable jthr;
  3004. jvalue jVal;
  3005. uint8_t *directStart;
  3006. void *mallocBuf = NULL;
  3007. jint position;
  3008. jarray array = NULL;
  3009. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  3010. JC_BYTE_BUFFER, "remaining", "()I");
  3011. if (jthr) {
  3012. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3013. "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
  3014. goto done;
  3015. }
  3016. buffer->length = jVal.i;
  3017. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  3018. JC_BYTE_BUFFER, "position", "()I");
  3019. if (jthr) {
  3020. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3021. "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
  3022. goto done;
  3023. }
  3024. position = jVal.i;
  3025. directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
  3026. if (directStart) {
  3027. // Handle direct buffers.
  3028. buffer->ptr = directStart + position;
  3029. buffer->direct = 1;
  3030. ret = 0;
  3031. goto done;
  3032. }
  3033. // Handle indirect buffers.
  3034. // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
  3035. // when it fails. However, they also don't clearly say that it doesn't. It
  3036. // seems safest to clear any pending exceptions here, to prevent problems on
  3037. // various JVMs.
  3038. (*env)->ExceptionClear(env);
  3039. if (!opts->byteBufferPool) {
  3040. fputs("hadoopReadZeroExtractBuffer: we read through the "
  3041. "zero-copy path, but failed to get the address of the buffer via "
  3042. "GetDirectBufferAddress. Please make sure your JVM supports "
  3043. "GetDirectBufferAddress.\n", stderr);
  3044. ret = ENOTSUP;
  3045. goto done;
  3046. }
  3047. // Get the backing array object of this buffer.
  3048. jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
  3049. JC_BYTE_BUFFER, "array", "()[B");
  3050. if (jthr) {
  3051. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3052. "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
  3053. goto done;
  3054. }
  3055. array = jVal.l;
  3056. if (!array) {
  3057. fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
  3058. stderr);
  3059. ret = EIO;
  3060. goto done;
  3061. }
  3062. mallocBuf = malloc(buffer->length);
  3063. if (!mallocBuf) {
  3064. fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
  3065. buffer->length);
  3066. ret = ENOMEM;
  3067. goto done;
  3068. }
  3069. (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
  3070. jthr = (*env)->ExceptionOccurred(env);
  3071. if (jthr) {
  3072. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3073. "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
  3074. goto done;
  3075. }
  3076. buffer->ptr = mallocBuf;
  3077. buffer->direct = 0;
  3078. ret = 0;
  3079. done:
  3080. free(mallocBuf);
  3081. (*env)->DeleteLocalRef(env, array);
  3082. return ret;
  3083. }
  3084. static int translateZCRException(JNIEnv *env, jthrowable exc)
  3085. {
  3086. int ret;
  3087. char *className = NULL;
  3088. jthrowable jthr = classNameOfObject(exc, env, &className);
  3089. if (jthr) {
  3090. fputs("hadoopReadZero: failed to get class name of "
  3091. "exception from read().\n", stderr);
  3092. destroyLocalReference(env, exc);
  3093. destroyLocalReference(env, jthr);
  3094. ret = EIO;
  3095. goto done;
  3096. }
  3097. if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
  3098. ret = EPROTONOSUPPORT;
  3099. destroyLocalReference(env, exc);
  3100. goto done;
  3101. }
  3102. ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
  3103. "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
  3104. done:
  3105. free(className);
  3106. return ret;
  3107. }
/**
 * Perform a zero-copy read of up to maxLength bytes from an open HDFS file.
 *
 * @param file      The file to read from; must be an input stream.
 * @param opts      Zero-copy read options (checksum skipping, buffer pool).
 * @param maxLength Maximum number of bytes to read.
 * @return A hadoopRzBuffer describing the data (length 0 at EOF), or NULL
 *         with errno set on failure.  Free with hadoopRzBufferFree.
 */
struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
            struct hadoopRzOptions *opts, int32_t maxLength)
{
    JNIEnv *env;
    jthrowable jthr = NULL;
    jvalue jVal;
    jobject enumSet = NULL, byteBuffer = NULL;
    struct hadoopRzBuffer* buffer = NULL;
    int ret;
    env = getJNIEnv();
    if (!env) {
        errno = EINTERNAL;
        return NULL;
    }
    if (file->type != HDFS_STREAM_INPUT) {
        fputs("Cannot read from a non-InputStream object!\n", stderr);
        ret = EINVAL;
        goto done;
    }
    buffer = calloc(1, sizeof(struct hadoopRzBuffer));
    if (!buffer) {
        ret = ENOMEM;
        goto done;
    }
    // enumSet is owned by opts (cached global ref); do not delete it here.
    jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
        goto done;
    }
    // FSDataInputStream#read(ByteBufferPool, int, EnumSet<ReadOption>)
    jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
        JC_FS_DATA_INPUT_STREAM, "read",
        "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
        "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
    if (jthr) {
        ret = translateZCRException(env, jthr);
        goto done;
    }
    byteBuffer = jVal.l;
    if (!byteBuffer) {
        // A NULL ByteBuffer means EOF: return an empty buffer.
        buffer->byteBuffer = NULL;
        buffer->length = 0;
        buffer->ptr = NULL;
    } else {
        buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
        if (!buffer->byteBuffer) {
            ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hadoopReadZero: failed to create global ref to ByteBuffer");
            goto done;
        }
        ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
        if (ret) {
            goto done;
        }
    }
    ret = 0;
done:
    (*env)->DeleteLocalRef(env, byteBuffer);
    if (ret) {
        // On failure, undo the partially-built buffer before returning NULL.
        if (buffer) {
            if (buffer->byteBuffer) {
                (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
            }
            free(buffer);
        }
        errno = ret;
        return NULL;
    } else {
        errno = 0;
    }
    return buffer;
}
/**
 * Get the number of readable bytes in a hadoopRzBuffer (0 at EOF).
 */
int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
{
    return buffer->length;
}
/**
 * Get a pointer to the data in a hadoopRzBuffer (NULL at EOF).
 */
const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
{
    return buffer->ptr;
}
/**
 * Release a buffer obtained from hadoopReadZero.
 *
 * Gives the ByteBuffer back to the stream (FSDataInputStream#releaseBuffer),
 * drops the global reference, frees any malloc'd copy of the data, and
 * frees the struct itself.
 *
 * @param file   The file the buffer was read from.
 * @param buffer The buffer to free; must not be used afterwards.
 */
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
{
    jvalue jVal;
    jthrowable jthr;
    JNIEnv* env;
    env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return;
    }
    if (buffer->byteBuffer) {
        // Return the ByteBuffer to the stream / pool.
        jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
            JC_FS_DATA_INPUT_STREAM, "releaseBuffer",
            "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopRzBufferFree: releaseBuffer failed: ");
            // even on error, we have to delete the reference.
        }
        (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
    }
    if (!buffer->direct) {
        // Indirect buffers own a malloc'd copy of the data.
        free(buffer->ptr);
    }
    memset(buffer, 0, sizeof(*buffer));
    free(buffer);
}
/**
 * Get the hostnames storing each block of the byte range [start, start+length)
 * of a file.
 *
 * @param fs     The configured filesystem handle.
 * @param path   The file to query.
 * @param start  Starting byte offset of the range.
 * @param length Length in bytes of the range.
 * @return A NULL-terminated array of blocks, each itself a NULL-terminated
 *         array of strdup'd hostnames; free with hdfsFreeHosts.  NULL on
 *         error with errno set.
 */
char***
hdfsGetHosts(hdfsFS fs, const char *path, tOffset start, tOffset length)
{
    // JAVA EQUIVALENT:
    //   fs.getFileBlockLocations(fs.getFileStatus(new Path(path)), start, length);
    jobject jFS = (jobject)fs;
    jthrowable jthr;
    jobject jPath = NULL;
    jobject jFileStatus = NULL;
    jvalue jFSVal, jVal;
    jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL;
    jstring jHost = NULL;
    char*** blockHosts = NULL;
    int i, j, ret;
    jsize jNumFileBlocks = 0;
    jobject jFileBlock;
    jsize jNumBlockHosts;
    const char *hostName;
    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }
    //Create an object of org.apache.hadoop.fs.Path
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetHosts(path=%s): constructNewObjectOfPath", path);
        goto done;
    }
    // FileSystem#getFileStatus: needed as the argument to
    // getFileBlockLocations below.
    jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS, JC_FILE_SYSTEM,
        "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)"
        "Lorg/apache/hadoop/fs/FileStatus;", jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND,
            "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
            "FileSystem#getFileStatus", path, start, length);
        destroyLocalReference(env, jPath);
        goto done;
    }
    jFileStatus = jFSVal.l;
    //org.apache.hadoop.fs.FileSystem#getFileBlockLocations
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
        "getFileBlockLocations",
        "(Lorg/apache/hadoop/fs/FileStatus;JJ)"
        "[Lorg/apache/hadoop/fs/BlockLocation;", jFileStatus, start,
        length);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
            "FileSystem#getFileBlockLocations", path, start, length);
        goto done;
    }
    jBlockLocations = jVal.l;
    //Figure out no of entries in jBlockLocations
    //Allocate memory and add NULL at the end
    jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations);
    blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**));
    if (blockHosts == NULL) {
        ret = ENOMEM;
        goto done;
    }
    if (jNumFileBlocks == 0) {
        // No blocks in the requested range: return the empty,
        // NULL-terminated outer array.
        ret = 0;
        goto done;
    }
    //Now parse each block to get hostnames
    for (i = 0; i < jNumFileBlocks; ++i) {
        jFileBlock =
            (*env)->GetObjectArrayElement(env, jBlockLocations, i);
        jthr = (*env)->ExceptionOccurred(env);
        if (jthr || !jFileBlock) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
                "GetObjectArrayElement(%d)", path, start, length, i);
            goto done;
        }
        // BlockLocation#getHosts: the hostnames holding this block.
        jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock,
            JC_BLOCK_LOCATION, "getHosts",
            "()[Ljava/lang/String;");
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
                "BlockLocation#getHosts", path, start, length);
            goto done;
        }
        jFileBlockHosts = jVal.l;
        if (!jFileBlockHosts) {
            fprintf(stderr,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
                "BlockLocation#getHosts returned NULL", path, start, length);
            ret = EINTERNAL;
            goto done;
        }
        //Figure out no of hosts in jFileBlockHosts, and allocate the memory
        jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts);
        blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*));
        if (!blockHosts[i]) {
            ret = ENOMEM;
            goto done;
        }
        //Now parse each hostname
        for (j = 0; j < jNumBlockHosts; ++j) {
            jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j);
            jthr = (*env)->ExceptionOccurred(env);
            if (jthr || !jHost) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                    "NewByteArray", path, start, length);
                goto done;
            }
            hostName =
                (const char*)((*env)->GetStringUTFChars(env, jHost, NULL));
            if (!hostName) {
                ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                    "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", "
                    "j=%d out of %d): GetStringUTFChars",
                    path, start, length, j, jNumBlockHosts);
                goto done;
            }
            // Copy the hostname into C-owned memory before releasing the
            // JVM's UTF chars.
            blockHosts[i][j] = strdup(hostName);
            (*env)->ReleaseStringUTFChars(env, jHost, hostName);
            if (!blockHosts[i][j]) {
                ret = ENOMEM;
                goto done;
            }
            destroyLocalReference(env, jHost);
            jHost = NULL;
        }
        destroyLocalReference(env, jFileBlockHosts);
        jFileBlockHosts = NULL;
    }
    ret = 0;
done:
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jFileStatus);
    destroyLocalReference(env, jBlockLocations);
    destroyLocalReference(env, jFileBlockHosts);
    destroyLocalReference(env, jHost);
    if (ret) {
        errno = ret;
        // Free any partially-built result.
        if (blockHosts) {
            hdfsFreeHosts(blockHosts);
        }
        return NULL;
    }
    return blockHosts;
}
  3364. void hdfsFreeHosts(char ***blockHosts)
  3365. {
  3366. int i, j;
  3367. for (i=0; blockHosts[i]; i++) {
  3368. for (j=0; blockHosts[i][j]; j++) {
  3369. free(blockHosts[i][j]);
  3370. }
  3371. free(blockHosts[i]);
  3372. }
  3373. free(blockHosts);
  3374. }
  3375. tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
  3376. {
  3377. // JAVA EQUIVALENT:
  3378. // fs.getDefaultBlockSize();
  3379. jobject jFS = (jobject)fs;
  3380. jvalue jVal;
  3381. jthrowable jthr;
  3382. //Get the JNIEnv* corresponding to current thread
  3383. JNIEnv* env = getJNIEnv();
  3384. if (env == NULL) {
  3385. errno = EINTERNAL;
  3386. return -1;
  3387. }
  3388. //FileSystem#getDefaultBlockSize()
  3389. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  3390. "getDefaultBlockSize", "()J");
  3391. if (jthr) {
  3392. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3393. "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
  3394. return -1;
  3395. }
  3396. return jVal.j;
  3397. }
  3398. tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path)
  3399. {
  3400. // JAVA EQUIVALENT:
  3401. // fs.getDefaultBlockSize(path);
  3402. jthrowable jthr;
  3403. jobject jFS = (jobject)fs;
  3404. jobject jPath;
  3405. tOffset blockSize;
  3406. JNIEnv* env = getJNIEnv();
  3407. if (env == NULL) {
  3408. errno = EINTERNAL;
  3409. return -1;
  3410. }
  3411. jthr = constructNewObjectOfPath(env, path, &jPath);
  3412. if (jthr) {
  3413. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3414. "hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath",
  3415. path);
  3416. return -1;
  3417. }
  3418. jthr = getDefaultBlockSize(env, jFS, jPath, (jlong *)&blockSize);
  3419. (*env)->DeleteLocalRef(env, jPath);
  3420. if (jthr) {
  3421. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3422. "hdfsGetDefaultBlockSize(path=%s): "
  3423. "FileSystem#getDefaultBlockSize", path);
  3424. return -1;
  3425. }
  3426. return blockSize;
  3427. }
  3428. tOffset hdfsGetCapacity(hdfsFS fs)
  3429. {
  3430. // JAVA EQUIVALENT:
  3431. // FsStatus fss = fs.getStatus();
  3432. // return Fss.getCapacity();
  3433. jobject jFS = (jobject)fs;
  3434. jvalue jVal;
  3435. jthrowable jthr;
  3436. jobject fss;
  3437. //Get the JNIEnv* corresponding to current thread
  3438. JNIEnv* env = getJNIEnv();
  3439. if (env == NULL) {
  3440. errno = EINTERNAL;
  3441. return -1;
  3442. }
  3443. //FileSystem#getStatus
  3444. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  3445. "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
  3446. if (jthr) {
  3447. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3448. "hdfsGetCapacity: FileSystem#getStatus");
  3449. return -1;
  3450. }
  3451. fss = (jobject)jVal.l;
  3452. jthr = invokeMethod(env, &jVal, INSTANCE, fss,
  3453. JC_FS_STATUS, "getCapacity", "()J");
  3454. destroyLocalReference(env, fss);
  3455. if (jthr) {
  3456. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3457. "hdfsGetCapacity: FsStatus#getCapacity");
  3458. return -1;
  3459. }
  3460. return jVal.j;
  3461. }
  3462. tOffset hdfsGetUsed(hdfsFS fs)
  3463. {
  3464. // JAVA EQUIVALENT:
  3465. // FsStatus fss = fs.getStatus();
  3466. // return Fss.getUsed();
  3467. jobject jFS = (jobject)fs;
  3468. jvalue jVal;
  3469. jthrowable jthr;
  3470. jobject fss;
  3471. //Get the JNIEnv* corresponding to current thread
  3472. JNIEnv* env = getJNIEnv();
  3473. if (env == NULL) {
  3474. errno = EINTERNAL;
  3475. return -1;
  3476. }
  3477. //FileSystem#getStatus
  3478. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  3479. "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
  3480. if (jthr) {
  3481. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3482. "hdfsGetUsed: FileSystem#getStatus");
  3483. return -1;
  3484. }
  3485. fss = (jobject)jVal.l;
  3486. jthr = invokeMethod(env, &jVal, INSTANCE, fss, JC_FS_STATUS,
  3487. "getUsed", "()J");
  3488. destroyLocalReference(env, fss);
  3489. if (jthr) {
  3490. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3491. "hdfsGetUsed: FsStatus#getUsed");
  3492. return -1;
  3493. }
  3494. return jVal.j;
  3495. }
  3496. /**
  3497. * We cannot add new fields to the hdfsFileInfo structure because it would break
  3498. * binary compatibility. The reason is because we return an array
  3499. * of hdfsFileInfo structures from hdfsListDirectory. So changing the size of
  3500. * those structures would break all programs that relied on finding the second
  3501. * element in the array at <base_offset> + sizeof(struct hdfsFileInfo).
  3502. *
  3503. * So instead, we add the new fields to the hdfsExtendedFileInfo structure.
  3504. * This structure is contained in the mOwner string found inside the
  3505. * hdfsFileInfo. Specifically, the format of mOwner is:
  3506. *
  3507. * [owner-string] [null byte] [padding] [hdfsExtendedFileInfo structure]
  3508. *
  3509. * The padding is added so that the hdfsExtendedFileInfo structure starts on an
  3510. * 8-byte boundary.
  3511. *
  3512. * @param str The string to locate the extended info in.
  3513. * @return The offset of the hdfsExtendedFileInfo structure.
  3514. */
  3515. static size_t getExtendedFileInfoOffset(const char *str)
  3516. {
  3517. int num_64_bit_words = ((strlen(str) + 1) + 7) / 8;
  3518. return num_64_bit_words * 8;
  3519. }
  3520. static struct hdfsExtendedFileInfo *getExtendedFileInfo(hdfsFileInfo *fileInfo)
  3521. {
  3522. char *owner = fileInfo->mOwner;
  3523. return (struct hdfsExtendedFileInfo *)(owner +
  3524. getExtendedFileInfoOffset(owner));
  3525. }
  3526. static jthrowable
  3527. getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo)
  3528. {
  3529. jvalue jVal;
  3530. jthrowable jthr;
  3531. jobject jPath = NULL;
  3532. jstring jPathName = NULL;
  3533. jstring jUserName = NULL;
  3534. jstring jGroupName = NULL;
  3535. jobject jPermission = NULL;
  3536. const char *cPathName;
  3537. const char *cUserName;
  3538. const char *cGroupName;
  3539. struct hdfsExtendedFileInfo *extInfo;
  3540. size_t extOffset;
  3541. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS, "isDir",
  3542. "()Z");
  3543. if (jthr)
  3544. goto done;
  3545. fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile;
  3546. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3547. "getReplication", "()S");
  3548. if (jthr)
  3549. goto done;
  3550. fileInfo->mReplication = jVal.s;
  3551. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3552. "getBlockSize", "()J");
  3553. if (jthr)
  3554. goto done;
  3555. fileInfo->mBlockSize = jVal.j;
  3556. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3557. "getModificationTime", "()J");
  3558. if (jthr)
  3559. goto done;
  3560. fileInfo->mLastMod = jVal.j / 1000;
  3561. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3562. "getAccessTime", "()J");
  3563. if (jthr)
  3564. goto done;
  3565. fileInfo->mLastAccess = (tTime) (jVal.j / 1000);
  3566. if (fileInfo->mKind == kObjectKindFile) {
  3567. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3568. "getLen", "()J");
  3569. if (jthr)
  3570. goto done;
  3571. fileInfo->mSize = jVal.j;
  3572. }
  3573. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3574. "getPath", "()Lorg/apache/hadoop/fs/Path;");
  3575. if (jthr)
  3576. goto done;
  3577. jPath = jVal.l;
  3578. if (jPath == NULL) {
  3579. jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#"
  3580. "getPath returned NULL!");
  3581. goto done;
  3582. }
  3583. jthr = invokeMethod(env, &jVal, INSTANCE, jPath, JC_PATH, "toString",
  3584. "()Ljava/lang/String;");
  3585. if (jthr)
  3586. goto done;
  3587. jPathName = jVal.l;
  3588. cPathName =
  3589. (const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL));
  3590. if (!cPathName) {
  3591. jthr = getPendingExceptionAndClear(env);
  3592. goto done;
  3593. }
  3594. fileInfo->mName = strdup(cPathName);
  3595. (*env)->ReleaseStringUTFChars(env, jPathName, cPathName);
  3596. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS, "getOwner",
  3597. "()Ljava/lang/String;");
  3598. if (jthr)
  3599. goto done;
  3600. jUserName = jVal.l;
  3601. cUserName =
  3602. (const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL));
  3603. if (!cUserName) {
  3604. jthr = getPendingExceptionAndClear(env);
  3605. goto done;
  3606. }
  3607. extOffset = getExtendedFileInfoOffset(cUserName);
  3608. fileInfo->mOwner = malloc(extOffset + sizeof(struct hdfsExtendedFileInfo));
  3609. if (!fileInfo->mOwner) {
  3610. jthr = newRuntimeError(env, "getFileInfo: OOM allocating mOwner");
  3611. goto done;
  3612. }
  3613. strcpy(fileInfo->mOwner, cUserName);
  3614. (*env)->ReleaseStringUTFChars(env, jUserName, cUserName);
  3615. extInfo = getExtendedFileInfo(fileInfo);
  3616. memset(extInfo, 0, sizeof(*extInfo));
  3617. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3618. "isEncrypted", "()Z");
  3619. if (jthr) {
  3620. goto done;
  3621. }
  3622. if (jVal.z == JNI_TRUE) {
  3623. extInfo->flags |= HDFS_EXTENDED_FILE_INFO_ENCRYPTED;
  3624. }
  3625. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3626. "getGroup", "()Ljava/lang/String;");
  3627. if (jthr)
  3628. goto done;
  3629. jGroupName = jVal.l;
  3630. cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL));
  3631. if (!cGroupName) {
  3632. jthr = getPendingExceptionAndClear(env);
  3633. goto done;
  3634. }
  3635. fileInfo->mGroup = strdup(cGroupName);
  3636. (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName);
  3637. jthr = invokeMethod(env, &jVal, INSTANCE, jStat, JC_FILE_STATUS,
  3638. "getPermission",
  3639. "()Lorg/apache/hadoop/fs/permission/FsPermission;");
  3640. if (jthr)
  3641. goto done;
  3642. if (jVal.l == NULL) {
  3643. jthr = newRuntimeError(env, "%s#getPermission returned NULL!",
  3644. HADOOP_FILESTAT);
  3645. goto done;
  3646. }
  3647. jPermission = jVal.l;
  3648. jthr = invokeMethod(env, &jVal, INSTANCE, jPermission,
  3649. JC_FS_PERMISSION, "toShort", "()S");
  3650. if (jthr)
  3651. goto done;
  3652. fileInfo->mPermissions = jVal.s;
  3653. jthr = NULL;
  3654. done:
  3655. if (jthr)
  3656. hdfsFreeFileInfoEntry(fileInfo);
  3657. destroyLocalReference(env, jPath);
  3658. destroyLocalReference(env, jPathName);
  3659. destroyLocalReference(env, jUserName);
  3660. destroyLocalReference(env, jGroupName);
  3661. destroyLocalReference(env, jPermission);
  3662. return jthr;
  3663. }
  3664. static jthrowable
  3665. getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo)
  3666. {
  3667. // JAVA EQUIVALENT:
  3668. // fs.isDirectory(f)
  3669. // fs.getModificationTime()
  3670. // fs.getAccessTime()
  3671. // fs.getLength(f)
  3672. // f.getPath()
  3673. // f.getOwner()
  3674. // f.getGroup()
  3675. // f.getPermission().toShort()
  3676. jobject jStat;
  3677. jvalue jVal;
  3678. jthrowable jthr;
  3679. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM, "exists",
  3680. JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
  3681. if (jthr)
  3682. return jthr;
  3683. if (jVal.z == 0) {
  3684. *fileInfo = NULL;
  3685. return NULL;
  3686. }
  3687. jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
  3688. "getFileStatus", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM
  3689. (HADOOP_FILESTAT)), jPath);
  3690. if (jthr)
  3691. return jthr;
  3692. jStat = jVal.l;
  3693. *fileInfo = calloc(1, sizeof(hdfsFileInfo));
  3694. if (!*fileInfo) {
  3695. destroyLocalReference(env, jStat);
  3696. return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo");
  3697. }
  3698. jthr = getFileInfoFromStat(env, jStat, *fileInfo);
  3699. destroyLocalReference(env, jStat);
  3700. return jthr;
  3701. }
  3702. hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char *path, int *numEntries)
  3703. {
  3704. // JAVA EQUIVALENT:
  3705. // Path p(path);
  3706. // Path []pathList = fs.listPaths(p)
  3707. // foreach path in pathList
  3708. // getFileInfo(path)
  3709. jobject jFS = (jobject)fs;
  3710. jthrowable jthr;
  3711. jobject jPath = NULL;
  3712. hdfsFileInfo *pathList = NULL;
  3713. jobjectArray jPathList = NULL;
  3714. jvalue jVal;
  3715. jsize jPathListSize = 0;
  3716. int ret;
  3717. jsize i;
  3718. jobject tmpStat;
  3719. //Get the JNIEnv* corresponding to current thread
  3720. JNIEnv* env = getJNIEnv();
  3721. if (env == NULL) {
  3722. errno = EINTERNAL;
  3723. return NULL;
  3724. }
  3725. //Create an object of org.apache.hadoop.fs.Path
  3726. jthr = constructNewObjectOfPath(env, path, &jPath);
  3727. if (jthr) {
  3728. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3729. "hdfsListDirectory(%s): constructNewObjectOfPath", path);
  3730. goto done;
  3731. }
  3732. jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
  3733. JC_DISTRIBUTED_FILE_SYSTEM, "listStatus",
  3734. JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_FILESTAT)), jPath);
  3735. if (jthr) {
  3736. ret = printExceptionAndFree(env, jthr,
  3737. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  3738. NOPRINT_EXC_UNRESOLVED_LINK,
  3739. "hdfsListDirectory(%s): FileSystem#listStatus", path);
  3740. goto done;
  3741. }
  3742. jPathList = jVal.l;
  3743. //Figure out the number of entries in that directory
  3744. jPathListSize = (*env)->GetArrayLength(env, jPathList);
  3745. if (jPathListSize == 0) {
  3746. ret = 0;
  3747. goto done;
  3748. }
  3749. //Allocate memory
  3750. pathList = calloc(jPathListSize, sizeof(hdfsFileInfo));
  3751. if (pathList == NULL) {
  3752. ret = ENOMEM;
  3753. goto done;
  3754. }
  3755. //Save path information in pathList
  3756. for (i=0; i < jPathListSize; ++i) {
  3757. tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i);
  3758. jthr = (*env)->ExceptionOccurred(env);
  3759. if (jthr || !tmpStat) {
  3760. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3761. "hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)",
  3762. path, i, jPathListSize);
  3763. goto done;
  3764. }
  3765. jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]);
  3766. destroyLocalReference(env, tmpStat);
  3767. if (jthr) {
  3768. ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3769. "hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)",
  3770. path, i, jPathListSize);
  3771. goto done;
  3772. }
  3773. }
  3774. ret = 0;
  3775. done:
  3776. destroyLocalReference(env, jPath);
  3777. destroyLocalReference(env, jPathList);
  3778. if (ret) {
  3779. hdfsFreeFileInfo(pathList, jPathListSize);
  3780. errno = ret;
  3781. return NULL;
  3782. }
  3783. *numEntries = jPathListSize;
  3784. errno = 0;
  3785. return pathList;
  3786. }
  3787. hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char *path)
  3788. {
  3789. // JAVA EQUIVALENT:
  3790. // File f(path);
  3791. // fs.isDirectory(f)
  3792. // fs.lastModified() ??
  3793. // fs.getLength(f)
  3794. // f.getPath()
  3795. jobject jFS = (jobject)fs;
  3796. jobject jPath;
  3797. jthrowable jthr;
  3798. hdfsFileInfo *fileInfo;
  3799. //Get the JNIEnv* corresponding to current thread
  3800. JNIEnv* env = getJNIEnv();
  3801. if (env == NULL) {
  3802. errno = EINTERNAL;
  3803. return NULL;
  3804. }
  3805. //Create an object of org.apache.hadoop.fs.Path
  3806. jthr = constructNewObjectOfPath(env, path, &jPath);
  3807. if (jthr) {
  3808. errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
  3809. "hdfsGetPathInfo(%s): constructNewObjectOfPath", path);
  3810. return NULL;
  3811. }
  3812. jthr = getFileInfo(env, jFS, jPath, &fileInfo);
  3813. destroyLocalReference(env, jPath);
  3814. if (jthr) {
  3815. errno = printExceptionAndFree(env, jthr,
  3816. NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
  3817. NOPRINT_EXC_UNRESOLVED_LINK,
  3818. "hdfsGetPathInfo(%s): getFileInfo", path);
  3819. return NULL;
  3820. }
  3821. if (!fileInfo) {
  3822. errno = ENOENT;
  3823. return NULL;
  3824. }
  3825. return fileInfo;
  3826. }
  3827. static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo)
  3828. {
  3829. free(hdfsFileInfo->mName);
  3830. free(hdfsFileInfo->mOwner);
  3831. free(hdfsFileInfo->mGroup);
  3832. memset(hdfsFileInfo, 0, sizeof(*hdfsFileInfo));
  3833. }
  3834. void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
  3835. {
  3836. //Free the mName, mOwner, and mGroup
  3837. int i;
  3838. for (i=0; i < numEntries; ++i) {
  3839. hdfsFreeFileInfoEntry(hdfsFileInfo + i);
  3840. }
  3841. //Free entire block
  3842. free(hdfsFileInfo);
  3843. }
  3844. int hdfsFileIsEncrypted(hdfsFileInfo *fileInfo)
  3845. {
  3846. struct hdfsExtendedFileInfo *extInfo;
  3847. extInfo = getExtendedFileInfo(fileInfo);
  3848. return !!(extInfo->flags & HDFS_EXTENDED_FILE_INFO_ENCRYPTED);
  3849. }
/**
 * Return the root-cause message of the last exception, as recorded by
 * getLastTLSExceptionRootCause.
 *
 * NOTE(review): ownership and lifetime of the returned string are defined by
 * getLastTLSExceptionRootCause — presumably thread-local storage that the
 * caller must not free; confirm against its definition.
 */
char* hdfsGetLastExceptionRootCause()
{
    return getLastTLSExceptionRootCause();
}
/**
 * Return the stack trace of the last exception, as recorded by
 * getLastTLSExceptionStackTrace.
 *
 * NOTE(review): ownership and lifetime of the returned string are defined by
 * getLastTLSExceptionStackTrace — presumably thread-local storage that the
 * caller must not free; confirm against its definition.
 */
char* hdfsGetLastExceptionStackTrace()
{
    return getLastTLSExceptionStackTrace();
}
  3858. /**
  3859. * vim: ts=4: sw=4: et:
  3860. */