ZooKeeperServer.java 86 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.zookeeper.server;
  19. import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  20. import java.io.ByteArrayOutputStream;
  21. import java.io.File;
  22. import java.io.IOException;
  23. import java.io.InputStream;
  24. import java.io.PrintWriter;
  25. import java.nio.ByteBuffer;
  26. import java.util.ArrayDeque;
  27. import java.util.ArrayList;
  28. import java.util.Arrays;
  29. import java.util.Deque;
  30. import java.util.HashMap;
  31. import java.util.List;
  32. import java.util.Map;
  33. import java.util.Properties;
  34. import java.util.Random;
  35. import java.util.Set;
  36. import java.util.concurrent.atomic.AtomicInteger;
  37. import java.util.concurrent.atomic.AtomicLong;
  38. import java.util.function.BiConsumer;
  39. import javax.security.sasl.SaslException;
  40. import org.apache.jute.BinaryInputArchive;
  41. import org.apache.jute.BinaryOutputArchive;
  42. import org.apache.jute.Record;
  43. import org.apache.zookeeper.Environment;
  44. import org.apache.zookeeper.KeeperException;
  45. import org.apache.zookeeper.KeeperException.Code;
  46. import org.apache.zookeeper.KeeperException.SessionExpiredException;
  47. import org.apache.zookeeper.Quotas;
  48. import org.apache.zookeeper.StatsTrack;
  49. import org.apache.zookeeper.Version;
  50. import org.apache.zookeeper.ZooDefs;
  51. import org.apache.zookeeper.ZooDefs.OpCode;
  52. import org.apache.zookeeper.ZookeeperBanner;
  53. import org.apache.zookeeper.common.StringUtils;
  54. import org.apache.zookeeper.common.Time;
  55. import org.apache.zookeeper.data.ACL;
  56. import org.apache.zookeeper.data.Id;
  57. import org.apache.zookeeper.data.StatPersisted;
  58. import org.apache.zookeeper.jmx.MBeanRegistry;
  59. import org.apache.zookeeper.metrics.MetricsContext;
  60. import org.apache.zookeeper.proto.AuthPacket;
  61. import org.apache.zookeeper.proto.ConnectRequest;
  62. import org.apache.zookeeper.proto.ConnectResponse;
  63. import org.apache.zookeeper.proto.CreateRequest;
  64. import org.apache.zookeeper.proto.DeleteRequest;
  65. import org.apache.zookeeper.proto.GetSASLRequest;
  66. import org.apache.zookeeper.proto.ReplyHeader;
  67. import org.apache.zookeeper.proto.RequestHeader;
  68. import org.apache.zookeeper.proto.SetACLRequest;
  69. import org.apache.zookeeper.proto.SetDataRequest;
  70. import org.apache.zookeeper.proto.SetSASLResponse;
  71. import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
  72. import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
  73. import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
  74. import org.apache.zookeeper.server.SessionTracker.Session;
  75. import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
  76. import org.apache.zookeeper.server.auth.ProviderRegistry;
  77. import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
  78. import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  79. import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
  80. import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
  81. import org.apache.zookeeper.server.util.JvmPauseMonitor;
  82. import org.apache.zookeeper.server.util.OSMXBean;
  83. import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
  84. import org.apache.zookeeper.txn.CreateSessionTxn;
  85. import org.apache.zookeeper.txn.TxnDigest;
  86. import org.apache.zookeeper.txn.TxnHeader;
  87. import org.apache.zookeeper.util.ServiceUtils;
  88. import org.slf4j.Logger;
  89. import org.slf4j.LoggerFactory;
  90. /**
  91. * This class implements a simple standalone ZooKeeperServer. It sets up the
  92. * following chain of RequestProcessors to process requests:
  93. * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
  94. */
  95. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  // Loggers; both are assigned in the static initializer that follows.
  96. protected static final Logger LOG;
  97. private static final RateLogger RATE_LOGGER;
  // Names of the system properties that configure the server.
  98. public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
  99. public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
  100. public static final String SKIP_ACL = "zookeeper.skipACL";
  101. public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
  102. // When enabled, the ACL constraints pertaining to a request are checked first,
  103. // before the request is sent to the quorum.
  104. static final boolean enableEagerACLCheck;
  // True only when the SKIP_ACL system property has the exact value "yes".
  105. static final boolean skipACL;
  // Parsed from the ENFORCE_QUOTA system property; false when the property is unset.
  106. public static final boolean enforceQuota;
  107. public static final String SASL_SUPER_USER = "zookeeper.superUser";
  108. public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
  109. public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
  // Parsed from the ZOOKEEPER_DIGEST_ENABLED system property; true when the property is unset.
  110. private static boolean digestEnabled;
  111. // An enable/disable option for now; it should be removed once
  112. // this feature is confirmed to be stable.
  113. public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
  114. private static boolean closeSessionTxnEnabled = true;
  // Class-initialization block: creates the loggers, prints the banner and the
  // server environment, then reads the feature flags from system properties.
  // The final static fields are assigned exactly once here.
  115. static {
  116. LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
  117. RATE_LOGGER = new RateLogger(LOG);
  118. ZookeeperBanner.printBanner(LOG);
  119. Environment.logEnv("Server environment:", LOG);
  120. enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
  121. LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck);
  // Only the exact value "yes" turns ACL skipping on; the fallback value is "no".
  122. skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
  123. if (skipACL) {
  124. LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
  125. }
  // Quota enforcement is off unless the property parses to true; logged only when on.
  126. enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
  127. if (enforceQuota) {
  128. LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
  129. }
  // Digest and close-session-txn both default to true when their properties are unset.
  130. digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
  131. LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
  132. closeSessionTxnEnabled = Boolean.parseBoolean(
  133. System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
  134. LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
  135. }
  136. public static boolean isCloseSessionTxnEnabled() {
  137. return closeSessionTxnEnabled;
  138. }
  139. public static void setCloseSessionTxnEnabled(boolean enabled) {
  140. ZooKeeperServer.closeSessionTxnEnabled = enabled;
  141. LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED,
  142. ZooKeeperServer.closeSessionTxnEnabled);
  143. }
  144. protected ZooKeeperServerBean jmxServerBean;
  145. protected DataTreeBean jmxDataTreeBean;
  // Default tick time; the constructor logs tickTime in milliseconds.
  146. public static final int DEFAULT_TICK_TIME = 3000;
  147. protected int tickTime = DEFAULT_TICK_TIME;
  148. public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
  // Initialised from the zookeeper.throttled_op_wait_time system property,
  // falling back to DEFAULT_THROTTLED_OP_WAIT_TIME.
  149. protected static volatile int throttledOpWaitTime =
  150. Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
  151. /** A value of -1 indicates unset; the default is used. */
  152. protected int minSessionTimeout = -1;
  153. /** A value of -1 indicates unset; the default is used. */
  154. protected int maxSessionTimeout = -1;
  155. /** Socket listen backlog. A value of -1 indicates unset. */
  156. protected int listenBacklog = -1;
  157. protected SessionTracker sessionTracker;
  158. private FileTxnSnapLog txnLogFactory = null;
  159. private ZKDatabase zkDb;
  160. private ResponseCache readResponseCache;
  161. private ResponseCache getChildrenResponseCache;
  // Zxid counter: read by getZxid(), advanced by getNextZxid(), overwritten by setZxid().
  162. private final AtomicLong hzxid = new AtomicLong(0);
  163. public static final Exception ok = new Exception("No prob");
  164. protected RequestProcessor firstProcessor;
  165. protected JvmPauseMonitor jvmPauseMonitor;
  166. protected volatile State state = State.INITIAL;
  167. private boolean isResponseCachingEnabled = true;
  168. /* Contains the configuration file content read at startup. */
  169. protected String initialConfig;
  170. protected boolean reconfigEnabled;
  171. private final RequestPathMetricsCollector requestPathMetricsCollector;
  172. private boolean localSessionEnabled = false;
  // Values taken by the volatile `state` field, which starts out as INITIAL.
  173. protected enum State {
  174. INITIAL,
  175. RUNNING,
  176. SHUTDOWN,
  177. ERROR
  178. }
  179. /**
  180. * This is the secret that we use to generate passwords. For the moment,
  181. * it's more of a checksum that's used in reconnection, which carries no
  182. * security weight, and is treated internally as if it carries no
  183. * security weight.
  184. */
  185. private static final long superSecret = 0XB3415C00L;
  186. private final AtomicInteger requestsInProcess = new AtomicInteger(0);
  187. final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
  188. // This data structure must be accessed under the outstandingChanges lock.
  189. final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
  190. protected ServerCnxnFactory serverCnxnFactory;
  191. protected ServerCnxnFactory secureServerCnxnFactory;
  192. private final ServerStats serverStats;
  193. private final ZooKeeperServerListener listener;
  194. private ZooKeeperServerShutdownHandler zkShutdownHandler;
  195. private volatile int createSessionTrackerServerId = 1;
  196. private static final String FLUSH_DELAY = "zookeeper.flushDelay";
  197. private static volatile long flushDelay;
  // NOTE: despite the constant's name, the property it names is the maximum
  // write-queue poll *time*, matching the maxWriteQueuePollTime field below.
  198. private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
  199. private static volatile long maxWriteQueuePollTime;
  200. private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
  201. private static volatile int maxBatchSize;
  202. /**
  203. * Starting size of read and write ByteArrayOutputStream buffers. The default is
  203. * {@link #DEFAULT_STARTING_BUFFER_SIZE} (1024 bytes); the static initializer
  203. * rejects values below 32 bytes.
  204. * Flag not used for small transfers like connectResponses.
  205. */
  206. public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
  207. public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
  208. public static final int intBufferStartingSizeBytes;
  // Property names for the sizes of the getData and getChildren response caches.
  209. public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
  210. public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";
  211. static {
  212. long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
  213. setFlushDelay(configuredFlushDelay);
  214. setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
  215. setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
  216. intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);
  217. if (intBufferStartingSizeBytes < 32) {
  218. String msg = "Buffer starting size must be greater than or equal to 32."
  219. + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
  220. LOG.error(msg);
  221. throw new IllegalArgumentException(msg);
  222. }
  223. LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes);
  224. }
  225. // Connection throttling
  226. private BlueThrottle connThrottle = new BlueThrottle();
  227. @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
  228. "Internally the throttler has a BlockingQueue so "
  229. + "once the throttler is created and started, it is thread-safe")
  230. private RequestThrottler requestThrottler;
  231. public static final String SNAP_COUNT = "zookeeper.snapCount";
  232. /**
  233. * This setting sets a limit on the total number of large requests that
  234. * can be inflight and is designed to prevent ZooKeeper from accepting
  235. * too many large requests such that the JVM runs out of usable heap and
  236. * ultimately crashes.
  237. *
  238. * The limit is enforced by the {@link #checkRequestSize(int, boolean)}
  239. * method which is called by the connection layer ({@link NIOServerCnxn},
  240. * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
  241. * data off the TCP socket. The limit is then checked again by the
  242. * ZooKeeper server in {@link #processPacket(ServerCnxn, ByteBuffer)} which
  243. * also atomically updates {@link #currentLargeRequestBytes}. The request is
  244. * then marked as a large request, with the request size stored in the Request
  245. * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
  246. *
  247. * When a request is completed or dropped, the relevant code path calls the
  248. * {@link #requestFinished(Request)} method which performs the decrement if
  249. * needed.
  250. */
  251. private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
  252. /**
  253. * The size threshold after which a request is considered a large request
  254. * and is checked against the large request byte limit.
  255. */
  256. private volatile int largeRequestThreshold = -1;
  257. private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
  258. private AuthenticationHelper authHelper;
  259. void removeCnxn(ServerCnxn cnxn) {
  260. zkDb.removeCnxn(cnxn);
  261. }
  /**
   * Creates a bare ZooKeeperServer instance.
   *
   * <p>Only the listener, the server stats, the request-path metrics collector
   * and the authentication helper are created here; everything else (e.g. datadir,
   * datalogdir, ticktime, builder, ...) has to be supplied through the setX methods.
   */
  public ZooKeeperServer() {
      this.listener = new ZooKeeperServerListenerImpl(this);
      this.serverStats = new ServerStats(this);
      this.requestPathMetricsCollector = new RequestPathMetricsCollector();
      this.authHelper = new AuthenticationHelper();
  }
  /**
   * Backward-compatible constructor: delegates to the eight-argument constructor
   * with {@code reconfigEnabled} taken from {@link QuorumPeerConfig#isReconfigEnabled()}.
   */
  public ZooKeeperServer(
          FileTxnSnapLog txnLogFactory,
          int tickTime,
          int minSessionTimeout,
          int maxSessionTimeout,
          int clientPortListenBacklog,
          ZKDatabase zkDb,
          String initialConfig) {
      this(
          txnLogFactory,
          tickTime,
          minSessionTimeout,
          maxSessionTimeout,
          clientPortListenBacklog,
          zkDb,
          initialConfig,
          QuorumPeerConfig.isReconfigEnabled());
  }
  /**
   * Creates a fully configured ZooKeeperServer instance.
   *
   * <p>Stores the supplied settings, registers the server stats with the
   * transaction log factory, creates the listener, the getData and getChildren
   * response caches (sized from system properties), the request-path metrics
   * collector and the authentication helper, initialises the large-request
   * throttling settings, and finally logs the resulting configuration.
   *
   * @param txnLogFactory the transaction/snapshot log; must not be null
   * @param tickTime the tick time, logged in milliseconds
   * @param minSessionTimeout passed to {@code setMinSessionTimeout}
   * @param maxSessionTimeout passed to {@code setMaxSessionTimeout}
   * @param clientPortListenBacklog the socket listen backlog
   * @param zkDb the database backing this server
   * @param initialConfig the initial configuration content
   * @param reconfigEnabled whether reconfiguration is enabled
   */
  public ZooKeeperServer(
          FileTxnSnapLog txnLogFactory,
          int tickTime,
          int minSessionTimeout,
          int maxSessionTimeout,
          int clientPortListenBacklog,
          ZKDatabase zkDb,
          String initialConfig,
          boolean reconfigEnabled) {
      this.serverStats = new ServerStats(this);
      this.txnLogFactory = txnLogFactory;
      this.txnLogFactory.setServerStats(this.serverStats);
      this.zkDb = zkDb;
      this.tickTime = tickTime;
      setMinSessionTimeout(minSessionTimeout);
      setMaxSessionTimeout(maxSessionTimeout);
      this.listenBacklog = clientPortListenBacklog;
      this.reconfigEnabled = reconfigEnabled;
      this.listener = new ZooKeeperServerListenerImpl(this);

      // Each cache size comes from its own system property, with a shared default.
      this.readResponseCache = new ResponseCache(
          Integer.getInteger(GET_DATA_RESPONSE_CACHE_SIZE, ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE),
          "getData");
      this.getChildrenResponseCache = new ResponseCache(
          Integer.getInteger(GET_CHILDREN_RESPONSE_CACHE_SIZE, ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE),
          "getChildren");

      this.initialConfig = initialConfig;
      this.requestPathMetricsCollector = new RequestPathMetricsCollector();
      initLargeRequestThrottlingSettings();
      this.authHelper = new AuthenticationHelper();

      LOG.info(
          "Created server with tickTime {} ms minSessionTimeout {} ms maxSessionTimeout {} ms"
              + " clientPortListenBacklog {} datadir {} snapdir {}",
          tickTime,
          getMinSessionTimeout(),
          getMaxSessionTimeout(),
          getClientPortListenBacklog(),
          txnLogFactory.getDataDir(),
          txnLogFactory.getSnapDir());
  }
  321. public String getInitialConfig() {
  322. return initialConfig;
  323. }
  /**
   * Adds a JvmPauseMonitor on top of
   * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String, boolean)},
   * passing {@link QuorumPeerConfig#isReconfigEnabled()} as the reconfig flag.
   *
   * @param jvmPauseMonitor the monitor to attach; may be null, in which case
   *                        nothing is logged about it
   */
  public ZooKeeperServer(
          JvmPauseMonitor jvmPauseMonitor,
          FileTxnSnapLog txnLogFactory,
          int tickTime,
          int minSessionTimeout,
          int maxSessionTimeout,
          int clientPortListenBacklog,
          ZKDatabase zkDb,
          String initialConfig) {
      this(
          txnLogFactory,
          tickTime,
          minSessionTimeout,
          maxSessionTimeout,
          clientPortListenBacklog,
          zkDb,
          initialConfig,
          QuorumPeerConfig.isReconfigEnabled());
      this.jvmPauseMonitor = jvmPauseMonitor;
      if (this.jvmPauseMonitor != null) {
          LOG.info("Added JvmPauseMonitor to server");
      }
  }
  /**
   * Creates a ZooKeeperServer instance with unset (-1) session timeouts and
   * listen backlog, and a new ZKDatabase built on the given log factory.
   *
   * @param txnLogFactory the file transaction snapshot logging class
   * @param tickTime the ticktime for the server
   * @param initialConfig the initial configuration content
   */
  public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
      this(
          txnLogFactory,
          tickTime,
          -1,
          -1,
          -1,
          new ZKDatabase(txnLogFactory),
          initialConfig,
          QuorumPeerConfig.isReconfigEnabled());
  }
  345. public ServerStats serverStats() {
  346. return serverStats;
  347. }
  348. public RequestPathMetricsCollector getRequestPathMetricsCollector() {
  349. return requestPathMetricsCollector;
  350. }
  351. public BlueThrottle connThrottle() {
  352. return connThrottle;
  353. }
  354. public void dumpConf(PrintWriter pwriter) {
  355. pwriter.print("clientPort=");
  356. pwriter.println(getClientPort());
  357. pwriter.print("secureClientPort=");
  358. pwriter.println(getSecureClientPort());
  359. pwriter.print("dataDir=");
  360. pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
  361. pwriter.print("dataDirSize=");
  362. pwriter.println(getDataDirSize());
  363. pwriter.print("dataLogDir=");
  364. pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
  365. pwriter.print("dataLogSize=");
  366. pwriter.println(getLogDirSize());
  367. pwriter.print("tickTime=");
  368. pwriter.println(getTickTime());
  369. pwriter.print("maxClientCnxns=");
  370. pwriter.println(getMaxClientCnxnsPerHost());
  371. pwriter.print("minSessionTimeout=");
  372. pwriter.println(getMinSessionTimeout());
  373. pwriter.print("maxSessionTimeout=");
  374. pwriter.println(getMaxSessionTimeout());
  375. pwriter.print("clientPortListenBacklog=");
  376. pwriter.println(getClientPortListenBacklog());
  377. pwriter.print("serverId=");
  378. pwriter.println(getServerId());
  379. }
  380. public ZooKeeperServerConf getConf() {
  381. return new ZooKeeperServerConf(
  382. getClientPort(),
  383. zkDb.snapLog.getSnapDir().getAbsolutePath(),
  384. zkDb.snapLog.getDataDir().getAbsolutePath(),
  385. getTickTime(),
  386. getMaxClientCnxnsPerHost(),
  387. getMinSessionTimeout(),
  388. getMaxSessionTimeout(),
  389. getServerId(),
  390. getClientPortListenBacklog());
  391. }
  /**
   * Constructor kept for backward compatibility with the existing unit test code.
   * Wraps the two directories in a {@link FileTxnSnapLog} and uses an empty
   * initial configuration.
   *
   * @param snapDir first argument of the {@link FileTxnSnapLog} constructor
   * @param logDir second argument of the {@link FileTxnSnapLog} constructor
   * @param tickTime the ticktime for the server
   * @throws IOException if the {@link FileTxnSnapLog} cannot be created
   */
  public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
      this(
          new FileTxnSnapLog(snapDir, logDir),
          tickTime,
          "");
  }
  /**
   * Default constructor: uses {@link #DEFAULT_TICK_TIME}, unset (-1) session
   * timeouts and listen backlog, a new ZKDatabase on the given log factory and
   * an empty initial configuration.
   *
   * @param txnLogFactory the file transaction snapshot logging class
   * @throws IOException declared by this constructor's signature
   */
  public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
      this(
          txnLogFactory,
          DEFAULT_TICK_TIME,
          -1,
          -1,
          -1,
          new ZKDatabase(txnLogFactory),
          "",
          QuorumPeerConfig.isReconfigEnabled());
  }
  408. /**
  409. * get the zookeeper database for this server
  410. * @return the zookeeper database for this server
  411. */
  412. public ZKDatabase getZKDatabase() {
  413. return this.zkDb;
  414. }
  415. /**
  416. * set the zkdatabase for this zookeeper server
  417. * @param zkDb
  418. */
  419. public void setZKDatabase(ZKDatabase zkDb) {
  420. this.zkDb = zkDb;
  421. }
  422. /**
  423. * Restore sessions and data
  424. */
  425. public void loadData() throws IOException, InterruptedException {
  426. /*
  427. * When a new leader starts executing Leader#lead, it
  428. * invokes this method. The database, however, has been
  429. * initialized before running leader election so that
  430. * the server could pick its zxid for its initial vote.
  431. * It does it by invoking QuorumPeer#getLastLoggedZxid.
  432. * Consequently, we don't need to initialize it once more
  433. * and avoid the penalty of loading it a second time. Not
  434. * reloading it is particularly important for applications
  435. * that host a large database.
  436. *
  437. * The following if block checks whether the database has
  438. * been initialized or not. Note that this method is
  439. * invoked by at least one other method:
  440. * ZooKeeperServer#startdata.
  441. *
  442. * See ZOOKEEPER-1642 for more detail.
  443. */
  444. if (zkDb.isInitialized()) {
  445. setZxid(zkDb.getDataTreeLastProcessedZxid());
  446. } else {
  447. setZxid(zkDb.loadDataBase());
  448. }
  449. // Clean up dead sessions
  450. zkDb.getSessions().stream()
  451. .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
  452. .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));
  453. // Make a clean snapshot
  454. takeSnapshot();
  455. }
  456. public void takeSnapshot() {
  457. takeSnapshot(false);
  458. }
  459. public void takeSnapshot(boolean syncSnap) {
  460. long start = Time.currentElapsedTime();
  461. try {
  462. txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
  463. } catch (IOException e) {
  464. LOG.error("Severe unrecoverable error, exiting", e);
  465. // This is a severe error that we cannot recover from,
  466. // so we need to exit
  467. ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
  468. }
  469. long elapsed = Time.currentElapsedTime() - start;
  470. LOG.info("Snapshot taken in {} ms", elapsed);
  471. ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
  472. }
  473. public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
  474. return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
  475. }
  476. @Override
  477. public long getDataDirSize() {
  478. if (zkDb == null) {
  479. return 0L;
  480. }
  481. File path = zkDb.snapLog.getDataDir();
  482. return getDirSize(path);
  483. }
  484. @Override
  485. public long getLogDirSize() {
  486. if (zkDb == null) {
  487. return 0L;
  488. }
  489. File path = zkDb.snapLog.getSnapDir();
  490. return getDirSize(path);
  491. }
  492. private long getDirSize(File file) {
  493. long size = 0L;
  494. if (file.isDirectory()) {
  495. File[] files = file.listFiles();
  496. if (files != null) {
  497. for (File f : files) {
  498. size += getDirSize(f);
  499. }
  500. }
  501. } else {
  502. size = file.length();
  503. }
  504. return size;
  505. }
  506. public long getZxid() {
  507. return hzxid.get();
  508. }
  509. public SessionTracker getSessionTracker() {
  510. return sessionTracker;
  511. }
  512. long getNextZxid() {
  513. return hzxid.incrementAndGet();
  514. }
  515. public void setZxid(long zxid) {
  516. hzxid.set(zxid);
  517. }
  518. private void close(long sessionId) {
  519. Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
  520. submitRequest(si);
  521. }
  522. public void closeSession(long sessionId) {
  523. LOG.info("Closing session 0x{}", Long.toHexString(sessionId));
  524. // we do not want to wait for a session close. send it as soon as we
  525. // detect it!
  526. close(sessionId);
  527. }
  528. protected void killSession(long sessionId, long zxid) {
  529. zkDb.killSession(sessionId, zxid);
  530. if (LOG.isTraceEnabled()) {
  531. ZooTrace.logTraceMessage(
  532. LOG,
  533. ZooTrace.SESSION_TRACE_MASK,
  534. "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId));
  535. }
  536. if (sessionTracker != null) {
  537. sessionTracker.removeSession(sessionId);
  538. }
  539. }
  540. public void expire(Session session) {
  541. long sessionId = session.getSessionId();
  542. LOG.info(
  543. "Expiring session 0x{}, timeout of {}ms exceeded",
  544. Long.toHexString(sessionId),
  545. session.getTimeout());
  546. close(sessionId);
  547. }
  548. public void expire(long sessionId) {
  549. LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId));
  550. close(sessionId);
  551. }
  552. public static class MissingSessionException extends IOException {
  553. private static final long serialVersionUID = 7467414635467261007L;
  554. public MissingSessionException(String msg) {
  555. super(msg);
  556. }
  557. }
  558. void touch(ServerCnxn cnxn) throws MissingSessionException {
  559. if (cnxn == null) {
  560. return;
  561. }
  562. long id = cnxn.getSessionId();
  563. int to = cnxn.getSessionTimeout();
  564. if (!sessionTracker.touchSession(id, to)) {
  565. throw new MissingSessionException("No session with sessionid 0x"
  566. + Long.toHexString(id)
  567. + " exists, probably expired and removed");
  568. }
  569. }
  570. protected void registerJMX() {
  571. // register with JMX
  572. try {
  573. jmxServerBean = new ZooKeeperServerBean(this);
  574. MBeanRegistry.getInstance().register(jmxServerBean, null);
  575. try {
  576. jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
  577. MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
  578. } catch (Exception e) {
  579. LOG.warn("Failed to register with JMX", e);
  580. jmxDataTreeBean = null;
  581. }
  582. } catch (Exception e) {
  583. LOG.warn("Failed to register with JMX", e);
  584. jmxServerBean = null;
  585. }
  586. }
  587. public void startdata() throws IOException, InterruptedException {
  588. //check to see if zkDb is not null
  589. if (zkDb == null) {
  590. zkDb = new ZKDatabase(this.txnLogFactory);
  591. }
  592. if (!zkDb.isInitialized()) {
  593. loadData();
  594. }
  595. }
  596. public synchronized void startup() {
  597. startupWithServerState(State.RUNNING);
  598. }
  599. public synchronized void startupWithoutServing() {
  600. startupWithServerState(State.INITIAL);
  601. }
  602. public synchronized void startServing() {
  603. setState(State.RUNNING);
  604. notifyAll();
  605. }
  /**
   * Brings up the server's components in a fixed order and then publishes the
   * requested state. Both visible callers are {@code synchronized}, which is
   * what makes the final {@code notifyAll()} legal.
   *
   * @param state the state to set once the components have been started
   */
  private void startupWithServerState(State state) {
      // Create a session tracker only if one has not been provided already.
      if (sessionTracker == null) {
          createSessionTracker();
      }
      startSessionTracker();
      setupRequestProcessors();
      startRequestThrottler();
      registerJMX();
      startJvmPauseMonitor();
      registerMetrics();
      setState(state);
      requestPathMetricsCollector.start();
      // Cache the tracker's local-session setting on the server.
      localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
      // Wake threads waiting on this monitor (enqueueRequest and
      // submitRequestNow wait on it while the state is INITIAL).
      notifyAll();
  }
  621. protected void startJvmPauseMonitor() {
  622. if (this.jvmPauseMonitor != null) {
  623. this.jvmPauseMonitor.serviceStart();
  624. }
  625. }
  626. protected void startRequestThrottler() {
  627. requestThrottler = new RequestThrottler(this);
  628. requestThrottler.start();
  629. }
  630. protected void setupRequestProcessors() {
  631. RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  632. RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
  633. ((SyncRequestProcessor) syncProcessor).start();
  634. firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  635. ((PrepRequestProcessor) firstProcessor).start();
  636. }
  637. public ZooKeeperServerListener getZooKeeperServerListener() {
  638. return listener;
  639. }
  640. /**
  641. * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to
  642. * {@link #startup()} being called
  643. *
  644. * @param newId ID to use
  645. */
  646. public void setCreateSessionTrackerServerId(int newId) {
  647. createSessionTrackerServerId = newId;
  648. }
  649. protected void createSessionTracker() {
  650. sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener());
  651. }
  652. protected void startSessionTracker() {
  653. ((SessionTrackerImpl) sessionTracker).start();
  654. }
  655. /**
  656. * Sets the state of ZooKeeper server. After changing the state, it notifies
  657. * the server state change to a registered shutdown handler, if any.
  658. * <p>
  659. * The following are the server state transitions:
  660. * <ul><li>During startup the server will be in the INITIAL state.</li>
  661. * <li>After successfully starting, the server sets the state to RUNNING.
  662. * </li>
  663. * <li>The server transitions to the ERROR state if it hits an internal
  664. * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource
  665. * error events, e.g., SyncRequestProcessor not being able to write a txn to
  666. * disk.</li>
  667. * <li>During shutdown the server sets the state to SHUTDOWN, which
  668. * corresponds to the server not running.</li></ul>
  669. *
  670. * @param state new server state.
  671. */
  672. protected void setState(State state) {
  673. this.state = state;
  674. // Notify server state changes to the registered shutdown handler, if any.
  675. if (zkShutdownHandler != null) {
  676. zkShutdownHandler.handle(state);
  677. } else {
  678. LOG.debug(
  679. "ZKShutdownHandler is not registered, so ZooKeeper server"
  680. + " won't take any action on ERROR or SHUTDOWN server state changes");
  681. }
  682. }
  683. /**
  684. * This can be used while shutting down the server to see whether the server
  685. * is already shutdown or not.
  686. *
  687. * @return true if the server is running or server hits an error, false
  688. * otherwise.
  689. */
  690. protected boolean canShutdown() {
  691. return state == State.RUNNING || state == State.ERROR;
  692. }
  693. /**
  694. * @return true if the server is running, false otherwise.
  695. */
  696. public boolean isRunning() {
  697. return state == State.RUNNING;
  698. }
  699. public void shutdown() {
  700. shutdown(false);
  701. }
  /**
   * Shut down the server instance.
   *
   * <p>When the server is in a state that cannot be shut down (see
   * {@code canShutdown()}), the only action taken is clearing the database
   * if a full shutdown was requested. Otherwise the state is set to SHUTDOWN,
   * and metrics, the request throttler, the session tracker, the processor
   * chain, the JVM pause monitor, the database, the request path metrics
   * collector and the JMX beans are stopped or released in that order.
   *
   * @param fullyShutDown true if another server using the same database will not replace this one in the same process
   */
  public synchronized void shutdown(boolean fullyShutDown) {
      if (!canShutdown()) {
          // Not RUNNING or ERROR: nothing to stop, but a full shutdown still
          // clears the database.
          if (fullyShutDown && zkDb != null) {
              zkDb.clear();
          }
          LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
          return;
      }
      LOG.info("shutting down");
      setState(State.SHUTDOWN);
      // unregister all metrics that are keeping a strong reference to this object
      // subclasses will do their specific clean up
      unregisterMetrics();
      if (requestThrottler != null) {
          requestThrottler.shutdown();
      }
      // Since sessionTracker and syncThreads poll we just have to
      // set running to false and they will detect it during the poll
      // interval.
      if (sessionTracker != null) {
          sessionTracker.shutdown();
      }
      if (firstProcessor != null) {
          firstProcessor.shutdown();
      }
      if (jvmPauseMonitor != null) {
          jvmPauseMonitor.serviceStop();
      }
      if (zkDb != null) {
          if (fullyShutDown) {
              zkDb.clear();
          } else {
              // else there is no need to clear the database
              //  * When a new quorum is established we can still apply the diff
              //    on top of the same zkDb data
              //  * If we fetch a new snapshot from leader, the zkDb will be
              //    cleared anyway before loading the snapshot
              try {
                  //This will fast forward the database to the latest recorded transactions
                  zkDb.fastForwardDataBase();
              } catch (IOException e) {
                  // Fast-forward failed: fall back to clearing the database.
                  LOG.error("Error updating DB", e);
                  zkDb.clear();
              }
          }
      }
      requestPathMetricsCollector.shutdown();
      unregisterJMX();
  }
  756. protected void unregisterJMX() {
  757. // unregister from JMX
  758. try {
  759. if (jmxDataTreeBean != null) {
  760. MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
  761. }
  762. } catch (Exception e) {
  763. LOG.warn("Failed to unregister with JMX", e);
  764. }
  765. try {
  766. if (jmxServerBean != null) {
  767. MBeanRegistry.getInstance().unregister(jmxServerBean);
  768. }
  769. } catch (Exception e) {
  770. LOG.warn("Failed to unregister with JMX", e);
  771. }
  772. jmxServerBean = null;
  773. jmxDataTreeBean = null;
  774. }
  775. public void incInProcess() {
  776. requestsInProcess.incrementAndGet();
  777. }
  778. public void decInProcess() {
  779. requestsInProcess.decrementAndGet();
  780. if (requestThrottler != null) {
  781. requestThrottler.throttleWake();
  782. }
  783. }
  784. public int getInProcess() {
  785. return requestsInProcess.get();
  786. }
  787. public int getInflight() {
  788. return requestThrottleInflight();
  789. }
  790. private int requestThrottleInflight() {
  791. if (requestThrottler != null) {
  792. return requestThrottler.getInflight();
  793. }
  794. return 0;
  795. }
  796. static class PrecalculatedDigest {
  797. final long nodeDigest;
  798. final long treeDigest;
  799. PrecalculatedDigest(long nodeDigest, long treeDigest) {
  800. this.nodeDigest = nodeDigest;
  801. this.treeDigest = treeDigest;
  802. }
  803. }
  804. /**
  805. * This structure is used to facilitate information sharing between PrepRP
  806. * and FinalRP.
  807. */
  808. static class ChangeRecord {
  809. PrecalculatedDigest precalculatedDigest;
  810. byte[] data;
  811. ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
  812. this.zxid = zxid;
  813. this.path = path;
  814. this.stat = stat;
  815. this.childCount = childCount;
  816. this.acl = acl;
  817. }
  818. long zxid;
  819. String path;
  820. StatPersisted stat; /* Make sure to create a new object when changing */
  821. int childCount;
  822. List<ACL> acl; /* Make sure to create a new object when changing */
  823. ChangeRecord duplicate(long zxid) {
  824. StatPersisted stat = new StatPersisted();
  825. if (this.stat != null) {
  826. DataTree.copyStatPersisted(this.stat, stat);
  827. }
  828. ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount,
  829. acl == null ? new ArrayList<>() : new ArrayList<>(acl));
  830. changeRecord.precalculatedDigest = precalculatedDigest;
  831. changeRecord.data = data;
  832. return changeRecord;
  833. }
  834. }
  835. byte[] generatePasswd(long id) {
  836. Random r = new Random(id ^ superSecret);
  837. byte[] p = new byte[16];
  838. r.nextBytes(p);
  839. return p;
  840. }
  841. protected boolean checkPasswd(long sessionId, byte[] passwd) {
  842. return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
  843. }
  844. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
  845. if (passwd == null) {
  846. // Possible since it's just deserialized from a packet on the wire.
  847. passwd = new byte[0];
  848. }
  849. long sessionId = sessionTracker.createSession(timeout);
  850. Random r = new Random(sessionId ^ superSecret);
  851. r.nextBytes(passwd);
  852. ByteBuffer to = ByteBuffer.allocate(4);
  853. to.putInt(timeout);
  854. cnxn.setSessionId(sessionId);
  855. Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
  856. submitRequest(si);
  857. return sessionId;
  858. }
  859. /**
  860. * set the owner of this session as owner
  861. * @param id the session id
  862. * @param owner the owner of the session
  863. * @throws SessionExpiredException
  864. */
  865. public void setOwner(long id, Object owner) throws SessionExpiredException {
  866. sessionTracker.setOwner(id, owner);
  867. }
  868. protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
  869. boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
  870. if (LOG.isTraceEnabled()) {
  871. ZooTrace.logTraceMessage(
  872. LOG,
  873. ZooTrace.SESSION_TRACE_MASK,
  874. "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc);
  875. }
  876. finishSessionInit(cnxn, rc);
  877. }
  878. public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
  879. if (checkPasswd(sessionId, passwd)) {
  880. revalidateSession(cnxn, sessionId, sessionTimeout);
  881. } else {
  882. LOG.warn(
  883. "Incorrect password from {} for session 0x{}",
  884. cnxn.getRemoteSocketAddress(),
  885. Long.toHexString(sessionId));
  886. finishSessionInit(cnxn, false);
  887. }
  888. }
  /**
   * Completes session establishment for a connection: registers the
   * connection with whichever connection factory holds it (valid sessions
   * only), sends the {@code ConnectResponse}, and then either re-enables
   * receiving (valid) or sends the close-connection buffer (invalid).
   *
   * <p>Any exception raised while building or sending the response is logged
   * and the connection is closed with
   * {@code IO_EXCEPTION_IN_SESSION_INIT}.
   *
   * @param cnxn the connection being initialized
   * @param valid whether the session is valid
   */
  public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
      // register with JMX
      try {
          if (valid) {
              // Register with the factory (plain or secure) that owns this connection.
              if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                  serverCnxnFactory.registerConnection(cnxn);
              } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                  secureServerCnxnFactory.registerConnection(cnxn);
              }
          }
      } catch (Exception e) {
          // Registration failure is not fatal for the session; keep going.
          LOG.warn("Failed to register with JMX", e);
      }
      try {
          ConnectResponse rsp = new ConnectResponse(
              0,
              valid ? cnxn.getSessionTimeout() : 0,
              valid ? cnxn.getSessionId() : 0, // send 0 if session is no
              // longer valid
              valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
          // Placeholder for the length prefix; overwritten below once the
          // serialized size is known.
          bos.writeInt(-1, "len");
          rsp.serialize(bos, "connect");
          if (!cnxn.isOldClient) {
              // Only clients not flagged as old get the trailing readOnly flag.
              bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
          }
          baos.close();
          ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
          // Patch the 4-byte length prefix with the payload size, then rewind
          // so the whole buffer is sent.
          bb.putInt(bb.remaining() - 4).rewind();
          cnxn.sendBuffer(bb);
          if (valid) {
              LOG.debug(
                  "Established session 0x{} with negotiated timeout {} for client {}",
                  Long.toHexString(cnxn.getSessionId()),
                  cnxn.getSessionTimeout(),
                  cnxn.getRemoteSocketAddress());
              cnxn.enableRecv();
          } else {
              LOG.info(
                  "Invalid session 0x{} for client {}, probably expired",
                  Long.toHexString(cnxn.getSessionId()),
                  cnxn.getRemoteSocketAddress());
              cnxn.sendBuffer(ServerCnxnFactory.closeConn);
          }
      } catch (Exception e) {
          LOG.warn("Exception while establishing session, closing", e);
          cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
      }
  }
  939. public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
  940. closeSession(cnxn.getSessionId());
  941. }
  942. public long getServerId() {
  943. return 0;
  944. }
  945. /**
  946. * If the underlying Zookeeper server support local session, this method
  947. * will set a isLocalSession to true if a request is associated with
  948. * a local session.
  949. *
  950. * @param si
  951. */
  952. protected void setLocalSessionFlag(Request si) {
  953. }
  954. public void submitRequest(Request si) {
  955. enqueueRequest(si);
  956. }
  /**
   * Hands a request to the request throttler. If the throttler does not exist
   * yet, the caller blocks on this server's monitor while the state is
   * INITIAL, re-checking at least once a second.
   *
   * @param si the request to enqueue
   * @throws RuntimeException ("Not started") if the throttler is still absent
   *         after the wait ends
   */
  public void enqueueRequest(Request si) {
      if (requestThrottler == null) {
          synchronized (this) {
              try {
                  // Since all requests are passed to the request
                  // processor it should wait for setting up the request
                  // processor chain. The state will be updated to RUNNING
                  // after the setup.
                  while (state == State.INITIAL) {
                      wait(1000);
                  }
              } catch (InterruptedException e) {
                  // The interruption is only logged; the thread's interrupt
                  // status is not restored here.
                  LOG.warn("Unexpected interruption", e);
              }
              // Re-check under the lock: the throttler may have been created
              // while this thread was waiting.
              if (requestThrottler == null) {
                  throw new RuntimeException("Not started");
              }
          }
      }
      requestThrottler.submitRequest(si);
  }
  /**
   * Passes a request directly to the first request processor. If the
   * processor chain does not exist yet, the caller blocks on this server's
   * monitor while the state is INITIAL, re-checking at least once a second.
   *
   * <p>Before processing, the session of the request's connection is touched.
   * Requests of unknown type are answered through
   * {@code UnimplementedRequestProcessor}. Whenever the request is dropped or
   * fails, {@code requestFinished} is called to release its large-request
   * accounting.
   *
   * @param si the request to process
   * @throws RuntimeException ("Not started") if, after the wait ends, there is
   *         no first processor or the state is not RUNNING
   */
  public void submitRequestNow(Request si) {
      if (firstProcessor == null) {
          synchronized (this) {
              try {
                  // Since all requests are passed to the request
                  // processor it should wait for setting up the request
                  // processor chain. The state will be updated to RUNNING
                  // after the setup.
                  while (state == State.INITIAL) {
                      wait(1000);
                  }
              } catch (InterruptedException e) {
                  // The interruption is only logged; the thread's interrupt
                  // status is not restored here.
                  LOG.warn("Unexpected interruption", e);
              }
              if (firstProcessor == null || state != State.RUNNING) {
                  throw new RuntimeException("Not started");
              }
          }
      }
      try {
          // Throws MissingSessionException when the session cannot be touched.
          touch(si.cnxn);
          boolean validpacket = Request.isValid(si.type);
          if (validpacket) {
              setLocalSessionFlag(si);
              firstProcessor.processRequest(si);
              // Only requests that came in over a connection are counted as
              // in process.
              if (si.cnxn != null) {
                  incInProcess();
              }
          } else {
              LOG.warn("Received packet at server of unknown type {}", si.type);
              // Update request accounting/throttling limits
              requestFinished(si);
              new UnimplementedRequestProcessor().processRequest(si);
          }
      } catch (MissingSessionException e) {
          LOG.debug("Dropping request.", e);
          // Update request accounting/throttling limits
          requestFinished(si);
      } catch (RequestProcessorException e) {
          LOG.error("Unable to process request", e);
          // Update request accounting/throttling limits
          requestFinished(si);
      }
  }
  1022. public static int getSnapCount() {
  1023. String sc = System.getProperty(SNAP_COUNT);
  1024. try {
  1025. int snapCount = Integer.parseInt(sc);
  1026. // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor
  1027. if (snapCount < 2) {
  1028. LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2");
  1029. snapCount = 2;
  1030. }
  1031. return snapCount;
  1032. } catch (Exception e) {
  1033. return 100000;
  1034. }
  1035. }
  1036. public int getGlobalOutstandingLimit() {
  1037. String sc = System.getProperty(GLOBAL_OUTSTANDING_LIMIT);
  1038. int limit;
  1039. try {
  1040. limit = Integer.parseInt(sc);
  1041. } catch (Exception e) {
  1042. limit = 1000;
  1043. }
  1044. return limit;
  1045. }
  1046. public static long getSnapSizeInBytes() {
  1047. long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
  1048. if (size <= 0) {
  1049. LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size);
  1050. }
  1051. return size * 1024; // Convert to bytes
  1052. }
  1053. public void setServerCnxnFactory(ServerCnxnFactory factory) {
  1054. serverCnxnFactory = factory;
  1055. }
  1056. public ServerCnxnFactory getServerCnxnFactory() {
  1057. return serverCnxnFactory;
  1058. }
  1059. public ServerCnxnFactory getSecureServerCnxnFactory() {
  1060. return secureServerCnxnFactory;
  1061. }
  1062. public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
  1063. secureServerCnxnFactory = factory;
  1064. }
  1065. /**
  1066. * return the last processed id from the
  1067. * datatree
  1068. */
  1069. public long getLastProcessedZxid() {
  1070. return zkDb.getDataTreeLastProcessedZxid();
  1071. }
  1072. /**
  1073. * return the outstanding requests
  1074. * in the queue, which haven't been
  1075. * processed yet
  1076. */
  1077. public long getOutstandingRequests() {
  1078. return getInProcess();
  1079. }
  1080. /**
  1081. * return the total number of client connections that are alive
  1082. * to this server
  1083. */
  1084. public int getNumAliveConnections() {
  1085. int numAliveConnections = 0;
  1086. if (serverCnxnFactory != null) {
  1087. numAliveConnections += serverCnxnFactory.getNumAliveConnections();
  1088. }
  1089. if (secureServerCnxnFactory != null) {
  1090. numAliveConnections += secureServerCnxnFactory.getNumAliveConnections();
  1091. }
  1092. return numAliveConnections;
  1093. }
  1094. /**
  1095. * truncate the log to get in sync with others
  1096. * if in a quorum
  1097. * @param zxid the zxid that it needs to get in sync
  1098. * with others
  1099. * @throws IOException
  1100. */
  1101. public void truncateLog(long zxid) throws IOException {
  1102. this.zkDb.truncateLog(zxid);
  1103. }
  1104. public int getTickTime() {
  1105. return tickTime;
  1106. }
  1107. public void setTickTime(int tickTime) {
  1108. LOG.info("tickTime set to {} ms", tickTime);
  1109. this.tickTime = tickTime;
  1110. }
  1111. public static int getThrottledOpWaitTime() {
  1112. return throttledOpWaitTime;
  1113. }
  1114. public static void setThrottledOpWaitTime(int time) {
  1115. LOG.info("throttledOpWaitTime set to {} ms", time);
  1116. throttledOpWaitTime = time;
  1117. }
  1118. public int getMinSessionTimeout() {
  1119. return minSessionTimeout;
  1120. }
  1121. public void setMinSessionTimeout(int min) {
  1122. this.minSessionTimeout = min == -1 ? tickTime * 2 : min;
  1123. LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout);
  1124. }
  1125. public int getMaxSessionTimeout() {
  1126. return maxSessionTimeout;
  1127. }
  1128. public void setMaxSessionTimeout(int max) {
  1129. this.maxSessionTimeout = max == -1 ? tickTime * 20 : max;
  1130. LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout);
  1131. }
  1132. public int getClientPortListenBacklog() {
  1133. return listenBacklog;
  1134. }
  1135. public void setClientPortListenBacklog(int backlog) {
  1136. this.listenBacklog = backlog;
  1137. LOG.info("clientPortListenBacklog set to {}", backlog);
  1138. }
  1139. public int getClientPort() {
  1140. return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1;
  1141. }
  1142. public int getSecureClientPort() {
  1143. return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1;
  1144. }
  1145. /** Maximum number of connections allowed from particular host (ip) */
  1146. public int getMaxClientCnxnsPerHost() {
  1147. if (serverCnxnFactory != null) {
  1148. return serverCnxnFactory.getMaxClientCnxnsPerHost();
  1149. }
  1150. if (secureServerCnxnFactory != null) {
  1151. return secureServerCnxnFactory.getMaxClientCnxnsPerHost();
  1152. }
  1153. return -1;
  1154. }
  1155. public void setTxnLogFactory(FileTxnSnapLog txnLog) {
  1156. this.txnLogFactory = txnLog;
  1157. }
  1158. public FileTxnSnapLog getTxnLogFactory() {
  1159. return this.txnLogFactory;
  1160. }
  1161. /**
  1162. * Returns the elapsed sync of time of transaction log in milliseconds.
  1163. */
  1164. public long getTxnLogElapsedSyncTime() {
  1165. return txnLogFactory.getTxnLogElapsedSyncTime();
  1166. }
  1167. public String getState() {
  1168. return "standalone";
  1169. }
  1170. public void dumpEphemerals(PrintWriter pwriter) {
  1171. zkDb.dumpEphemerals(pwriter);
  1172. }
  1173. public Map<Long, Set<String>> getEphemerals() {
  1174. return zkDb.getEphemerals();
  1175. }
  1176. public double getConnectionDropChance() {
  1177. return connThrottle.getDropChance();
  1178. }
  /**
   * Handles a client's connect request: deserializes it, applies connection
   * throttling, rejects clients this server cannot serve, clamps the session
   * timeout to the configured bounds, and then either creates a new session
   * (session id 0) or reopens an existing one.
   *
   * @param cnxn the connection the request arrived on
   * @param incomingBuffer the buffer holding the serialized connect request
   * @throws IOException on deserialization failure; also thrown as
   *         {@code CloseRequestException} when a non-read-only client reaches
   *         a read-only server or when the client has seen a zxid newer than
   *         this server's last processed zxid
   * @throws ClientCnxnLimitException when the connection throttle rejects the
   *         request
   */
  @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
  public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
      throws IOException, ClientCnxnLimitException {
      BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
      ConnectRequest connReq = new ConnectRequest();
      connReq.deserialize(bia, "connect");
      LOG.debug(
          "Session establishment request from client {} client's lastZxid is 0x{}",
          cnxn.getRemoteSocketAddress(),
          Long.toHexString(connReq.getLastZxidSeen()));
      long sessionId = connReq.getSessionId();
      // Work out how many throttle tokens this connection costs: 1 unless
      // connection weighting is enabled, in which case it depends on whether
      // this is a new local session, a new global session, or a renewal.
      int tokensNeeded = 1;
      if (connThrottle.isConnectionWeightEnabled()) {
          if (sessionId == 0) {
              if (localSessionEnabled) {
                  tokensNeeded = connThrottle.getRequiredTokensForLocal();
              } else {
                  tokensNeeded = connThrottle.getRequiredTokensForGlobal();
              }
          } else {
              tokensNeeded = connThrottle.getRequiredTokensForRenew();
          }
      }
      if (!connThrottle.checkLimit(tokensNeeded)) {
          throw new ClientCnxnLimitException();
      }
      ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
      ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
      boolean readOnly = false;
      try {
          readOnly = bia.readBool("readOnly");
          cnxn.isOldClient = false;
      } catch (IOException e) {
          // this is ok -- just a packet from an old client which
          // doesn't contain readOnly field
          LOG.warn(
              "Connection request from old client {}; will be dropped if server is in r-o mode",
              cnxn.getRemoteSocketAddress());
      }
      // A read-only server only accepts clients that asked for read-only.
      if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
          String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
          LOG.info(msg);
          throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
      }
      // Refuse clients that have already seen a zxid newer than ours.
      if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
          String msg = "Refusing session request for client "
                       + cnxn.getRemoteSocketAddress()
                       + " as it has seen zxid 0x"
                       + Long.toHexString(connReq.getLastZxidSeen())
                       + " our last zxid is 0x"
                       + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                       + " client must try another server";
          LOG.info(msg);
          throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
      }
      // Clamp the requested timeout into [minSessionTimeout, maxSessionTimeout].
      int sessionTimeout = connReq.getTimeOut();
      byte[] passwd = connReq.getPasswd();
      int minSessionTimeout = getMinSessionTimeout();
      if (sessionTimeout < minSessionTimeout) {
          sessionTimeout = minSessionTimeout;
      }
      int maxSessionTimeout = getMaxSessionTimeout();
      if (sessionTimeout > maxSessionTimeout) {
          sessionTimeout = maxSessionTimeout;
      }
      cnxn.setSessionTimeout(sessionTimeout);
      // We don't want to receive any packets until we are sure that the
      // session is setup
      cnxn.disableRecv();
      if (sessionId == 0) {
          // Session id 0 means the client is asking for a brand new session.
          long id = createSession(cnxn, passwd, sessionTimeout);
          LOG.debug(
              "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
              Long.toHexString(id),
              Long.toHexString(connReq.getLastZxidSeen()),
              connReq.getTimeOut(),
              cnxn.getRemoteSocketAddress());
      } else {
          validateSession(cnxn, sessionId);
          LOG.debug(
              "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
              Long.toHexString(sessionId),
              Long.toHexString(connReq.getLastZxidSeen()),
              connReq.getTimeOut(),
              cnxn.getRemoteSocketAddress());
          // Ask both factories to close the session's existing connections
          // (reason CLIENT_RECONNECT) before binding it to this one.
          if (serverCnxnFactory != null) {
              serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
          }
          if (secureServerCnxnFactory != null) {
              secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
          }
          cnxn.setSessionId(sessionId);
          reopenSession(cnxn, sessionId, passwd, sessionTimeout);
          ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
      }
  }
  1275. /**
  1276. * Validate if a particular session can be reestablished.
  1277. *
  1278. * @param cnxn
  1279. * @param sessionId
  1280. */
  1281. protected void validateSession(ServerCnxn cnxn, long sessionId)
  1282. throws IOException {
  1283. // do nothing
  1284. }
  1285. public boolean shouldThrottle(long outStandingCount) {
  1286. int globalOutstandingLimit = getGlobalOutstandingLimit();
  1287. if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) {
  1288. return outStandingCount > 0;
  1289. }
  1290. return false;
  1291. }
  1292. long getFlushDelay() {
  1293. return flushDelay;
  1294. }
  1295. static void setFlushDelay(long delay) {
  1296. LOG.info("{} = {} ms", FLUSH_DELAY, delay);
  1297. flushDelay = delay;
  1298. }
  1299. long getMaxWriteQueuePollTime() {
  1300. return maxWriteQueuePollTime;
  1301. }
  1302. static void setMaxWriteQueuePollTime(long maxTime) {
  1303. LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
  1304. maxWriteQueuePollTime = maxTime;
  1305. }
  1306. int getMaxBatchSize() {
  1307. return maxBatchSize;
  1308. }
  1309. static void setMaxBatchSize(int size) {
  1310. LOG.info("{}={}", MAX_BATCH_SIZE, size);
  1311. maxBatchSize = size;
  1312. }
  1313. private void initLargeRequestThrottlingSettings() {
  1314. setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes));
  1315. setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1));
  1316. }
  1317. public int getLargeRequestMaxBytes() {
  1318. return largeRequestMaxBytes;
  1319. }
  1320. public void setLargeRequestMaxBytes(int bytes) {
  1321. if (bytes <= 0) {
  1322. LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
  1323. LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
  1324. } else {
  1325. largeRequestMaxBytes = bytes;
  1326. LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
  1327. }
  1328. }
  1329. public int getLargeRequestThreshold() {
  1330. return largeRequestThreshold;
  1331. }
  1332. public void setLargeRequestThreshold(int threshold) {
  1333. if (threshold == 0 || threshold < -1) {
  1334. LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
  1335. largeRequestThreshold = -1;
  1336. } else {
  1337. largeRequestThreshold = threshold;
  1338. LOG.info("The large request threshold is set to {}", largeRequestThreshold);
  1339. }
  1340. }
  1341. public int getLargeRequestBytes() {
  1342. return currentLargeRequestBytes.get();
  1343. }
  1344. private boolean isLargeRequest(int length) {
  1345. // The large request limit is disabled when threshold is -1
  1346. if (largeRequestThreshold == -1) {
  1347. return false;
  1348. }
  1349. return length > largeRequestThreshold;
  1350. }
  1351. public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
  1352. if (!isLargeRequest(length)) {
  1353. return true;
  1354. }
  1355. if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) {
  1356. return true;
  1357. } else {
  1358. ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
  1359. throw new IOException("Rejecting large request");
  1360. }
  1361. }
  1362. private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
  1363. if (!isLargeRequest(length)) {
  1364. return true;
  1365. }
  1366. int bytes = currentLargeRequestBytes.addAndGet(length);
  1367. if (bytes > largeRequestMaxBytes) {
  1368. currentLargeRequestBytes.addAndGet(-length);
  1369. ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
  1370. throw new IOException("Rejecting large request");
  1371. }
  1372. return true;
  1373. }
  1374. public void requestFinished(Request request) {
  1375. int largeRequestLength = request.getLargeRequestSize();
  1376. if (largeRequestLength != -1) {
  1377. currentLargeRequestBytes.addAndGet(-largeRequestLength);
  1378. }
  1379. }
  1380. public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
  1381. // We have the request, now process and setup for next
  1382. InputStream bais = new ByteBufferInputStream(incomingBuffer);
  1383. BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
  1384. RequestHeader h = new RequestHeader();
  1385. h.deserialize(bia, "header");
  1386. // Need to increase the outstanding request count first, otherwise
  1387. // there might be a race condition that it enabled recv after
  1388. // processing request and then disabled when check throttling.
  1389. //
  1390. // Be aware that we're actually checking the global outstanding
  1391. // request before this request.
  1392. //
  1393. // It's fine if the IOException thrown before we decrease the count
  1394. // in cnxn, since it will close the cnxn anyway.
  1395. cnxn.incrOutstandingAndCheckThrottle(h);
  1396. // Through the magic of byte buffers, txn will not be
  1397. // pointing
  1398. // to the start of the txn
  1399. incomingBuffer = incomingBuffer.slice();
  1400. if (h.getType() == OpCode.auth) {
  1401. LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
  1402. AuthPacket authPacket = new AuthPacket();
  1403. ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
  1404. String scheme = authPacket.getScheme();
  1405. ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
  1406. Code authReturn = KeeperException.Code.AUTHFAILED;
  1407. if (ap != null) {
  1408. try {
  1409. // handleAuthentication may close the connection, to allow the client to choose
  1410. // a different server to connect to.
  1411. authReturn = ap.handleAuthentication(
  1412. new ServerAuthenticationProvider.ServerObjs(this, cnxn),
  1413. authPacket.getAuth());
  1414. } catch (RuntimeException e) {
  1415. LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
  1416. authReturn = KeeperException.Code.AUTHFAILED;
  1417. }
  1418. }
  1419. if (authReturn == KeeperException.Code.OK) {
  1420. LOG.info("Session 0x{}: auth success for scheme {} and address {}",
  1421. Long.toHexString(cnxn.getSessionId()), scheme,
  1422. cnxn.getRemoteSocketAddress());
  1423. ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
  1424. cnxn.sendResponse(rh, null, null);
  1425. } else {
  1426. if (ap == null) {
  1427. LOG.warn(
  1428. "No authentication provider for scheme: {} has {}",
  1429. scheme,
  1430. ProviderRegistry.listProviders());
  1431. } else {
  1432. LOG.warn("Authentication failed for scheme: {}", scheme);
  1433. }
  1434. // send a response...
  1435. ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
  1436. cnxn.sendResponse(rh, null, null);
  1437. // ... and close connection
  1438. cnxn.sendBuffer(ServerCnxnFactory.closeConn);
  1439. cnxn.disableRecv();
  1440. }
  1441. return;
  1442. } else if (h.getType() == OpCode.sasl) {
  1443. processSasl(incomingBuffer, cnxn, h);
  1444. } else {
  1445. if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
  1446. // Authentication enforcement is failed
  1447. // Already sent response to user about failure and closed the session, lets return
  1448. return;
  1449. } else {
  1450. Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
  1451. int length = incomingBuffer.limit();
  1452. if (isLargeRequest(length)) {
  1453. // checkRequestSize will throw IOException if request is rejected
  1454. checkRequestSizeWhenMessageReceived(length);
  1455. si.setLargeRequestSize(length);
  1456. }
  1457. si.setOwner(ServerCnxn.me);
  1458. submitRequest(si);
  1459. }
  1460. }
  1461. }
  1462. private static boolean isSaslSuperUser(String id) {
  1463. if (id == null || id.isEmpty()) {
  1464. return false;
  1465. }
  1466. Properties properties = System.getProperties();
  1467. int prefixLen = SASL_SUPER_USER.length();
  1468. for (String k : properties.stringPropertyNames()) {
  1469. if (k.startsWith(SASL_SUPER_USER)
  1470. && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) {
  1471. String value = properties.getProperty(k);
  1472. if (value != null && value.equals(id)) {
  1473. return true;
  1474. }
  1475. }
  1476. }
  1477. return false;
  1478. }
  1479. private static boolean shouldAllowSaslFailedClientsConnect() {
  1480. return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
  1481. }
  1482. private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
  1483. LOG.debug("Responding to client SASL token.");
  1484. GetSASLRequest clientTokenRecord = new GetSASLRequest();
  1485. ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
  1486. byte[] clientToken = clientTokenRecord.getToken();
  1487. LOG.debug("Size of client SASL token: {}", clientToken.length);
  1488. byte[] responseToken = null;
  1489. try {
  1490. ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
  1491. try {
  1492. // note that clientToken might be empty (clientToken.length == 0):
  1493. // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
  1494. // SASL negotiation process.
  1495. responseToken = saslServer.evaluateResponse(clientToken);
  1496. if (saslServer.isComplete()) {
  1497. String authorizationID = saslServer.getAuthorizationID();
  1498. LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}",
  1499. Long.toHexString(cnxn.getSessionId()), authorizationID);
  1500. cnxn.addAuthInfo(new Id("sasl", authorizationID));
  1501. if (isSaslSuperUser(authorizationID)) {
  1502. cnxn.addAuthInfo(new Id("super", ""));
  1503. LOG.info(
  1504. "Session 0x{}: Authenticated Id '{}' as super user",
  1505. Long.toHexString(cnxn.getSessionId()),
  1506. authorizationID);
  1507. }
  1508. }
  1509. } catch (SaslException e) {
  1510. LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
  1511. if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) {
  1512. LOG.warn("Maintaining client connection despite SASL authentication failure.");
  1513. } else {
  1514. int error;
  1515. if (authHelper.isSaslAuthRequired()) {
  1516. LOG.warn(
  1517. "Closing client connection due to server requires client SASL authenticaiton,"
  1518. + "but client SASL authentication has failed, or client is not configured with SASL "
  1519. + "authentication.");
  1520. error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
  1521. } else {
  1522. LOG.warn("Closing client connection due to SASL authentication failure.");
  1523. error = Code.AUTHFAILED.intValue();
  1524. }
  1525. ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
  1526. cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
  1527. cnxn.sendCloseSession();
  1528. cnxn.disableRecv();
  1529. return;
  1530. }
  1531. }
  1532. } catch (NullPointerException e) {
  1533. LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
  1534. }
  1535. if (responseToken != null) {
  1536. LOG.debug("Size of server SASL response: {}", responseToken.length);
  1537. }
  1538. ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
  1539. Record record = new SetSASLResponse(responseToken);
  1540. cnxn.sendResponse(replyHeader, record, "response");
  1541. }
  1542. // entry point for quorum/Learner.java
  1543. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
  1544. processTxnForSessionEvents(null, hdr, txn);
  1545. return processTxnInDB(hdr, txn, null);
  1546. }
  1547. // entry point for FinalRequestProcessor.java
  1548. public ProcessTxnResult processTxn(Request request) {
  1549. TxnHeader hdr = request.getHdr();
  1550. processTxnForSessionEvents(request, hdr, request.getTxn());
  1551. final boolean writeRequest = (hdr != null);
  1552. final boolean quorumRequest = request.isQuorum();
  1553. // return fast w/o synchronization when we get a read
  1554. if (!writeRequest && !quorumRequest) {
  1555. return new ProcessTxnResult();
  1556. }
  1557. synchronized (outstandingChanges) {
  1558. ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
  1559. // request.hdr is set for write requests, which are the only ones
  1560. // that add to outstandingChanges.
  1561. if (writeRequest) {
  1562. long zxid = hdr.getZxid();
  1563. while (!outstandingChanges.isEmpty()
  1564. && outstandingChanges.peek().zxid <= zxid) {
  1565. ChangeRecord cr = outstandingChanges.remove();
  1566. ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
  1567. if (cr.zxid < zxid) {
  1568. LOG.warn(
  1569. "Zxid outstanding 0x{} is less than current 0x{}",
  1570. Long.toHexString(cr.zxid),
  1571. Long.toHexString(zxid));
  1572. }
  1573. if (outstandingChangesForPath.get(cr.path) == cr) {
  1574. outstandingChangesForPath.remove(cr.path);
  1575. }
  1576. }
  1577. }
  1578. // do not add non quorum packets to the queue.
  1579. if (quorumRequest) {
  1580. getZKDatabase().addCommittedProposal(request);
  1581. }
  1582. return rc;
  1583. }
  1584. }
  1585. private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
  1586. int opCode = (request == null) ? hdr.getType() : request.type;
  1587. long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
  1588. if (opCode == OpCode.createSession) {
  1589. if (hdr != null && txn instanceof CreateSessionTxn) {
  1590. CreateSessionTxn cst = (CreateSessionTxn) txn;
  1591. sessionTracker.commitSession(sessionId, cst.getTimeOut());
  1592. } else if (request == null || !request.isLocalSession()) {
  1593. LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString());
  1594. }
  1595. } else if (opCode == OpCode.closeSession) {
  1596. sessionTracker.removeSession(sessionId);
  1597. }
  1598. }
  1599. private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
  1600. if (hdr == null) {
  1601. return new ProcessTxnResult();
  1602. } else {
  1603. return getZKDatabase().processTxn(hdr, txn, digest);
  1604. }
  1605. }
  1606. public Map<Long, Set<Long>> getSessionExpiryMap() {
  1607. return sessionTracker.getSessionExpiryMap();
  1608. }
  1609. /**
  1610. * This method is used to register the ZooKeeperServerShutdownHandler to get
  1611. * server's error or shutdown state change notifications.
  1612. * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for
  1613. * every server state changes {@link #setState(State)}.
  1614. *
  1615. * @param zkShutdownHandler shutdown handler
  1616. */
  1617. void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
  1618. this.zkShutdownHandler = zkShutdownHandler;
  1619. }
  1620. public boolean isResponseCachingEnabled() {
  1621. return isResponseCachingEnabled;
  1622. }
  1623. public void setResponseCachingEnabled(boolean isEnabled) {
  1624. isResponseCachingEnabled = isEnabled;
  1625. }
  1626. public ResponseCache getReadResponseCache() {
  1627. return isResponseCachingEnabled ? readResponseCache : null;
  1628. }
  1629. public ResponseCache getGetChildrenResponseCache() {
  1630. return isResponseCachingEnabled ? getChildrenResponseCache : null;
  1631. }
  1632. protected void registerMetrics() {
  1633. MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
  1634. final ZKDatabase zkdb = this.getZKDatabase();
  1635. final ServerStats stats = this.serverStats();
  1636. rootContext.registerGauge("avg_latency", stats::getAvgLatency);
  1637. rootContext.registerGauge("max_latency", stats::getMaxLatency);
  1638. rootContext.registerGauge("min_latency", stats::getMinLatency);
  1639. rootContext.registerGauge("packets_received", stats::getPacketsReceived);
  1640. rootContext.registerGauge("packets_sent", stats::getPacketsSent);
  1641. rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections);
  1642. rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests);
  1643. rootContext.registerGauge("uptime", stats::getUptime);
  1644. rootContext.registerGauge("znode_count", zkdb::getNodeCount);
  1645. rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount);
  1646. rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount);
  1647. rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize);
  1648. rootContext.registerGauge("global_sessions", zkdb::getSessionCount);
  1649. rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);
  1650. OSMXBean osMbean = new OSMXBean();
  1651. rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount);
  1652. rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount);
  1653. rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance);
  1654. rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize);
  1655. rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize);
  1656. rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize);
  1657. rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
  1658. rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount);
  1659. rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount);
  1660. rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount);
  1661. }
  1662. protected void unregisterMetrics() {
  1663. MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
  1664. rootContext.unregisterGauge("avg_latency");
  1665. rootContext.unregisterGauge("max_latency");
  1666. rootContext.unregisterGauge("min_latency");
  1667. rootContext.unregisterGauge("packets_received");
  1668. rootContext.unregisterGauge("packets_sent");
  1669. rootContext.unregisterGauge("num_alive_connections");
  1670. rootContext.unregisterGauge("outstanding_requests");
  1671. rootContext.unregisterGauge("uptime");
  1672. rootContext.unregisterGauge("znode_count");
  1673. rootContext.unregisterGauge("watch_count");
  1674. rootContext.unregisterGauge("ephemerals_count");
  1675. rootContext.unregisterGauge("approximate_data_size");
  1676. rootContext.unregisterGauge("global_sessions");
  1677. rootContext.unregisterGauge("local_sessions");
  1678. rootContext.unregisterGauge("open_file_descriptor_count");
  1679. rootContext.unregisterGauge("max_file_descriptor_count");
  1680. rootContext.unregisterGauge("connection_drop_probability");
  1681. rootContext.unregisterGauge("last_client_response_size");
  1682. rootContext.unregisterGauge("max_client_response_size");
  1683. rootContext.unregisterGauge("min_client_response_size");
  1684. rootContext.unregisterGauge("auth_failed_count");
  1685. rootContext.unregisterGauge("non_mtls_remote_conn_count");
  1686. rootContext.unregisterGauge("non_mtls_local_conn_count");
  1687. }
  1688. /**
  1689. * Hook into admin server, useful to expose additional data
  1690. * that do not represent metrics.
  1691. *
  1692. * @param response a sink which collects the data.
  1693. */
  1694. public void dumpMonitorValues(BiConsumer<String, Object> response) {
  1695. ServerStats stats = serverStats();
  1696. response.accept("version", Version.getFullVersion());
  1697. response.accept("server_state", stats.getServerState());
  1698. }
  1699. /**
  1700. * Grant or deny authorization to an operation on a node as a function of:
  1701. * @param cnxn : the server connection
  1702. * @param acl : set of ACLs for the node
  1703. * @param perm : the permission that the client is requesting
  1704. * @param ids : the credentials supplied by the client
  1705. * @param path : the ZNode path
  1706. * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null.
  1707. */
  1708. public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException {
  1709. if (skipACL) {
  1710. return;
  1711. }
  1712. LOG.debug("Permission requested: {} ", perm);
  1713. LOG.debug("ACLs for node: {}", acl);
  1714. LOG.debug("Client credentials: {}", ids);
  1715. if (acl == null || acl.size() == 0) {
  1716. return;
  1717. }
  1718. for (Id authId : ids) {
  1719. if (authId.getScheme().equals("super")) {
  1720. return;
  1721. }
  1722. }
  1723. for (ACL a : acl) {
  1724. Id id = a.getId();
  1725. if ((a.getPerms() & perm) != 0) {
  1726. if (id.getScheme().equals("world") && id.getId().equals("anyone")) {
  1727. return;
  1728. }
  1729. ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme());
  1730. if (ap != null) {
  1731. for (Id authId : ids) {
  1732. if (authId.getScheme().equals(id.getScheme())
  1733. && ap.matches(
  1734. new ServerAuthenticationProvider.ServerObjs(this, cnxn),
  1735. new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) {
  1736. return;
  1737. }
  1738. }
  1739. }
  1740. }
  1741. }
  1742. throw new KeeperException.NoAuthException();
  1743. }
  1744. /**
  1745. * check a path whether exceeded the quota.
  1746. *
  1747. * @param path
  1748. * the path of the node, used for the quota prefix check
  1749. * @param lastData
  1750. * the current node data, {@code null} for none
  1751. * @param data
  1752. * the data to be set, or {@code null} for none
  1753. * @param type
  1754. * currently, create and setData need to check quota
  1755. */
  1756. public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException {
  1757. if (!enforceQuota) {
  1758. return;
  1759. }
  1760. long dataBytes = (data == null) ? 0 : data.length;
  1761. ZKDatabase zkDatabase = getZKDatabase();
  1762. String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path);
  1763. if (StringUtils.isEmpty(lastPrefix)) {
  1764. return;
  1765. }
  1766. switch (type) {
  1767. case OpCode.create:
  1768. checkQuota(lastPrefix, dataBytes, 1);
  1769. break;
  1770. case OpCode.setData:
  1771. checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0);
  1772. break;
  1773. default:
  1774. throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
  1775. }
  1776. }
  1777. /**
  1778. * check a path whether exceeded the quota.
  1779. *
  1780. * @param lastPrefix
  1781. the path of the node which has a quota.
  1782. * @param bytesDiff
  1783. * the diff to be added to number of bytes
  1784. * @param countDiff
  1785. * the diff to be added to the count
  1786. */
  1787. private void checkQuota(String lastPrefix, long bytesDiff, long countDiff)
  1788. throws KeeperException.QuotaExceededException {
  1789. LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);
  1790. // now check the quota we set
  1791. String limitNode = Quotas.limitPath(lastPrefix);
  1792. DataNode node = getZKDatabase().getNode(limitNode);
  1793. StatsTrack limitStats;
  1794. if (node == null) {
  1795. // should not happen
  1796. LOG.error("Missing limit node for quota {}", limitNode);
  1797. return;
  1798. }
  1799. synchronized (node) {
  1800. limitStats = new StatsTrack(node.data);
  1801. }
  1802. //check the quota
  1803. boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1);
  1804. boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1);
  1805. if (!checkCountQuota && !checkByteQuota) {
  1806. return;
  1807. }
  1808. //check the statPath quota
  1809. String statNode = Quotas.statPath(lastPrefix);
  1810. node = getZKDatabase().getNode(statNode);
  1811. StatsTrack currentStats;
  1812. if (node == null) {
  1813. // should not happen
  1814. LOG.error("Missing node for stat {}", statNode);
  1815. return;
  1816. }
  1817. synchronized (node) {
  1818. currentStats = new StatsTrack(node.data);
  1819. }
  1820. //check the Count Quota
  1821. if (checkCountQuota) {
  1822. long newCount = currentStats.getCount() + countDiff;
  1823. boolean isCountHardLimit = limitStats.getCountHardLimit() > -1 ? true : false;
  1824. long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount();
  1825. if (newCount > countLimit) {
  1826. String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]";
  1827. RATE_LOGGER.rateLimitLog(msg);
  1828. if (isCountHardLimit) {
  1829. throw new KeeperException.QuotaExceededException(lastPrefix);
  1830. }
  1831. }
  1832. }
  1833. //check the Byte Quota
  1834. if (checkByteQuota) {
  1835. long newBytes = currentStats.getBytes() + bytesDiff;
  1836. boolean isByteHardLimit = limitStats.getByteHardLimit() > -1 ? true : false;
  1837. long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes();
  1838. if (newBytes > byteLimit) {
  1839. String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]";
  1840. RATE_LOGGER.rateLimitLog(msg);
  1841. if (isByteHardLimit) {
  1842. throw new KeeperException.QuotaExceededException(lastPrefix);
  1843. }
  1844. }
  1845. }
  1846. }
  1847. public static boolean isDigestEnabled() {
  1848. return digestEnabled;
  1849. }
  1850. public static void setDigestEnabled(boolean digestEnabled) {
  1851. LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
  1852. ZooKeeperServer.digestEnabled = digestEnabled;
  1853. }
  1854. /**
  1855. * Trim a path to get the immediate predecessor.
  1856. *
  1857. * @param path
  1858. * @return
  1859. * @throws KeeperException.BadArgumentsException
  1860. */
  1861. private String parentPath(String path) throws KeeperException.BadArgumentsException {
  1862. int lastSlash = path.lastIndexOf('/');
  1863. if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) {
  1864. throw new KeeperException.BadArgumentsException(path);
  1865. }
  1866. return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
  1867. }
  1868. private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException {
  1869. boolean mustCheckACL = false;
  1870. String path = null;
  1871. List<ACL> acl = null;
  1872. switch (request.type) {
  1873. case OpCode.create:
  1874. case OpCode.create2: {
  1875. CreateRequest req = new CreateRequest();
  1876. if (buffer2Record(request.request, req)) {
  1877. mustCheckACL = true;
  1878. acl = req.getAcl();
  1879. path = parentPath(req.getPath());
  1880. }
  1881. break;
  1882. }
  1883. case OpCode.delete: {
  1884. DeleteRequest req = new DeleteRequest();
  1885. if (buffer2Record(request.request, req)) {
  1886. path = parentPath(req.getPath());
  1887. }
  1888. break;
  1889. }
  1890. case OpCode.setData: {
  1891. SetDataRequest req = new SetDataRequest();
  1892. if (buffer2Record(request.request, req)) {
  1893. path = req.getPath();
  1894. }
  1895. break;
  1896. }
  1897. case OpCode.setACL: {
  1898. SetACLRequest req = new SetACLRequest();
  1899. if (buffer2Record(request.request, req)) {
  1900. mustCheckACL = true;
  1901. acl = req.getAcl();
  1902. path = req.getPath();
  1903. }
  1904. break;
  1905. }
  1906. }
  1907. if (mustCheckACL) {
  1908. /* we ignore the extrapolated ACL returned by fixupACL because
  1909. * we only care about it being well-formed (and if it isn't, an
  1910. * exception will be raised).
  1911. */
  1912. PrepRequestProcessor.fixupACL(path, request.authInfo, acl);
  1913. }
  1914. return path;
  1915. }
  1916. private int effectiveACLPerms(Request request) {
  1917. switch (request.type) {
  1918. case OpCode.create:
  1919. case OpCode.create2:
  1920. return ZooDefs.Perms.CREATE;
  1921. case OpCode.delete:
  1922. return ZooDefs.Perms.DELETE;
  1923. case OpCode.setData:
  1924. return ZooDefs.Perms.WRITE;
  1925. case OpCode.setACL:
  1926. return ZooDefs.Perms.ADMIN;
  1927. default:
  1928. return ZooDefs.Perms.ALL;
  1929. }
  1930. }
  1931. /**
  1932. * Check Write Requests for Potential Access Restrictions
  1933. * <p/>
  1934. * Before a request is being proposed to the quorum, lets check it
  1935. * against local ACLs. Non-write requests (read, session, etc.)
  1936. * are passed along. Invalid requests are sent a response.
  1937. * <p/>
  1938. * While we are at it, if the request will set an ACL: make sure it's
  1939. * a valid one.
  1940. *
  1941. * @param request
  1942. * @return true if request is permitted, false if not.
  1943. * @throws java.io.IOException
  1944. */
  1945. public boolean authWriteRequest(Request request) {
  1946. int err;
  1947. String pathToCheck;
  1948. if (!enableEagerACLCheck) {
  1949. return true;
  1950. }
  1951. err = KeeperException.Code.OK.intValue();
  1952. try {
  1953. pathToCheck = effectiveACLPath(request);
  1954. if (pathToCheck != null) {
  1955. checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null);
  1956. }
  1957. } catch (KeeperException.NoAuthException e) {
  1958. LOG.debug("Request failed ACL check", e);
  1959. err = e.code().intValue();
  1960. } catch (KeeperException.InvalidACLException e) {
  1961. LOG.debug("Request has an invalid ACL check", e);
  1962. err = e.code().intValue();
  1963. } catch (KeeperException.NoNodeException e) {
  1964. LOG.debug("ACL check against non-existent node: {}", e.getMessage());
  1965. } catch (KeeperException.BadArgumentsException e) {
  1966. LOG.debug("ACL check against illegal node path: {}", e.getMessage());
  1967. } catch (Throwable t) {
  1968. LOG.error("Uncaught exception in authWriteRequest with: ", t);
  1969. throw t;
  1970. } finally {
  1971. if (err != KeeperException.Code.OK.intValue()) {
  1972. /* This request has a bad ACL, so we are dismissing it early. */
  1973. decInProcess();
  1974. ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
  1975. try {
  1976. request.cnxn.sendResponse(rh, null, null);
  1977. } catch (IOException e) {
  1978. LOG.error("IOException : {}", e);
  1979. }
  1980. }
  1981. }
  1982. return err == KeeperException.Code.OK.intValue();
  1983. }
  1984. private boolean buffer2Record(ByteBuffer request, Record record) {
  1985. boolean rv = false;
  1986. try {
  1987. ByteBufferInputStream.byteBuffer2Record(request, record);
  1988. request.rewind();
  1989. rv = true;
  1990. } catch (IOException ex) {
  1991. }
  1992. return rv;
  1993. }
  1994. public int getOutstandingHandshakeNum() {
  1995. if (serverCnxnFactory instanceof NettyServerCnxnFactory) {
  1996. return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum();
  1997. } else {
  1998. return 0;
  1999. }
  2000. }
  2001. public boolean isReconfigEnabled() {
  2002. return this.reconfigEnabled;
  2003. }
  2004. public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
  2005. return zkShutdownHandler;
  2006. }
  2007. }