123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "common/hadoop_err.h"
- #include "protobuf/ProtobufRpcEngine.pb-c.h.s"
- #include "rpc/call.h"
- #include "rpc/messenger.h"
- #include "rpc/proxy.h"
- #include "rpc/varint.h"
- #include <errno.h>
- #include <inttypes.h>
- #include <netinet/in.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <uv.h>
- #define proxy_log_warn(proxy, fmt, ...) \
- fprintf(stderr, "WARN: proxy 0x%p: " fmt, proxy, __VA_ARGS__)
- #define proxy_log_info(proxy, fmt, ...) \
- fprintf(stderr, "INFO: proxy 0x%p: " fmt, proxy, __VA_ARGS__)
- #define proxy_log_debug(proxy, fmt, ...) \
- fprintf(stderr, "DEBUG: proxy 0x%p: " fmt, proxy, __VA_ARGS__)
- /**
- * The maximum length that we'll allocate to hold a request to the server.
- * This number includes the RequestHeader, but not the RpcRequestHeader.
- */
- #define MAX_SEND_LEN (63 * 1024 * 1024)
- struct hrpc_proxy_builder {
- struct hrpc_proxy *proxy;
- };
- static const char OOM_ERROR[] = "OOM";
- void hrpc_proxy_init(struct hrpc_proxy *proxy,
- struct hrpc_messenger *msgr,
- const struct sockaddr_in *remote,
- const char *protocol, const char *username)
- {
- memset(proxy, 0, sizeof(*proxy));
- proxy->msgr = msgr;
- proxy->protocol = protocol;
- proxy->username = username;
- proxy->call.remote = *remote;
- }
- struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy)
- {
- struct hadoop_err *err;
- if (!hrpc_call_activate(&proxy->call)) {
- err = hadoop_lerr_alloc(EINVAL, "tried to start a call on a "
- "proxy which was still in use by another call.");
- proxy_log_warn(proxy, "hrpc_proxy_activate: %s",
- hadoop_err_msg(err));
- return err;
- }
- return NULL;
- }
- void hrpc_proxy_deactivate(struct hrpc_proxy *proxy)
- {
- hrpc_call_deactivate(&proxy->call);
- }
- void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size)
- {
- if (size > RPC_PROXY_USERDATA_MAX) {
- return NULL;
- }
- return proxy->userdata;
- }
- struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy)
- {
- struct hrpc_sync_ctx *ctx =
- hrpc_proxy_alloc_userdata(proxy, sizeof(struct hrpc_proxy));
- if (!ctx) {
- return NULL;
- }
- if (uv_sem_init(&ctx->sem, 0)) {
- return NULL;
- }
- memset(&ctx, 0, sizeof(ctx));
- return ctx;
- }
- void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx)
- {
- free(ctx->resp.base);
- uv_sem_destroy(&ctx->sem);
- }
- void hrpc_proxy_sync_cb(struct hrpc_response *resp, struct hadoop_err *err,
- void *cb_data)
- {
- struct hrpc_sync_ctx *ctx = cb_data;
- ctx->resp = *resp;
- ctx->err = err;
- uv_sem_post(&ctx->sem);
- }
- void hrpc_proxy_start(struct hrpc_proxy *proxy,
- const char *method, const void *payload, int payload_packed_len,
- hrpc_pack_cb_t payload_pack_cb,
- hrpc_raw_cb_t cb, void *cb_data)
- {
- RequestHeaderProto req_header = REQUEST_HEADER_PROTO__INIT;
- uint64_t buf_len;
- int32_t req_header_len, off = 0;
- uint8_t *buf;
- struct hrpc_call *call = &proxy->call;
- call->cb = cb;
- call->cb_data = cb_data;
- call->protocol = proxy->protocol;
- call->username = proxy->username;
- req_header.methodname = (char*)method;
- req_header.declaringclassprotocolname = (char*)proxy->protocol;
- req_header.clientprotocolversion = 1;
- req_header_len = request_header_proto__get_packed_size(&req_header);
- buf_len = varint32_size(req_header_len);
- buf_len += req_header_len;
- buf_len += varint32_size(payload_packed_len);
- buf_len += payload_packed_len;
- if (buf_len >= MAX_SEND_LEN) {
- hrpc_call_deliver_err(call,
- hadoop_lerr_alloc(EINVAL, "hrpc_proxy_setup_header: the "
- "request length is too long at %" PRId64 " bytes. The "
- "maximum we will send is %d bytes.", buf_len, MAX_SEND_LEN));
- return;
- }
- buf = malloc((size_t)buf_len);
- if (!buf) {
- hrpc_call_deliver_err(call,
- hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_setup_header: "
- "failed to allocate a buffer of length %" PRId64 " bytes.",
- buf_len));
- return;
- }
- varint32_encode(req_header_len, buf, buf_len, &off);
- request_header_proto__pack(&req_header, buf + off);
- off += req_header_len;
- varint32_encode(payload_packed_len, buf, buf_len, &off);
- payload_pack_cb(payload, buf + off);
- call->payload = uv_buf_init((char*)buf, buf_len);
- hrpc_messenger_start_outbound(proxy->msgr, &proxy->call);
- }
- // vim: ts=4:sw=4:tw=79:et
|