diff options
author | Yann Ylavic <ylavic@apache.org> | 2019-10-31 17:08:33 +0100 |
---|---|---|
committer | Yann Ylavic <ylavic@apache.org> | 2019-10-31 17:08:33 +0100 |
commit | e2d7af8692ad1c08f2ef45028d6d5044867d127c (patch) | |
tree | b8eca9179cad546ba97806754481a845e19fa5d5 /modules/proxy | |
parent | mod_proxy_http: fix load-balancer fallback for requests with a body. (diff) | |
download | apache2-e2d7af8692ad1c08f2ef45028d6d5044867d127c.tar.xz apache2-e2d7af8692ad1c08f2ef45028d6d5044867d127c.zip |
mod_proxy_http: follow up to r1869216.
Let's call stream_reqbody() for all rb_methods, no RB_SPOOL_CL special case.
This both simplifies code and allows to keep EOS into the input_brigade until
it's sent, and thus detect whether we already fetched the whole body if/when
proxy_http_handler() re-enters for different balancer members.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1869222 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/proxy')
-rw-r--r-- | modules/proxy/mod_proxy_http.c | 46 | ||||
-rw-r--r-- | modules/proxy/mod_proxy_wstunnel.c | 241 |
2 files changed, 246 insertions, 41 deletions
diff --git a/modules/proxy/mod_proxy_http.c b/modules/proxy/mod_proxy_http.c index 449a1d7ed8..c635961644 100644 --- a/modules/proxy/mod_proxy_http.c +++ b/modules/proxy/mod_proxy_http.c @@ -303,16 +303,18 @@ static int stream_reqbody_read(proxy_http_req_t *req, apr_bucket_brigade *bb, return OK; } -static int stream_reqbody(proxy_http_req_t *req, rb_methods rb_method) +static int stream_reqbody(proxy_http_req_t *req) { request_rec *r = req->r; int seen_eos = 0, rv = OK; apr_size_t hdr_len; char chunk_hdr[20]; /* must be here due to transient bucket. */ + conn_rec *origin = req->origin; proxy_conn_rec *p_conn = req->backend; apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc; apr_bucket_brigade *header_brigade = req->header_brigade; apr_bucket_brigade *input_brigade = req->input_brigade; + rb_methods rb_method = req->rb_method; apr_off_t bytes, bytes_streamed = 0; apr_bucket *e; @@ -326,7 +328,7 @@ static int stream_reqbody(proxy_http_req_t *req, rb_methods rb_method) } if (!APR_BRIGADE_EMPTY(input_brigade)) { - /* If this brigade contains EOS, either stop or remove it. */ + /* If this brigade contains EOS, remove it and be done. */ if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) { seen_eos = 1; @@ -368,7 +370,8 @@ static int stream_reqbody(proxy_http_req_t *req, rb_methods rb_method) APR_BRIGADE_INSERT_TAIL(input_brigade, e); } } - else if (bytes_streamed > req->cl_val) { + else if (rb_method == RB_STREAM_CL + && bytes_streamed > req->cl_val) { /* C-L < bytes streamed?!? * We will error out after the body is completely * consumed, but we can't stream more bytes at the @@ -400,7 +403,7 @@ static int stream_reqbody(proxy_http_req_t *req, rb_methods rb_method) APR_BRIGADE_PREPEND(input_brigade, header_brigade); /* Flush here on EOS because we won't stream_reqbody_read() again */ - rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin, + rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, origin, input_brigade, seen_eos); if (rv != OK) { return rv; @@ -462,10 +465,6 @@ static int spool_reqbody_cl(proxy_http_req_t *req, apr_off_t *bytes_spooled) /* If this brigade contains EOS, either stop or remove it. */ if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) { seen_eos = 1; - - /* We can't pass this EOS to the output_filters. */ - e = APR_BRIGADE_LAST(input_brigade); - apr_bucket_delete(e); } apr_brigade_length(input_brigade, 1, &bytes); @@ -859,33 +858,19 @@ static int ap_proxy_http_request(proxy_http_req_t *req) { int rv; request_rec *r = req->r; - apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc; - apr_bucket_brigade *header_brigade = req->header_brigade; - apr_bucket_brigade *input_brigade = req->input_brigade; /* send the request header/body, if any. */ switch (req->rb_method) { + case RB_SPOOL_CL: case RB_STREAM_CL: case RB_STREAM_CHUNKED: if (req->do_100_continue) { - rv = ap_proxy_pass_brigade(bucket_alloc, r, req->backend, - req->origin, header_brigade, 1); + rv = ap_proxy_pass_brigade(req->bucket_alloc, r, req->backend, + req->origin, req->header_brigade, 1); } else { - rv = stream_reqbody(req, req->rb_method); - } - break; - - case RB_SPOOL_CL: - /* Prefetch has built the header and spooled the whole body; - * if we don't expect 100-continue we can flush both all at once, - * otherwise flush the header only. - */ - if (!req->do_100_continue) { - APR_BRIGADE_CONCAT(header_brigade, input_brigade); + rv = stream_reqbody(req); } - rv = ap_proxy_pass_brigade(bucket_alloc, r, req->backend, - req->origin, header_brigade, 1); break; default: @@ -1590,15 +1575,10 @@ int ap_proxy_http_process_response(proxy_http_req_t *req) /* Send the request body (fully). */ switch(req->rb_method) { + case RB_SPOOL_CL: case RB_STREAM_CL: case RB_STREAM_CHUNKED: - status = stream_reqbody(req, req->rb_method); - break; - case RB_SPOOL_CL: - /* Prefetch has spooled the whole body, flush it. */ - status = ap_proxy_pass_brigade(req->bucket_alloc, r, - backend, origin, - req->input_brigade, 1); + status = stream_reqbody(req); break; default: /* Shouldn't happen */ diff --git a/modules/proxy/mod_proxy_wstunnel.c b/modules/proxy/mod_proxy_wstunnel.c index ba875980ba..2416e74008 100644 --- a/modules/proxy/mod_proxy_wstunnel.c +++ b/modules/proxy/mod_proxy_wstunnel.c @@ -294,6 +294,223 @@ static int proxy_wstunnel_canon(request_rec *r, char *url) return OK; } +static request_rec *make_resp(conn_rec *c, request_rec *r) +{ + apr_pool_t *pool; + request_rec *rp; + + apr_pool_create(&pool, c->pool); + + rp = apr_pcalloc(pool, sizeof(*r)); + + rp->pool = pool; + rp->status = HTTP_OK; + + rp->headers_in = apr_table_make(pool, 50); + rp->trailers_in = apr_table_make(pool, 5); + + rp->subprocess_env = apr_table_make(pool, 50); + rp->headers_out = apr_table_make(pool, 12); + rp->trailers_out = apr_table_make(pool, 5); + rp->err_headers_out = apr_table_make(pool, 5); + rp->notes = apr_table_make(pool, 5); + + rp->server = r->server; + rp->log = r->log; + rp->proxyreq = r->proxyreq; + rp->request_time = r->request_time; + rp->connection = c; + rp->output_filters = c->output_filters; + rp->input_filters = c->input_filters; + rp->proto_output_filters = c->output_filters; + rp->proto_input_filters = c->input_filters; + rp->useragent_ip = c->client_ip; + rp->useragent_addr = c->client_addr; + + rp->request_config = ap_create_request_config(pool); + + return rp; +} + +static int proxy_wstunnel_handle_http_response(request_rec *r, + proxy_conn_rec *backend, + proxy_server_conf *sconf, + apr_bucket_brigade *bb) +{ + conn_rec *origin = backend->connection; + proxy_worker *worker = backend->worker; + char fixed_buffer[HUGE_STRING_LEN]; + char *buffer = fixed_buffer; + int size = HUGE_STRING_LEN; + ap_mime_headers_ctx_t ctx; + request_rec *resp; + apr_status_t rv; + apr_size_t len; + int rc; + + /* Only use dynamically sized buffer if user specifies ResponseFieldSize */ + if (worker->s->response_field_size_set) { + size = worker->s->response_field_size; + if (size > HUGE_STRING_LEN) { + buffer = apr_palloc(r->pool, size); + } + } + + resp = make_resp(origin, r); + + rv = ap_rgetline(&buffer, size, &len, resp, 0, bb); + apr_brigade_cleanup(bb); + + if (rv != APR_SUCCESS || !apr_date_checkmask(buffer, "HTTP/#.# ### *")) { + return HTTP_BAD_GATEWAY; + } + + r->status = atoi(&buffer[9]); + if (!ap_is_HTTP_VALID_RESPONSE(r->status)) { + return HTTP_BAD_GATEWAY; + } + r->status_line = apr_pstrdup(r->pool, &buffer[9]); + + memset(&ctx, 0, sizeof(ctx)); + ctx.bb = bb; + ctx.headers = r->headers_out; + ctx.limit_req_fieldsize = size; + rc = ap_get_mime_headers_ex(r, origin->input_filters, &ctx); + apr_brigade_cleanup(bb); + if (rc != OK) { + r->status = HTTP_OK; + r->status_line = NULL; + apr_table_clear(r->headers_out); + return rc; + } + +#if 0 + if (r->status != HTTP_SWITCHING_PROTOCOLS) { + conn_rec *c = r->connection; + apr_read_type_e block = APR_NONBLOCK_READ; + apr_bucket_brigade *pass_bb = apr_brigade_create(r->pool, + c->bucket_alloc); + int finish = 0; + + r->sent_bodyct = 1; + do { + apr_bucket *e; + apr_off_t readbytes = 0; + + rv = ap_get_brigade(origin->input_filters, bb, + AP_MODE_READBYTES, block, + sconf->io_buffer_size); + + /* ap_get_brigade will return success with an empty brigade + * for a non-blocking read which would block: */ + if (block == APR_NONBLOCK_READ + && (APR_STATUS_IS_EAGAIN(rv) + || (rv == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)))) { + /* flush to the client and switch to blocking mode */ + e = apr_bucket_flush_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, e); + if (ap_pass_brigade(r->output_filters, bb) + || c->aborted) { + finish = 1; + rc = DONE; + } + apr_brigade_cleanup(bb); + block = APR_BLOCK_READ; + continue; + } + if (rv == APR_EOF) { + break; + } + if (rv != APR_SUCCESS) { + if (rv == APR_ENOSPC) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02475) + "Response chunk/line was too large to parse"); + } + else if (rv == APR_ENOTIMPL) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02476) + "Response Transfer-Encoding was not recognised"); + } + else { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01110) + "Network error reading response"); + } + + /* In this case, we are in real trouble because + * our backend bailed on us. Given we're half way + * through a response, our only option is to + * disconnect the client too. + */ + e = ap_bucket_error_create(HTTP_BAD_GATEWAY, NULL, + r->pool, c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, e); + e = ap_bucket_eoc_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, e); + ap_pass_brigade(r->output_filters, bb); + apr_brigade_cleanup(bb); + rc = DONE; + break; + } + + /* next time try a non-blocking read */ + block = APR_NONBLOCK_READ; + + if (!apr_is_empty_table(resp->trailers_in)) { + apr_table_do(add_trailers, r->trailers_out, + resp->trailers_in, NULL); + apr_table_clear(resp->trailers_in); + } + + apr_brigade_length(bb, 0, &readbytes); + backend->worker->s->read += readbytes; + + /* sanity check */ + if (APR_BRIGADE_EMPTY(bb)) { + break; + } + + /* Switch the allocator lifetime of the buckets */ + ap_proxy_buckets_lifetime_transform(r, bb, pass_bb); + + /* found the last brigade? */ + if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(pass_bb))) { + /* the brigade may contain transient buckets that contain + * data that lives only as long as the backend connection. + * Force a setaside so these transient buckets become heap + * buckets that live as long as the request. + */ + for (e = APR_BRIGADE_FIRST(pass_bb); + e != APR_BRIGADE_SENTINEL(pass_bb); + e = APR_BUCKET_NEXT(e)) { + apr_bucket_setaside(e, r->pool); + } + /* finally it is safe to clean up the brigade from the + * connection pool, as we have forced a setaside on all + * buckets. + */ + apr_brigade_cleanup(bb); + finish = 1; + } + + /* try send what we read */ + if (ap_pass_brigade(r->output_filters, pass_bb) != APR_SUCCESS + || c->aborted) { + /* Ack! Phbtt! Die! User aborted! */ + finish = 1; + rc = DONE; + } + + /* make sure we always clean up after ourselves */ + apr_brigade_cleanup(pass_bb); + apr_brigade_cleanup(bb); + } while (!finish); + + return rc; + } +#endif + + return DECLINED; +} + /* * process the request and write the response. */ @@ -318,14 +535,13 @@ static int proxy_wstunnel_request(apr_pool_t *p, request_rec *r, apr_bucket_brigade *bb = apr_brigade_create(p, c->bucket_alloc); apr_socket_t *client_socket = ap_get_conn_socket(c); ws_baton_t *baton = apr_pcalloc(r->pool, sizeof(ws_baton_t)); - int status; proxyws_dir_conf *dconf = ap_get_module_config(r->per_dir_config, &proxy_wstunnel_module); const char *upgrade_method = *worker->s->upgrade ? worker->s->upgrade : "WebSocket"; - - header_brigade = apr_brigade_create(p, backconn->bucket_alloc); + int status; ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "sending request"); + header_brigade = apr_brigade_create(p, backconn->bucket_alloc); rv = ap_proxy_create_hdrbrgd(p, header_brigade, r, conn, worker, conf, uri, url, server_portstr, &old_cl_val, &old_te_val); @@ -334,13 +550,19 @@ static int proxy_wstunnel_request(apr_pool_t *p, request_rec *r, } if (ap_cstr_casecmp(upgrade_method, "NONE") == 0) { - buf = apr_pstrdup(p, "Upgrade: WebSocket" CRLF "Connection: Upgrade" CRLF CRLF); + buf = apr_pstrdup(p, "Upgrade: WebSocket" CRLF + "Connection: Upgrade" CRLF + CRLF); } else if (ap_cstr_casecmp(upgrade_method, "ANY") == 0) { const char *upgrade; upgrade = apr_table_get(r->headers_in, "Upgrade"); - buf = apr_pstrcat(p, "Upgrade: ", upgrade, CRLF "Connection: Upgrade" CRLF CRLF, NULL); + buf = apr_pstrcat(p, "Upgrade: ", upgrade, CRLF + "Connection: Upgrade" CRLF + CRLF, NULL); } else { - buf = apr_pstrcat(p, "Upgrade: ", upgrade_method, CRLF "Connection: Upgrade" CRLF CRLF, NULL); + buf = apr_pstrcat(p, "Upgrade: ", upgrade_method, CRLF + "Connection: Upgrade" CRLF + CRLF, NULL); } ap_xlate_proto_to_ascii(buf, strlen(buf)); e = apr_bucket_pool_create(buf, strlen(buf), p, c->bucket_alloc); @@ -350,7 +572,9 @@ static int proxy_wstunnel_request(apr_pool_t *p, request_rec *r, header_brigade, 1)) != OK) return rv; - apr_brigade_cleanup(header_brigade); + if ((rv = proxy_wstunnel_handle_http_response(r, conn, conf, + header_brigade)) != DECLINED) + return rv; ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "setting up poll()"); @@ -466,7 +690,7 @@ static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker, char *locurl = url; apr_uri_t *uri; int is_ssl = 0; - const char *upgrade_method = *worker->s->upgrade ? worker->s->upgrade : "WebSocket"; + const char *upgrade_method; if (ap_cstr_casecmpn(url, "wss:", 4) == 0) { scheme = "WSS"; @@ -480,6 +704,7 @@ static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker, return DECLINED; } + upgrade_method = *worker->s->upgrade ? worker->s->upgrade : "WebSocket"; if (ap_cstr_casecmp(upgrade_method, "NONE") != 0) { const char *upgrade; upgrade = apr_table_get(r->headers_in, "Upgrade"); |