diff options
author | Stefan Eissing <icing@apache.org> | 2023-06-20 14:01:09 +0200 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2023-06-20 14:01:09 +0200 |
commit | 3ed9d65b05184f8859d9d37654c54a0d00ef0a96 (patch) | |
tree | 46e9fd75f8a294b0463b238d4b2793499d8ed65f /test | |
parent | stealing numbers (diff) | |
download | apache2-3ed9d65b05184f8859d9d37654c54a0d00ef0a96.tar.xz apache2-3ed9d65b05184f8859d9d37654c54a0d00ef0a96.zip |
*) mod_http2: added support for bootstrapping WebSockets via HTTP/2, as
described in RFC 8441. A new directive 'H2WebSockets on|off' has been
added. The feature is by default not enabled.
As also discussed in the manual, this feature should work for setups
using "ProxyPass backend-url upgrade=websocket" without further changes.
Special server modules for WebSockets will have to be adapted,
most likely, as the handling if IO events is different with HTTP/2.
HTTP/2 WebSockets are supported on platforms with native pipes. This
excludes Windows.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1910507 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r-- | test/clients/.gitignore | 1 | ||||
-rw-r--r-- | test/clients/Makefile.in | 20 | ||||
-rw-r--r-- | test/clients/h2ws.c | 1096 | ||||
-rw-r--r-- | test/modules/http2/test_800_websockets.py | 306 | ||||
-rw-r--r-- | test/modules/http2/ws_server.py | 100 | ||||
-rw-r--r-- | test/pyhttpd/config.ini.in | 1 | ||||
-rw-r--r-- | test/pyhttpd/env.py | 16 | ||||
-rw-r--r-- | test/pyhttpd/ws_util.py | 137 | ||||
-rwxr-xr-x | test/travis_run_linux.sh | 2 |
9 files changed, 1677 insertions, 2 deletions
diff --git a/test/clients/.gitignore b/test/clients/.gitignore new file mode 100644 index 0000000000..18b126304c --- /dev/null +++ b/test/clients/.gitignore @@ -0,0 +1 @@ +h2ws
\ No newline at end of file diff --git a/test/clients/Makefile.in b/test/clients/Makefile.in new file mode 100644 index 0000000000..a322a58416 --- /dev/null +++ b/test/clients/Makefile.in @@ -0,0 +1,20 @@ +DISTCLEAN_TARGETS = h2ws + +CLEAN_TARGETS = h2ws + +bin_PROGRAMS = h2ws +TARGETS = $(bin_PROGRAMS) + +PROGRAM_LDADD = $(UTIL_LDFLAGS) $(PROGRAM_DEPENDENCIES) $(EXTRA_LIBS) $(AP_LIBS) +PROGRAM_DEPENDENCIES = + +include $(top_builddir)/build/rules.mk + +h2ws.lo: h2ws.c + $(LIBTOOL) --mode=compile $(CC) $(ab_CFLAGS) $(ALL_CFLAGS) $(ALL_CPPFLAGS) \ + $(ALL_INCLUDES) $(PICFLAGS) $(LTCFLAGS) -c $< && touch $@ +h2ws_OBJECTS = h2ws.lo +h2ws_LDADD = -lnghttp2 +h2ws: $(h2ws_OBJECTS) + $(LIBTOOL) --mode=link $(CC) $(ALL_CFLAGS) $(PILDFLAGS) \ + $(LT_LDFLAGS) $(ALL_LDFLAGS) -o $@ $(h2ws_LTFLAGS) $(h2ws_OBJECTS) $(h2ws_LDADD) diff --git a/test/clients/h2ws.c b/test/clients/h2ws.c new file mode 100644 index 0000000000..a090ac7b09 --- /dev/null +++ b/test/clients/h2ws.c @@ -0,0 +1,1096 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <apr.h> + +#include <assert.h> +#include <inttypes.h> +#include <stdlib.h> +#ifdef APR_HAVE_UNISTD_H +# include <unistd.h> +#endif /* HAVE_UNISTD_H */ +#ifdef APR_HAVE_FCNTL_H +# include <fcntl.h> +#endif /* HAVE_FCNTL_H */ +#include <sys/types.h> +#include <sys/time.h> +#ifdef APR_HAVE_SYS_SOCKET_H +# include <sys/socket.h> +#endif /* HAVE_SYS_SOCKET_H */ +#ifdef APR_HAVE_NETDB_H +# include <netdb.h> +#endif /* HAVE_NETDB_H */ +#ifdef APR_HAVE_NETINET_IN_H +# include <netinet/in.h> +#endif /* HAVE_NETINET_IN_H */ +#include <netinet/tcp.h> +#include <poll.h> +#include <signal.h> +#include <stdio.h> +#include <string.h> +#include <time.h> +#include <errno.h> + +#include <nghttp2/nghttp2.h> + +#define MAKE_NV(NAME, VALUE) \ + { \ + (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \ + NGHTTP2_NV_FLAG_NONE \ + } + +#define MAKE_NV_CS(NAME, VALUE) \ + { \ + (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, strlen(VALUE), \ + NGHTTP2_NV_FLAG_NONE \ + } + + +static int verbose; +static const char *cmd; + +static void log_out(const char *level, const char *where, const char *msg) +{ + struct timespec tp; + struct tm tm; + char timebuf[128]; + + clock_gettime(CLOCK_REALTIME, &tp); + localtime_r(&tp.tv_sec, &tm); + strftime(timebuf, sizeof(timebuf)-1, "%H:%M:%S", &tm); + fprintf(stderr, "[%s.%09lu][%s][%s] %s\n", timebuf, tp.tv_nsec, level, where, msg); +} + +static void log_err(const char *where, const char *msg) +{ + log_out("ERROR", where, msg); +} + +static void log_info(const char *where, const char *msg) +{ + if (verbose) + log_out("INFO", where, msg); +} + +static void log_debug(const char *where, const char *msg) +{ + if (verbose > 1) + log_out("DEBUG", where, msg); +} + +#if defined(__GNUC__) + __attribute__((format(printf, 2, 3))) +#endif +static void log_errf(const char *where, const char *msg, ...) +{ + char buffer[8*1024]; + va_list ap; + + va_start(ap, msg); + vsnprintf(buffer, sizeof(buffer), msg, ap); + va_end(ap); + log_err(where, buffer); +} + +#if defined(__GNUC__) + __attribute__((format(printf, 2, 3))) +#endif +static void log_infof(const char *where, const char *msg, ...) +{ + if (verbose) { + char buffer[8*1024]; + va_list ap; + + va_start(ap, msg); + vsnprintf(buffer, sizeof(buffer), msg, ap); + va_end(ap); + log_info(where, buffer); + } +} + +#if defined(__GNUC__) + __attribute__((format(printf, 2, 3))) +#endif +static void log_debugf(const char *where, const char *msg, ...) +{ + if (verbose > 1) { + char buffer[8*1024]; + va_list ap; + + va_start(ap, msg); + vsnprintf(buffer, sizeof(buffer), msg, ap); + va_end(ap); + log_debug(where, buffer); + } +} + +static int parse_host_port(const char **phost, uint16_t *pport, + int *pipv6, size_t *pconsumed, + const char *s, size_t len, uint16_t def_port) +{ + size_t i, offset; + char *host = NULL; + int port = 0; + int rv = 1, ipv6 = 0; + + if (!len) + goto leave; + offset = 0; + if (s[offset] == '[') { + ipv6 = 1; + for (i = offset++; i < len; ++i) { + if (s[i] == ']') + break; + } + if (i >= len || i == offset) + goto leave; + host = strndup(s + offset, i - offset); + offset = i + 1; + } + else { + for (i = offset; i < len; ++i) { + if (strchr(":/?#", s[i])) + break; + } + if (i == offset) { + log_debugf("parse_uri", "empty host name in '%.*s", (int)len, s); + goto leave; + } + host = strndup(s + offset, i - offset); + offset = i; + } + if (offset < len && s[offset] == ':') { + port = 0; + ++offset; + for (i = offset; i < len; ++i) { + if (strchr("/?#", s[i])) + break; + if (s[i] < '0' || s[i] > '9') { + log_debugf("parse_uri", "invalid port char '%c'", s[i]); + goto leave; + } + port *= 10; + port += s[i] - '0'; + if (port > 65535) { + log_debugf("parse_uri", "invalid port number '%d'", port); + goto leave; + } + } + offset = i; + } + rv = 0; + +leave: + *phost = rv? NULL : host; + *pport = rv? 0 : (port? (uint16_t)port : def_port); + if (pipv6) + *pipv6 = ipv6; + if (pconsumed) + *pconsumed = offset; + return rv; +} + +struct uri { + const char *scheme; + const char *host; + const char *authority; + const char *path; + uint16_t port; + int ipv6; +}; + +static int parse_uri(struct uri *uri, const char *s, size_t len) +{ + char tmp[8192]; + size_t n, offset = 0; + uint16_t def_port = 0; + int rv = 1; + + /* NOT A REAL URI PARSER */ + memset(uri, 0, sizeof(*uri)); + if (len > 5 && !memcmp("ws://", s, 5)) { + uri->scheme = "ws"; + def_port = 80; + offset = 5; + } + else if (len > 6 && !memcmp("wss://", s, 6)) { + uri->scheme = "wss"; + def_port = 443; + offset = 6; + } + else { + /* not a scheme we process */ + goto leave; + } + + if (parse_host_port(&uri->host, &uri->port, &uri->ipv6, &n, s + offset, + len - offset, def_port)) + goto leave; + offset += n; + + if (uri->port == def_port) + uri->authority = uri->host; + else if (uri->ipv6) { + snprintf(tmp, sizeof(tmp), "[%s]:%u", uri->host, uri->port); + uri->authority = strdup(tmp); + } + else { + snprintf(tmp, sizeof(tmp), "%s:%u", uri->host, uri->port); + uri->authority = strdup(tmp); + } + + if (offset < len) { + uri->path = strndup(s + offset, len - offset); + } + rv = 0; + +leave: + return rv; +} + +static int sock_nonblock_nodelay(int fd) { + int flags, rv; + int val = 1; + + while ((flags = fcntl(fd, F_GETFL, 0)) == -1 && errno == EINTR) + ; + if (flags == -1) { + log_errf("sock_nonblock_nodelay", "fcntl get error %d (%s)", + errno, strerror(errno)); + return -1; + } + while ((rv = fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1 && errno == EINTR) + ; + if (rv == -1) { + log_errf("sock_nonblock_nodelay", "fcntl set error %d (%s)", + errno, strerror(errno)); + return -1; + } + rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t)sizeof(val)); + if (rv == -1) { + log_errf("sock_nonblock_nodelay", "set nodelay error %d (%s)", + errno, strerror(errno)); + return -1; + } + return 0; +} + +static int open_connection(const char *host, uint16_t port) +{ + char service[NI_MAXSERV]; + struct addrinfo hints; + struct addrinfo *res = NULL, *rp; + int rv, fd = -1; + + memset(&hints, 0, sizeof(hints)); + snprintf(service, sizeof(service), "%u", port); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + rv = getaddrinfo(host, service, &hints, &res); + if (rv) { + log_err("getaddrinfo", gai_strerror(rv)); + goto leave; + } + + for (rp = res; rp; rp = rp->ai_next) { + fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd == -1) { + continue; + } + while ((rv = connect(fd, rp->ai_addr, rp->ai_addrlen)) == -1 && + errno == EINTR) + ; + if (!rv) /* connected */ + break; + close(fd); + fd = -1; + } + +leave: + if (res) + freeaddrinfo(res); + return fd; +} + +struct h2_stream; + +#define IO_WANT_NONE 0 +#define IO_WANT_READ 1 +#define IO_WANT_WRITE 2 + +struct h2_session { + const char *server_name; + const char *connect_host; + uint16_t connect_port; + int fd; + nghttp2_session *ngh2; + struct h2_stream *streams; + int aborted; + int want_io; +}; + +typedef void h2_stream_closed_cb(struct h2_stream *stream); +typedef void h2_stream_recv_data(struct h2_stream *stream, + const uint8_t *data, size_t len); + +struct h2_stream { + struct h2_stream *next; + struct uri *uri; + int32_t id; + int fdin; + int http_status; + uint32_t error_code; + unsigned input_closed : 1; + unsigned closed : 1; + unsigned reset : 1; + h2_stream_closed_cb *on_close; + h2_stream_recv_data *on_recv_data; +}; + +static void h2_session_stream_add(struct h2_session *session, + struct h2_stream *stream) +{ + struct h2_stream *s; + for (s = session->streams; s; s = s->next) { + if (s == stream) /* already there? */ + return; + } + stream->next = session->streams; + session->streams = stream; +} + +static void h2_session_stream_remove(struct h2_session *session, + struct h2_stream *stream) +{ + struct h2_stream *s, **pnext; + pnext = &session->streams; + s = session->streams; + while (s) { + if (s == stream) { + *pnext = s->next; + s->next = NULL; + break; + } + pnext = &s->next; + s = s->next; + } +} + +static struct h2_stream *h2_session_stream_get(struct h2_session *session, + int32_t id) +{ + struct h2_stream *s; + for (s = session->streams; s; s = s->next) { + if (s->id == id) + return s; + } + return NULL; +} + +static ssize_t h2_session_send(nghttp2_session *ngh2, const uint8_t *data, + size_t length, int flags, void *user_data) +{ + struct h2_session *session = user_data; + ssize_t nwritten; + (void)ngh2; + (void)flags; + + session->want_io = IO_WANT_NONE; + nwritten = send(session->fd, data, length, 0); + if (nwritten < 0) { + int err = errno; + if ((EWOULDBLOCK == err) || (EAGAIN == err) || + (EINTR == err) || (EINPROGRESS == err)) { + return NGHTTP2_ERR_WOULDBLOCK; + } + log_errf("h2_session_send", "error sending %ld bytes: %d (%s)", + (long)length, err, strerror(err)); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + return nwritten; +} + +static ssize_t h2_session_recv(nghttp2_session *ngh2, uint8_t *buf, + size_t length, int flags, void *user_data) +{ + struct h2_session *session = user_data; + ssize_t nread; + (void)ngh2; + (void)flags; + + session->want_io = IO_WANT_NONE; + nread = recv(session->fd, buf, length, 0); + if (nread < 0) { + int err = errno; + if ((EWOULDBLOCK == err) || (EAGAIN == err) || (EINTR == err)) { + return NGHTTP2_ERR_WOULDBLOCK; + } + log_errf("h2_session_recv", "error reading %ld bytes: %d (%s)", + (long)length, err, strerror(err)); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + return nread; +} + +static int h2_session_on_frame_send(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + size_t i; + (void)user_data; + + switch (frame->hd.type) { + case NGHTTP2_HEADERS: + if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) { + const nghttp2_nv *nva = frame->headers.nva; + log_infof("frame send", "FRAME[HEADERS, stream=%d", + frame->hd.stream_id); + for (i = 0; i < frame->headers.nvlen; ++i) { + log_infof("frame send", " %.*s: %.*s", + (int)nva[i].namelen, nva[i].name, + (int)nva[i].valuelen, nva[i].value); + } + log_infof("frame send", "]"); + } + break; + case NGHTTP2_DATA: + log_infof("frame send", "FRAME[DATA, stream=%d, length=%d, flags=%d]", + frame->hd.stream_id, (int)frame->hd.length, + (int)frame->hd.flags); + break; + case NGHTTP2_RST_STREAM: + log_infof("frame send", "FRAME[RST, stream=%d]", + frame->hd.stream_id); + break; + case NGHTTP2_WINDOW_UPDATE: + log_infof("frame send", "FRAME[WINDOW_UPDATE, stream=%d]", + frame->hd.stream_id); + break; + case NGHTTP2_GOAWAY: + log_infof("frame send", "FRAME[GOAWAY]"); + break; + } + return 0; +} + +static int h2_session_on_frame_recv(nghttp2_session *ngh2, + const nghttp2_frame *frame, + void *user_data) +{ + (void)user_data; + + switch (frame->hd.type) { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { + log_infof("frame recv", "FRAME[HEADERS, stream=%d]", + frame->hd.stream_id); + } + break; + case NGHTTP2_DATA: + log_infof("frame recv", "FRAME[DATA, stream=%d, len=%lu, eof=%d]", + frame->hd.stream_id, frame->hd.length, + (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) != 0); + break; + case NGHTTP2_RST_STREAM: + log_infof("frame recv", "FRAME[RST, stream=%d]", + frame->hd.stream_id); + fprintf(stdout, "[%d] RST\n", frame->hd.stream_id); + break; + case NGHTTP2_GOAWAY: + log_infof("frame recv", "FRAME[GOAWAY]"); + break; + } + return 0; +} + +static int h2_session_on_header(nghttp2_session *ngh2, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, size_t valuelen, + uint8_t flags, void *user_data) +{ + struct h2_session *session = user_data; + struct h2_stream *stream; + (void)flags; + (void)user_data; + log_infof("frame recv", "stream=%d, HEADER %.*s: %.*s", + frame->hd.stream_id, (int)namelen, name, + (int)valuelen, value); + stream = h2_session_stream_get(session, frame->hd.stream_id); + if (stream) { + if (namelen == 7 && !strncmp(":status", (const char *)name, namelen)) { + stream->http_status = 0; + if (valuelen < 10) { + char tmp[10], *endp; + memcpy(tmp, value, valuelen); + tmp[valuelen] = 0; + stream->http_status = (int)strtol(tmp, &endp, 10); + } + if (stream->http_status < 100 || stream->http_status >= 600) { + log_errf("on header recv", "stream=%d, invalid :status: %.*s", + frame->hd.stream_id, (int)valuelen, value); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + else { + fprintf(stdout, "[%d] :status: %d\n", stream->id, + stream->http_status); + } + } + } + return 0; +} + +static int h2_session_on_stream_close(nghttp2_session *ngh2, int32_t stream_id, + uint32_t error_code, void *user_data) +{ + struct h2_session *session = user_data; + struct h2_stream *stream; + + stream = h2_session_stream_get(session, stream_id); + if (stream) { + /* closed known stream */ + stream->error_code = error_code; + stream->closed = 1; + if (error_code) + stream->reset = 1; + if (error_code) { + log_errf("stream close", "stream %d closed with error %d", + stream_id, error_code); + } + + h2_session_stream_remove(session, stream); + if (stream->on_close) + stream->on_close(stream); + /* last one? */ + if (!session->streams) { + int rv; + rv = nghttp2_session_terminate_session(ngh2, NGHTTP2_NO_ERROR); + if (rv) { + log_errf("terminate session", "error %d (%s)", + rv, nghttp2_strerror(rv)); + session->aborted = 1; + } + } + } + return 0; +} + +static int h2_session_on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, + int32_t stream_id, const uint8_t *data, + size_t len, void *user_data) { + struct h2_session *session = user_data; + struct h2_stream *stream; + + stream = h2_session_stream_get(session, stream_id); + if (stream && stream->on_recv_data) { + stream->on_recv_data(stream, data, len); + } + return 0; +} + +static int h2_session_open(struct h2_session *session, const char *server_name, + const char *host, uint16_t port) +{ + nghttp2_session_callbacks *cbs = NULL; + int rv = -1; + + memset(session, 0, sizeof(*session)); + session->server_name = server_name; + session->connect_host = host; + session->connect_port = port; + /* establish socket */ + session->fd = open_connection(session->connect_host, session->connect_port); + if (session->fd < 0) { + log_errf(cmd, "could not connect to %s:%u", + session->connect_host, session->connect_port); + goto leave; + } + if (sock_nonblock_nodelay(session->fd)) + goto leave; + session->want_io = IO_WANT_NONE; + + log_infof(cmd, "connected to %s via %s:%u", session->server_name, + session->connect_host, session->connect_port); + + rv = nghttp2_session_callbacks_new(&cbs); + if (rv) { + log_errf("setup callbacks", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + /* setup session callbacks */ + nghttp2_session_callbacks_set_send_callback(cbs, h2_session_send); + nghttp2_session_callbacks_set_recv_callback(cbs, h2_session_recv); + nghttp2_session_callbacks_set_on_frame_send_callback( + cbs, h2_session_on_frame_send); + nghttp2_session_callbacks_set_on_frame_recv_callback( + cbs, h2_session_on_frame_recv); + nghttp2_session_callbacks_set_on_header_callback( + cbs, h2_session_on_header); + nghttp2_session_callbacks_set_on_stream_close_callback( + cbs, h2_session_on_stream_close); + nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + cbs, h2_session_on_data_chunk_recv); + /* create the ngh2 session */ + rv = nghttp2_session_client_new(&session->ngh2, cbs, session); + if (rv) { + log_errf("client new", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + /* submit initial settings */ + rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, NULL, 0); + if (rv) { + log_errf("submit settings", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + rv = -1; + goto leave; + } + rv = 0; + +leave: + if (cbs) + nghttp2_session_callbacks_del(cbs); + return rv; +} + +static int h2_session_io(struct h2_session *session) { + int rv; + rv = nghttp2_session_recv(session->ngh2); + if (rv) { + log_errf("session recv", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + return 1; + } + rv = nghttp2_session_send(session->ngh2); + if (rv) { + log_errf("session send", "error_code=%d, msg=%s\n", rv, + nghttp2_strerror(rv)); + } + return 0; +} + +struct h2_poll_ctx; +typedef int h2_poll_ev_cb(struct h2_poll_ctx *pctx, struct pollfd *pfd); + +struct h2_poll_ctx { + struct h2_session *session; + struct h2_stream *stream; + h2_poll_ev_cb *on_ev; +}; + +static int h2_session_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd) +{ + if (pfd->revents & (POLLIN | POLLOUT)) { + h2_session_io(pctx->session); + } + else if (pfd->revents & POLLHUP) { + log_errf("session run", "connection closed"); + return -1; + } + else if (pfd->revents & POLLERR) { + log_errf("session run", "connection error"); + return -1; + } + return 0; +} + +static int h2_stream_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd) +{ + if (pfd->revents & (POLLIN | POLLHUP)) { + nghttp2_session_resume_data(pctx->session->ngh2, pctx->stream->id); + } + else if (pfd->revents & (POLLERR)) { + nghttp2_submit_rst_stream(pctx->session->ngh2, NGHTTP2_FLAG_NONE, + pctx->stream->id, NGHTTP2_STREAM_CLOSED); + } + return 0; +} + +static nfds_t h2_session_set_poll(struct h2_session *session, + struct h2_poll_ctx *pollctxs, + struct pollfd *pfds) +{ + nfds_t n = 0; + int want_read, want_write; + struct h2_stream *stream; + + want_read = (nghttp2_session_want_read(session->ngh2) || + session->want_io == IO_WANT_READ); + want_write = (nghttp2_session_want_write(session->ngh2) || + session->want_io == IO_WANT_WRITE); + if (want_read || want_write) { + pollctxs[n].session = session; + pollctxs[n].stream = NULL; + pollctxs[n].on_ev = h2_session_ev; + pfds[n].fd = session->fd; + pfds[n].events = pfds[n].revents = 0; + if (want_read) + pfds[n].events |= (POLLIN | POLLHUP); + if (want_write) + pfds[n].events |= (POLLOUT | POLLERR); + ++n; + } + + for (stream = session->streams; stream; stream = stream->next) { + if (stream->fdin >= 0 && !stream->input_closed && !stream->closed) { + pollctxs[n].session = session; + pollctxs[n].stream = stream; + pollctxs[n].on_ev = h2_stream_ev; + pfds[n].fd = stream->fdin; + pfds[n].revents = 0; + pfds[n].events = (POLLIN | POLLHUP); + ++n; + } + } + return n; +} + +static void h2_session_run(struct h2_session *session) +{ + struct h2_poll_ctx pollctxs[5]; + struct pollfd pfds[5]; + nfds_t npollfds, i; + + npollfds = h2_session_set_poll(session, pollctxs, pfds); + while (npollfds) { + if (poll(pfds, npollfds, -1) == -1) { + log_errf("session run", "poll error %d (%s)", errno, strerror(errno)); + break; + } + for (i = 0; i < npollfds; ++i) { + if (pfds[i].revents) { + if (pollctxs[i].on_ev(&pollctxs[i], &pfds[i])) { + break; + } + } + } + npollfds = h2_session_set_poll(session, pollctxs, pfds); + if (!session->streams) + break; + } +} + +static void h2_session_close(struct h2_session *session) +{ + log_infof(cmd, "closed session to %s:%u", + session->connect_host, session->connect_port); +} + +/* websocket stream */ + +struct ws_stream { + struct h2_stream s; +}; + +static void ws_stream_on_close(struct h2_stream *stream) +{ + log_infof("ws stream", "stream %d closed", stream->id); + if (!stream->reset) + fprintf(stdout, "[%d] EOF\n", stream->id); +} + +static void ws_stream_on_recv_data(struct h2_stream *stream, + const uint8_t *data, size_t len) +{ + size_t i; + + log_infof("ws stream", "stream %d recv %lu data bytes", + stream->id, (unsigned long)len); + for (i = 0; i < len; ++i) { + fprintf(stdout, "%s%02x", (i&0xf)? " " : (i? "\n" : ""), data[i]); + } + fprintf(stdout, "\n"); +} + +static int ws_stream_create(struct ws_stream **pstream, struct uri *uri) +{ + struct ws_stream *stream; + + stream = calloc(1, sizeof(*stream)); + if (!stream) { + log_errf("ws stream create", "out of memory"); + *pstream = NULL; + return -1; + } + stream->s.uri = uri; + stream->s.id = -1; + stream->s.on_close = ws_stream_on_close; + stream->s.on_recv_data = ws_stream_on_recv_data; + *pstream = stream; + return 0; +} + +static ssize_t ws_stream_read_req_body(nghttp2_session *ngh2, + int32_t stream_id, + uint8_t *buf, size_t buflen, + uint32_t *pflags, + nghttp2_data_source *source, + void *user_data) +{ + struct h2_session *session = user_data; + struct ws_stream *stream; + ssize_t nread = 0; + int eof = 0; + + stream = (struct ws_stream *)h2_session_stream_get(session, stream_id); + if (!stream) { + log_errf("stream req body", "stream not known"); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + + (void)source; + assert(stream->s.fdin >= 0); + nread = read(stream->s.fdin, buf, buflen); + log_debugf("stream req body", "fread(len=%lu) -> %ld", + (unsigned long)buflen, (long)nread); + + if (nread < 0) { + if (errno == EAGAIN) { + nread = 0; + } + else { + log_errf("stream req body", "error on input"); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } + } + else if (nread == 0) { + eof = 1; + stream->s.input_closed = 1; + } + + *pflags = stream->s.input_closed? NGHTTP2_DATA_FLAG_EOF : 0; + if (nread == 0 && !eof) { + return NGHTTP2_ERR_DEFERRED; + } + return nread; +} + +static int ws_stream_submit(struct ws_stream *stream, + struct h2_session *session, + const nghttp2_nv *nva, size_t nvalen, + int fdin) +{ + nghttp2_data_provider provider, *req_body = NULL; + + if (fdin >= 0) { + sock_nonblock_nodelay(fdin); + stream->s.fdin = fdin; + provider.read_callback = ws_stream_read_req_body; + provider.source.ptr = NULL; + req_body = &provider; + } + else { + stream->s.input_closed = 1; + } + + stream->s.id = nghttp2_submit_request(session->ngh2, NULL, nva, nvalen, + req_body, stream); + if (stream->s.id < 0) { + log_errf("ws stream submit", "nghttp2_submit_request: error %d", + stream->s.id); + return -1; + } + + h2_session_stream_add(session, &stream->s); + log_infof("ws stream submit", "stream %d opened for %s%s", + stream->s.id, stream->s.uri->authority, stream->s.uri->path); + return 0; +} + +static void usage(const char *msg) +{ + if(msg) + fprintf(stderr, "%s\n", msg); + fprintf(stderr, + "usage: [options] ws-uri scenario\n" + " run a websocket scenario to the ws-uri, options:\n" + " -c host:port connect to host:port\n" + " -v increase verbosity\n" + "scenarios are:\n" + " * fail-proto: CONNECT using wrong :protocol\n" + " * miss-authority: CONNECT without :authority header\n" + " * miss-path: CONNECT without :path header\n" + " * miss-scheme: CONNECT without :scheme header\n" + " * miss-version: CONNECT without sec-webSocket-version header\n" + " * ws-empty: open valid websocket, do not send anything\n" + ); +} + +int main(int argc, char *argv[]) +{ + const char *host = NULL, *scenario; + uint16_t port = 80; + struct uri uri; + struct h2_session session; + struct ws_stream *stream; + char ch; + + cmd = argv[0]; + while((ch = getopt(argc, argv, "c:vh")) != -1) { + switch(ch) { + case 'c': + if (parse_host_port(&host, &port, NULL, NULL, + optarg, strlen(optarg), 80)) { + log_errf(cmd, "could not parse connect '%s'", optarg); + return 1; + } + break; + case 'h': + usage(NULL); + return 2; + break; + case 'v': + ++verbose; + break; + default: + usage("invalid option"); + return 1; + } + } + argc -= optind; + argv += optind; + + if (argc < 1) { + usage("need URL"); + return 1; + } + if (argc < 2) { + usage("need scenario"); + return 1; + } + if (parse_uri(&uri, argv[0], strlen(argv[0]))) { + log_errf(cmd, "could not parse uri '%s'", argv[0]); + return 1; + } + log_debugf(cmd, "normalized uri: %s://%s:%u%s", uri.scheme, uri.host, + uri.port, uri.path? uri.path : ""); + scenario = argv[1]; + + if (!host) { + host = uri.host; + port = uri.port; + } + + if (h2_session_open(&session, uri.host, host, port)) + return 1; + + if (ws_stream_create(&stream, &uri)) + return 1; + + if (!strcmp(scenario, "ws-stdin")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), 0)) + return 1; + } + else if (!strcmp(scenario, "fail-proto")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websockets"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-version")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-path")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-scheme")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":authority", stream->s.uri->authority), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else if (!strcmp(scenario, "miss-authority")) { + const nghttp2_nv nva[] = { + MAKE_NV(":method", "CONNECT"), + MAKE_NV_CS(":path", stream->s.uri->path), + MAKE_NV_CS(":scheme", "http"), + MAKE_NV_CS(":protocol", "websocket"), + MAKE_NV("accept", "*/*"), + MAKE_NV("user-agent", "mod_h2/h2ws-test"), + MAKE_NV("sec-webSocket-version", "13"), + MAKE_NV("sec-webSocket-protocol", "chat"), + }; + if (ws_stream_submit(stream, &session, + nva, sizeof(nva) / sizeof(nva[0]), -1)) + return 1; + } + else { + log_errf(cmd, "unknown scenario: %s", scenario); + return 1; + } + + h2_session_run(&session); + h2_session_close(&session); + return 0; +} diff --git a/test/modules/http2/test_800_websockets.py b/test/modules/http2/test_800_websockets.py new file mode 100644 index 0000000000..58ac4eb4e3 --- /dev/null +++ b/test/modules/http2/test_800_websockets.py @@ -0,0 +1,306 @@ +import inspect +import logging +import os +import shutil +import subprocess +import time +from datetime import timedelta, datetime +from typing import Tuple, Union, List +import packaging.version + +import pytest +import websockets +from pyhttpd.result import ExecResult +from pyhttpd.ws_util import WsFrameReader, WsFrame + +from .env import H2Conf, H2TestEnv + + +log = logging.getLogger(__name__) + +ws_version = packaging.version.parse(websockets.version.version) +ws_version_min = packaging.version.Version('10.4') + + +def ws_run(env: H2TestEnv, path, do_input=None, + inbytes=None, send_close=True, + timeout=5, scenario='ws-stdin', + wait_close: float = 0.0) -> Tuple[ExecResult, List[str], Union[List[WsFrame], bytes]]: + """ Run the h2ws test client in various scenarios with given input and + timings. + :param env: the test environment + :param path: the path on the Apache server to CONNECt to + :param do_input: a Callable for sending input to h2ws + :param inbytes: fixed bytes to send to h2ws, unless do_input is given + :param send_close: send a CLOSE WebSockets frame at the end + :param timeout: timeout for waiting on h2ws to finish + :param scenario: name of scenario h2ws should run in + :param wait_close: time to wait before closing input + :return: ExecResult with exit_code/stdout/stderr of run + """ + h2ws = os.path.join(env.clients_dir, 'h2ws') + if not os.path.exists(h2ws): + pytest.fail(f'test client not build: {h2ws}') + args = [ + h2ws, '-vv', '-c', f'localhost:{env.http_port}', + f'ws://cgi.{env.http_tld}:{env.http_port}{path}', + scenario + ] + # we write all output to files, because we manipulate input timings + # and would run in deadlock situations with h2ws blocking operations + # because its output is not consumed + with open(f'{env.gen_dir}/h2ws.stdout', 'w') as fdout: + with open(f'{env.gen_dir}/h2ws.stderr', 'w') as fderr: + proc = subprocess.Popen(args=args, stdin=subprocess.PIPE, + stdout=fdout, stderr=fderr) + if do_input is not None: + do_input(proc) + elif inbytes is not None: + proc.stdin.write(inbytes) + proc.stdin.flush() + + if wait_close > 0: + time.sleep(wait_close) + try: + inbytes = WsFrame.client_close(code=1000).to_network() if send_close else None + proc.communicate(input=inbytes, timeout=timeout) + except subprocess.TimeoutExpired: + log.error(f'ws_run: timeout expired') + proc.kill() + proc.communicate(timeout=timeout) + lines = open(f'{env.gen_dir}/h2ws.stdout').read().splitlines() + infos = [line for line in lines if line.startswith('[1] ')] + hex_content = ' '.join([line for line in lines if not line.startswith('[1] ')]) + if len(infos) > 0 and infos[0] == '[1] :status: 200': + frames = WsFrameReader.parse(bytearray.fromhex(hex_content)) + else: + frames = bytearray.fromhex(hex_content) + return ExecResult(args=args, exit_code=proc.returncode, + stdout=b'', stderr=b''), infos, frames + + +@pytest.mark.skipif(condition=H2TestEnv.is_unsupported, reason="mod_http2 not supported here") +@pytest.mark.skipif(condition=ws_version < ws_version_min, + reason=f'websockets is {ws_version}, need at least {ws_version_min}') +class TestWebSockets: + + @pytest.fixture(autouse=True, scope='class') + def _class_scope(self, env): + # Apache config that CONNECT proxies a WebSocket server for paths starting + # with '/ws/' + # The WebSocket server is started in pytest fixture 'ws_server' below. + conf = H2Conf(env, extras={ + f'cgi.{env.http_tld}': [ + f' H2WebSockets on', + f' ProxyPass /ws/ http://127.0.0.1:{env.ws_port}/ \\', + f' upgrade=websocket timeout=10', + ] + }) + conf.add_vhost_cgi(proxy_self=True, h2proxy_self=True).install() + assert env.apache_restart() == 0 + + def ws_check_alive(self, env, timeout=5): + url = f'http://localhost:{env.ws_port}/' + end = datetime.now() + timedelta(seconds=timeout) + while datetime.now() < end: + r = env.curl_get(url, 5) + if r.exit_code == 0: + return True + time.sleep(.1) + return False + + def _mkpath(self, path): + if not os.path.exists(path): + return os.makedirs(path) + + def _rmrf(self, path): + if os.path.exists(path): + return shutil.rmtree(path) + + @pytest.fixture(autouse=True, scope='class') + def ws_server(self, env): + # Run our python websockets server that has some special behaviour + # for the different path to CONNECT to. + run_dir = os.path.join(env.gen_dir, 'ws-server') + err_file = os.path.join(run_dir, 'stderr') + self._rmrf(run_dir) + self._mkpath(run_dir) + with open(err_file, 'w') as cerr: + cmd = os.path.join(os.path.dirname(inspect.getfile(TestWebSockets)), + 'ws_server.py') + args = ['python3', cmd, '--port', str(env.ws_port)] + p = subprocess.Popen(args=args, cwd=run_dir, stderr=cerr, + stdout=cerr) + if not self.ws_check_alive(env): + p.kill() + p.wait() + pytest.fail(f'ws_server did not start. stderr={open(err_file).readlines()}') + yield + p.terminate() + + # a correct CONNECT, send CLOSE, expect CLOSE, basic success + def test_h2_800_01_ws_empty(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/echo/') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) == 1, f'{frames}' + assert frames[0].opcode == WsFrame.CLOSE, f'{frames}' + + # CONNECT with invalid :protocol header, must fail + def test_h2_800_02_fail_proto(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/echo/', scenario='fail-proto') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}' + + # CONNECT to a URL path that does not exist on the server + def test_h2_800_03_not_found(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/does-not-exist') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 404', '[1] EOF'], f'{r}' + + # CONNECT to a URL path that is a normal HTTP file resource + # we do not want to receive the body of that + def test_h2_800_04_non_ws_resource(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/alive.json') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}' + assert frames == b'' + + # CONNECT to a URL path that sends a delayed HTTP response body + # we do not want to receive the body of that + def test_h2_800_05_non_ws_delay_resource(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/h2test/error?body_delay=100ms') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}' + assert frames == b'' + + # CONNECT missing the sec-webSocket-version header + def test_h2_800_06_miss_version(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-version') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}' + + # CONNECT missing the :path header + def test_h2_800_07_miss_path(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-path') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] RST'], f'{r}' + + # CONNECT missing the :scheme header + def test_h2_800_08_miss_scheme(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-scheme') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] RST'], f'{r}' + + # CONNECT missing the :authority header + def test_h2_800_09_miss_authority(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-authority') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] RST'], f'{r}' + + # CONNECT and exchange a PING + def test_h2_800_10_ws_ping(self, env: H2TestEnv, ws_server): + ping = WsFrame.client_ping(b'12345') + r, infos, frames = ws_run(env, path='/ws/echo/', inbytes=ping.to_network()) + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) == 2, f'{frames}' + assert frames[0].opcode == WsFrame.PONG, f'{frames}' + assert frames[0].data == ping.data, f'{frames}' + assert frames[1].opcode == WsFrame.CLOSE, f'{frames}' + + # CONNECT and send several PINGs with a delay of 200ms + def test_h2_800_11_ws_timed_pings(self, env: H2TestEnv, ws_server): + frame_count = 5 + ping = WsFrame.client_ping(b'12345') + + def do_send(proc): + for _ in range(frame_count): + try: + proc.stdin.write(ping.to_network()) + proc.stdin.flush() + proc.wait(timeout=0.2) + except subprocess.TimeoutExpired: + pass + + r, infos, frames = ws_run(env, path='/ws/echo/', do_input=do_send) + assert r.exit_code == 0 + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) == frame_count + 1, f'{frames}' + assert frames[-1].opcode == WsFrame.CLOSE, f'{frames}' + for i in range(frame_count): + assert frames[i].opcode == WsFrame.PONG, f'{frames}' + assert frames[i].data == ping.data, f'{frames}' + + # CONNECT to path that closes immediately + def test_h2_800_12_ws_unknown(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/unknown') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) == 1, f'{frames}' + # expect a CLOSE with code=4999, reason='path unknown' + assert frames[0].opcode == WsFrame.CLOSE, f'{frames}' + assert frames[0].data[2:].decode() == 'path unknown', f'{frames}' + + # CONNECT to a path that sends us 1 TEXT frame + def test_h2_800_13_ws_text(self, env: H2TestEnv, ws_server): + r, infos, frames = ws_run(env, path='/ws/text/') + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) == 2, f'{frames}' + assert frames[0].opcode == WsFrame.TEXT, f'{frames}' + assert frames[0].data.decode() == 'hello!', f'{frames}' + assert frames[1].opcode == WsFrame.CLOSE, f'{frames}' + + # CONNECT to a path that sends us a named file in BINARY frames + @pytest.mark.parametrize("fname,flen", [ + ("data-1k", 1000), + ("data-10k", 10000), + ("data-100k", 100*1000), + ("data-1m", 1000*1000), + ]) + def test_h2_800_14_ws_file(self, env: H2TestEnv, ws_server, fname, flen): + r, infos, frames = ws_run(env, path=f'/ws/file/{fname}', wait_close=0.5) + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) > 0 + total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY]) + assert total_len == flen, f'{frames}' + + # CONNECT to path with 1MB file and trigger varying BINARY frame lengths + @pytest.mark.parametrize("frame_len", [ + 1000 * 1024, + 100 * 1024, + 10 * 1024, + 1 * 1024, + 512, + ]) + def test_h2_800_15_ws_frame_len(self, env: H2TestEnv, ws_server, frame_len): + fname = "data-1m" + flen = 1000*1000 + r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}', wait_close=0.5) + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) > 0 + total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY]) + assert total_len == flen, f'{frames}' + + # CONNECT to path with 1MB file and trigger delays between BINARY frame writes + @pytest.mark.parametrize("frame_delay", [ + 1, + 10, + 50, + 100, + ]) + def test_h2_800_16_ws_frame_delay(self, env: H2TestEnv, ws_server, frame_delay): + fname = "data-1m" + flen = 1000*1000 + # adjust frame_len to allow for 1 second overall duration + frame_len = int(flen / (1000 / frame_delay)) + r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/{frame_delay}', + wait_close=1.5) + assert r.exit_code == 0, f'{r}' + assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}' + assert len(frames) > 0 + total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY]) + assert total_len == flen, f'{frames}\n{r}' diff --git a/test/modules/http2/ws_server.py b/test/modules/http2/ws_server.py new file mode 100644 index 0000000000..bcb8d3b094 --- /dev/null +++ b/test/modules/http2/ws_server.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +import argparse +import asyncio +import logging +import os +import sys +import time + +import websockets.server as ws_server +from websockets.exceptions import ConnectionClosedError + +log = logging.getLogger(__name__) + +logging.basicConfig( + format="[%(asctime)s] %(message)s", + level=logging.DEBUG, +) + + +async def echo(websocket): + try: + async for message in websocket: + try: + log.info(f'got request {message}') + except Exception as e: + log.error(f'error {e} getting path from {message}') + await websocket.send(message) + except ConnectionClosedError: + pass + + +async def on_async_conn(conn): + rpath = str(conn.path) + pcomps = rpath[1:].split('/') + if len(pcomps) == 0: + pcomps = ['echo'] # default handler + log.info(f'connection for {pcomps}') + if pcomps[0] == 'echo': + log.info(f'/echo endpoint') + for message in await conn.recv(): + await conn.send(message) + elif pcomps[0] == 'text': + await conn.send('hello!') + elif pcomps[0] == 'file': + if len(pcomps) < 2: + conn.close(code=4999, reason='unknown file') + return + fpath = os.path.join('../', pcomps[1]) + if not os.path.exists(fpath): + conn.close(code=4999, reason='file not found') + return + bufsize = 0 + if len(pcomps) > 2: + bufsize = int(pcomps[2]) + if bufsize <= 0: + bufsize = 16*1024 + delay_ms = 0 + if len(pcomps) > 3: + delay_ms = int(pcomps[3]) + with open(fpath, 'r+b') as fd: + while True: + buf = fd.read(bufsize) + if buf is None or len(buf) == 0: + break + await conn.send(buf) + if delay_ms > 0: + time.sleep(delay_ms/1000) + else: + log.info(f'unknown endpoint: {rpath}') + await conn.close(code=4999, reason='path unknown') + await conn.close(code=1000, reason='') + + +async def run_server(port): + log.info(f'starting server on port {port}') + async with ws_server.serve(ws_handler=on_async_conn, + host="localhost", port=port): + await asyncio.Future() + + +async def main(): + parser = argparse.ArgumentParser(prog='scorecard', + description="Run a websocket echo server.") + parser.add_argument("--port", type=int, + default=0, help="port to listen on") + args = parser.parse_args() + + if args.port == 0: + sys.stderr.write('need --port\n') + sys.exit(1) + + logging.basicConfig( + format="%(asctime)s %(message)s", + level=logging.DEBUG, + ) + await run_server(args.port) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test/pyhttpd/config.ini.in b/test/pyhttpd/config.ini.in index e1ae0707ab..3f42248f65 100644 --- a/test/pyhttpd/config.ini.in +++ b/test/pyhttpd/config.ini.in @@ -26,6 +26,7 @@ http_port = 5002 https_port = 5001 proxy_port = 5003 http_port2 = 5004 +ws_port = 5100 http_tld = tests.httpd.apache.org test_dir = @abs_srcdir@ test_src_dir = @abs_srcdir@ diff --git a/test/pyhttpd/env.py b/test/pyhttpd/env.py index 842e369cbc..5111883388 100644 --- a/test/pyhttpd/env.py +++ b/test/pyhttpd/env.py @@ -250,8 +250,10 @@ class HttpdTestEnv: self._http_port2 = int(self.config.get('test', 'http_port2')) self._https_port = int(self.config.get('test', 'https_port')) self._proxy_port = int(self.config.get('test', 'proxy_port')) + self._ws_port = int(self.config.get('test', 'ws_port')) self._http_tld = self.config.get('test', 'http_tld') self._test_dir = self.config.get('test', 'test_dir') + self._clients_dir = os.path.join(os.path.dirname(self._test_dir), 'clients') self._gen_dir = self.config.get('test', 'gen_dir') self._server_dir = os.path.join(self._gen_dir, 'apache') self._server_conf_dir = os.path.join(self._server_dir, "conf") @@ -367,6 +369,10 @@ class HttpdTestEnv: return self._proxy_port @property + def ws_port(self) -> int: + return self._ws_port + + @property def http_tld(self) -> str: return self._http_tld @@ -391,6 +397,10 @@ class HttpdTestEnv: return self._test_dir @property + def clients_dir(self) -> str: + return self._clients_dir + + @property def server_dir(self) -> str: return self._server_dir @@ -519,12 +529,14 @@ class HttpdTestEnv: if not os.path.exists(path): return os.makedirs(path) - def run(self, args, stdout_list=False, intext=None, debug_log=True): + def run(self, args, stdout_list=False, intext=None, inbytes=None, debug_log=True): if debug_log: log.debug(f"run: {args}") start = datetime.now() + if intext is not None: + inbytes = intext.encode() p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, - input=intext.encode() if intext else None) + input=inbytes) stdout_as_list = None if stdout_list: try: diff --git a/test/pyhttpd/ws_util.py b/test/pyhttpd/ws_util.py new file mode 100644 index 0000000000..38a3cf7bf4 --- /dev/null +++ b/test/pyhttpd/ws_util.py @@ -0,0 +1,137 @@ +import logging +import struct + + +log = logging.getLogger(__name__) + + +class WsFrame: + + CONT = 0 + TEXT = 1 + BINARY = 2 + RSVD3 = 3 + RSVD4 = 4 + RSVD5 = 5 + RSVD6 = 6 + RSVD7 = 7 + CLOSE = 8 + PING = 9 + PONG = 10 + RSVD11 = 11 + RSVD12 = 12 + RSVD13 = 13 + RSVD14 = 14 + RSVD15 = 15 + + OP_NAMES = [ + "CONT", + "TEXT", + "BINARY", + "RSVD3", + "RSVD4", + "RSVD5", + "RSVD6", + "RSVD7", + "CLOSE", + "PING", + "PONG", + "RSVD11", + "RSVD12", + "RSVD13", + "RSVD14", + "RSVD15", + ] + + def __init__(self, opcode: int, fin: bool, mask: bytes, data: bytes): + self.opcode = opcode + self.fin = fin + self.mask = mask + self.data = data + self.length = len(data) + + def __repr__(self): + return f'WsFrame[{self.OP_NAMES[self.opcode]} fin={self.fin}, mask={self.mask}, len={len(self.data)}]' + + @property + def data_len(self) -> int: + return len(self.data) if self.data else 0 + + def to_network(self) -> bytes: + nd = bytearray() + h1 = self.opcode + if self.fin: + h1 |= 0x80 + nd.extend(struct.pack("!B", h1)) + mask_bit = 0x80 if self.mask is not None else 0x0 + h2 = self.data_len + if h2 > 65535: + nd.extend(struct.pack("!BQ", 127|mask_bit, h2)) + elif h2 > 126: + nd.extend(struct.pack("!BH", 126|mask_bit, h2)) + else: + nd.extend(struct.pack("!B", h2|mask_bit)) + if self.mask is not None: + nd.extend(self.mask) + if self.data is not None: + nd.extend(self.data) + return nd + + @classmethod + def client_ping(cls, data: bytes, mask: bytes = None) -> 'WsFrame': + if mask is None: + mask = bytes.fromhex('00 00 00 00') + return WsFrame(opcode=WsFrame.PING, fin=True, mask=mask, data=data) + + @classmethod + def client_close(cls, code: int, reason: str = None, + mask: bytes = None) -> 'WsFrame': + data = bytearray(struct.pack("!H", code)) + if reason is not None: + data.extend(reason.encode()) + if mask is None: + mask = bytes.fromhex('00 00 00 00') + return WsFrame(opcode=WsFrame.CLOSE, fin=True, mask=mask, data=data) + + +class WsFrameReader: + + def __init__(self, data: bytes): + self.data = data + + def _read(self, n: int): + if len(self.data) < n: + raise EOFError(f'have {len(self.data)} bytes left, but {n} requested') + elif n == 0: + return b'' + chunk = self.data[:n] + del self.data[:n] + return chunk + + def next_frame(self): + data = self._read(2) + h1, h2 = struct.unpack("!BB", data) + log.debug(f'parsed h1={h1} h2={h2} from {data}') + fin = True if h1 & 0x80 else False + opcode = h1 & 0xf + has_mask = True if h2 & 0x80 else False + mask = None + dlen = h2 & 0x7f + if dlen == 126: + (dlen,) = struct.unpack("!H", self._read(2)) + elif dlen == 127: + (dlen,) = struct.unpack("!Q", self._read(8)) + if has_mask: + mask = self._read(4) + return WsFrame(opcode=opcode, fin=fin, mask=mask, data=self._read(dlen)) + + def eof(self): + return len(self.data) == 0 + + @classmethod + def parse(cls, data: bytes): + frames = [] + reader = WsFrameReader(data=data) + while not reader.eof(): + frames.append(reader.next_frame()) + return frames diff --git a/test/travis_run_linux.sh b/test/travis_run_linux.sh index 3a2cdbd009..39c331da5c 100755 --- a/test/travis_run_linux.sh +++ b/test/travis_run_linux.sh @@ -221,6 +221,8 @@ if ! test -v SKIP_TESTING; then fi if test -v TEST_H2 -a $RV -eq 0; then + # Build the test clients + (cd test/clients && make) # Run HTTP/2 tests. MPM=event py.test-3 test/modules/http2 RV=$? |