summaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/clients/.gitignore1
-rw-r--r--test/clients/Makefile.in20
-rw-r--r--test/clients/h2ws.c1096
-rw-r--r--test/modules/http2/test_800_websockets.py306
-rw-r--r--test/modules/http2/ws_server.py100
-rw-r--r--test/pyhttpd/config.ini.in1
-rw-r--r--test/pyhttpd/env.py16
-rw-r--r--test/pyhttpd/ws_util.py137
-rwxr-xr-xtest/travis_run_linux.sh2
9 files changed, 1677 insertions, 2 deletions
diff --git a/test/clients/.gitignore b/test/clients/.gitignore
new file mode 100644
index 0000000000..18b126304c
--- /dev/null
+++ b/test/clients/.gitignore
@@ -0,0 +1 @@
+h2ws \ No newline at end of file
diff --git a/test/clients/Makefile.in b/test/clients/Makefile.in
new file mode 100644
index 0000000000..a322a58416
--- /dev/null
+++ b/test/clients/Makefile.in
@@ -0,0 +1,20 @@
+DISTCLEAN_TARGETS = h2ws
+
+CLEAN_TARGETS = h2ws
+
+bin_PROGRAMS = h2ws
+TARGETS = $(bin_PROGRAMS)
+
+PROGRAM_LDADD = $(UTIL_LDFLAGS) $(PROGRAM_DEPENDENCIES) $(EXTRA_LIBS) $(AP_LIBS)
+PROGRAM_DEPENDENCIES =
+
+include $(top_builddir)/build/rules.mk
+
+h2ws.lo: h2ws.c
+ $(LIBTOOL) --mode=compile $(CC) $(ab_CFLAGS) $(ALL_CFLAGS) $(ALL_CPPFLAGS) \
+ $(ALL_INCLUDES) $(PICFLAGS) $(LTCFLAGS) -c $< && touch $@
+h2ws_OBJECTS = h2ws.lo
+h2ws_LDADD = -lnghttp2
+h2ws: $(h2ws_OBJECTS)
+ $(LIBTOOL) --mode=link $(CC) $(ALL_CFLAGS) $(PILDFLAGS) \
+ $(LT_LDFLAGS) $(ALL_LDFLAGS) -o $@ $(h2ws_LTFLAGS) $(h2ws_OBJECTS) $(h2ws_LDADD)
diff --git a/test/clients/h2ws.c b/test/clients/h2ws.c
new file mode 100644
index 0000000000..a090ac7b09
--- /dev/null
+++ b/test/clients/h2ws.c
@@ -0,0 +1,1096 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr.h>
+
+#include <assert.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#ifdef APR_HAVE_UNISTD_H
+# include <unistd.h>
+#endif /* HAVE_UNISTD_H */
+#ifdef APR_HAVE_FCNTL_H
+# include <fcntl.h>
+#endif /* HAVE_FCNTL_H */
+#include <sys/types.h>
+#include <sys/time.h>
+#ifdef APR_HAVE_SYS_SOCKET_H
+# include <sys/socket.h>
+#endif /* HAVE_SYS_SOCKET_H */
+#ifdef APR_HAVE_NETDB_H
+# include <netdb.h>
+#endif /* HAVE_NETDB_H */
+#ifdef APR_HAVE_NETINET_IN_H
+# include <netinet/in.h>
+#endif /* HAVE_NETINET_IN_H */
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+
+#include <nghttp2/nghttp2.h>
+
+#define MAKE_NV(NAME, VALUE) \
+ { \
+ (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \
+ NGHTTP2_NV_FLAG_NONE \
+ }
+
+#define MAKE_NV_CS(NAME, VALUE) \
+ { \
+ (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, strlen(VALUE), \
+ NGHTTP2_NV_FLAG_NONE \
+ }
+
+
+static int verbose;
+static const char *cmd;
+
+static void log_out(const char *level, const char *where, const char *msg)
+{
+ struct timespec tp;
+ struct tm tm;
+ char timebuf[128];
+
+ clock_gettime(CLOCK_REALTIME, &tp);
+ localtime_r(&tp.tv_sec, &tm);
+ strftime(timebuf, sizeof(timebuf)-1, "%H:%M:%S", &tm);
+ fprintf(stderr, "[%s.%09lu][%s][%s] %s\n", timebuf, tp.tv_nsec, level, where, msg);
+}
+
+static void log_err(const char *where, const char *msg)
+{
+ log_out("ERROR", where, msg);
+}
+
+static void log_info(const char *where, const char *msg)
+{
+ if (verbose)
+ log_out("INFO", where, msg);
+}
+
+static void log_debug(const char *where, const char *msg)
+{
+ if (verbose > 1)
+ log_out("DEBUG", where, msg);
+}
+
+#if defined(__GNUC__)
+ __attribute__((format(printf, 2, 3)))
+#endif
+static void log_errf(const char *where, const char *msg, ...)
+{
+ char buffer[8*1024];
+ va_list ap;
+
+ va_start(ap, msg);
+ vsnprintf(buffer, sizeof(buffer), msg, ap);
+ va_end(ap);
+ log_err(where, buffer);
+}
+
+#if defined(__GNUC__)
+ __attribute__((format(printf, 2, 3)))
+#endif
+static void log_infof(const char *where, const char *msg, ...)
+{
+ if (verbose) {
+ char buffer[8*1024];
+ va_list ap;
+
+ va_start(ap, msg);
+ vsnprintf(buffer, sizeof(buffer), msg, ap);
+ va_end(ap);
+ log_info(where, buffer);
+ }
+}
+
+#if defined(__GNUC__)
+ __attribute__((format(printf, 2, 3)))
+#endif
+static void log_debugf(const char *where, const char *msg, ...)
+{
+ if (verbose > 1) {
+ char buffer[8*1024];
+ va_list ap;
+
+ va_start(ap, msg);
+ vsnprintf(buffer, sizeof(buffer), msg, ap);
+ va_end(ap);
+ log_debug(where, buffer);
+ }
+}
+
+static int parse_host_port(const char **phost, uint16_t *pport,
+ int *pipv6, size_t *pconsumed,
+ const char *s, size_t len, uint16_t def_port)
+{
+ size_t i, offset;
+ char *host = NULL;
+ int port = 0;
+ int rv = 1, ipv6 = 0;
+
+ if (!len)
+ goto leave;
+ offset = 0;
+ if (s[offset] == '[') {
+ ipv6 = 1;
+ for (i = offset++; i < len; ++i) {
+ if (s[i] == ']')
+ break;
+ }
+ if (i >= len || i == offset)
+ goto leave;
+ host = strndup(s + offset, i - offset);
+ offset = i + 1;
+ }
+ else {
+ for (i = offset; i < len; ++i) {
+ if (strchr(":/?#", s[i]))
+ break;
+ }
+ if (i == offset) {
+ log_debugf("parse_uri", "empty host name in '%.*s", (int)len, s);
+ goto leave;
+ }
+ host = strndup(s + offset, i - offset);
+ offset = i;
+ }
+ if (offset < len && s[offset] == ':') {
+ port = 0;
+ ++offset;
+ for (i = offset; i < len; ++i) {
+ if (strchr("/?#", s[i]))
+ break;
+ if (s[i] < '0' || s[i] > '9') {
+ log_debugf("parse_uri", "invalid port char '%c'", s[i]);
+ goto leave;
+ }
+ port *= 10;
+ port += s[i] - '0';
+ if (port > 65535) {
+ log_debugf("parse_uri", "invalid port number '%d'", port);
+ goto leave;
+ }
+ }
+ offset = i;
+ }
+ rv = 0;
+
+leave:
+ *phost = rv? NULL : host;
+ *pport = rv? 0 : (port? (uint16_t)port : def_port);
+ if (pipv6)
+ *pipv6 = ipv6;
+ if (pconsumed)
+ *pconsumed = offset;
+ return rv;
+}
+
+struct uri {
+ const char *scheme;
+ const char *host;
+ const char *authority;
+ const char *path;
+ uint16_t port;
+ int ipv6;
+};
+
+static int parse_uri(struct uri *uri, const char *s, size_t len)
+{
+ char tmp[8192];
+ size_t n, offset = 0;
+ uint16_t def_port = 0;
+ int rv = 1;
+
+ /* NOT A REAL URI PARSER */
+ memset(uri, 0, sizeof(*uri));
+ if (len > 5 && !memcmp("ws://", s, 5)) {
+ uri->scheme = "ws";
+ def_port = 80;
+ offset = 5;
+ }
+ else if (len > 6 && !memcmp("wss://", s, 6)) {
+ uri->scheme = "wss";
+ def_port = 443;
+ offset = 6;
+ }
+ else {
+ /* not a scheme we process */
+ goto leave;
+ }
+
+ if (parse_host_port(&uri->host, &uri->port, &uri->ipv6, &n, s + offset,
+ len - offset, def_port))
+ goto leave;
+ offset += n;
+
+ if (uri->port == def_port)
+ uri->authority = uri->host;
+ else if (uri->ipv6) {
+ snprintf(tmp, sizeof(tmp), "[%s]:%u", uri->host, uri->port);
+ uri->authority = strdup(tmp);
+ }
+ else {
+ snprintf(tmp, sizeof(tmp), "%s:%u", uri->host, uri->port);
+ uri->authority = strdup(tmp);
+ }
+
+ if (offset < len) {
+ uri->path = strndup(s + offset, len - offset);
+ }
+ rv = 0;
+
+leave:
+ return rv;
+}
+
+static int sock_nonblock_nodelay(int fd) {
+ int flags, rv;
+ int val = 1;
+
+ while ((flags = fcntl(fd, F_GETFL, 0)) == -1 && errno == EINTR)
+ ;
+ if (flags == -1) {
+ log_errf("sock_nonblock_nodelay", "fcntl get error %d (%s)",
+ errno, strerror(errno));
+ return -1;
+ }
+ while ((rv = fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1 && errno == EINTR)
+ ;
+ if (rv == -1) {
+ log_errf("sock_nonblock_nodelay", "fcntl set error %d (%s)",
+ errno, strerror(errno));
+ return -1;
+ }
+ rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t)sizeof(val));
+ if (rv == -1) {
+ log_errf("sock_nonblock_nodelay", "set nodelay error %d (%s)",
+ errno, strerror(errno));
+ return -1;
+ }
+ return 0;
+}
+
+static int open_connection(const char *host, uint16_t port)
+{
+ char service[NI_MAXSERV];
+ struct addrinfo hints;
+ struct addrinfo *res = NULL, *rp;
+ int rv, fd = -1;
+
+ memset(&hints, 0, sizeof(hints));
+ snprintf(service, sizeof(service), "%u", port);
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ rv = getaddrinfo(host, service, &hints, &res);
+ if (rv) {
+ log_err("getaddrinfo", gai_strerror(rv));
+ goto leave;
+ }
+
+ for (rp = res; rp; rp = rp->ai_next) {
+ fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (fd == -1) {
+ continue;
+ }
+ while ((rv = connect(fd, rp->ai_addr, rp->ai_addrlen)) == -1 &&
+ errno == EINTR)
+ ;
+ if (!rv) /* connected */
+ break;
+ close(fd);
+ fd = -1;
+ }
+
+leave:
+ if (res)
+ freeaddrinfo(res);
+ return fd;
+}
+
+struct h2_stream;
+
+#define IO_WANT_NONE 0
+#define IO_WANT_READ 1
+#define IO_WANT_WRITE 2
+
+struct h2_session {
+ const char *server_name;
+ const char *connect_host;
+ uint16_t connect_port;
+ int fd;
+ nghttp2_session *ngh2;
+ struct h2_stream *streams;
+ int aborted;
+ int want_io;
+};
+
+typedef void h2_stream_closed_cb(struct h2_stream *stream);
+typedef void h2_stream_recv_data(struct h2_stream *stream,
+ const uint8_t *data, size_t len);
+
+struct h2_stream {
+ struct h2_stream *next;
+ struct uri *uri;
+ int32_t id;
+ int fdin;
+ int http_status;
+ uint32_t error_code;
+ unsigned input_closed : 1;
+ unsigned closed : 1;
+ unsigned reset : 1;
+ h2_stream_closed_cb *on_close;
+ h2_stream_recv_data *on_recv_data;
+};
+
+static void h2_session_stream_add(struct h2_session *session,
+ struct h2_stream *stream)
+{
+ struct h2_stream *s;
+ for (s = session->streams; s; s = s->next) {
+ if (s == stream) /* already there? */
+ return;
+ }
+ stream->next = session->streams;
+ session->streams = stream;
+}
+
+static void h2_session_stream_remove(struct h2_session *session,
+ struct h2_stream *stream)
+{
+ struct h2_stream *s, **pnext;
+ pnext = &session->streams;
+ s = session->streams;
+ while (s) {
+ if (s == stream) {
+ *pnext = s->next;
+ s->next = NULL;
+ break;
+ }
+ pnext = &s->next;
+ s = s->next;
+ }
+}
+
+static struct h2_stream *h2_session_stream_get(struct h2_session *session,
+ int32_t id)
+{
+ struct h2_stream *s;
+ for (s = session->streams; s; s = s->next) {
+ if (s->id == id)
+ return s;
+ }
+ return NULL;
+}
+
+static ssize_t h2_session_send(nghttp2_session *ngh2, const uint8_t *data,
+ size_t length, int flags, void *user_data)
+{
+ struct h2_session *session = user_data;
+ ssize_t nwritten;
+ (void)ngh2;
+ (void)flags;
+
+ session->want_io = IO_WANT_NONE;
+ nwritten = send(session->fd, data, length, 0);
+ if (nwritten < 0) {
+ int err = errno;
+ if ((EWOULDBLOCK == err) || (EAGAIN == err) ||
+ (EINTR == err) || (EINPROGRESS == err)) {
+ return NGHTTP2_ERR_WOULDBLOCK;
+ }
+ log_errf("h2_session_send", "error sending %ld bytes: %d (%s)",
+ (long)length, err, strerror(err));
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ return nwritten;
+}
+
+static ssize_t h2_session_recv(nghttp2_session *ngh2, uint8_t *buf,
+ size_t length, int flags, void *user_data)
+{
+ struct h2_session *session = user_data;
+ ssize_t nread;
+ (void)ngh2;
+ (void)flags;
+
+ session->want_io = IO_WANT_NONE;
+ nread = recv(session->fd, buf, length, 0);
+ if (nread < 0) {
+ int err = errno;
+ if ((EWOULDBLOCK == err) || (EAGAIN == err) || (EINTR == err)) {
+ return NGHTTP2_ERR_WOULDBLOCK;
+ }
+ log_errf("h2_session_recv", "error reading %ld bytes: %d (%s)",
+ (long)length, err, strerror(err));
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ return nread;
+}
+
+static int h2_session_on_frame_send(nghttp2_session *session,
+ const nghttp2_frame *frame,
+ void *user_data)
+{
+ size_t i;
+ (void)user_data;
+
+ switch (frame->hd.type) {
+ case NGHTTP2_HEADERS:
+ if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) {
+ const nghttp2_nv *nva = frame->headers.nva;
+ log_infof("frame send", "FRAME[HEADERS, stream=%d",
+ frame->hd.stream_id);
+ for (i = 0; i < frame->headers.nvlen; ++i) {
+ log_infof("frame send", " %.*s: %.*s",
+ (int)nva[i].namelen, nva[i].name,
+ (int)nva[i].valuelen, nva[i].value);
+ }
+ log_infof("frame send", "]");
+ }
+ break;
+ case NGHTTP2_DATA:
+ log_infof("frame send", "FRAME[DATA, stream=%d, length=%d, flags=%d]",
+ frame->hd.stream_id, (int)frame->hd.length,
+ (int)frame->hd.flags);
+ break;
+ case NGHTTP2_RST_STREAM:
+ log_infof("frame send", "FRAME[RST, stream=%d]",
+ frame->hd.stream_id);
+ break;
+ case NGHTTP2_WINDOW_UPDATE:
+ log_infof("frame send", "FRAME[WINDOW_UPDATE, stream=%d]",
+ frame->hd.stream_id);
+ break;
+ case NGHTTP2_GOAWAY:
+ log_infof("frame send", "FRAME[GOAWAY]");
+ break;
+ }
+ return 0;
+}
+
+static int h2_session_on_frame_recv(nghttp2_session *ngh2,
+ const nghttp2_frame *frame,
+ void *user_data)
+{
+ (void)user_data;
+
+ switch (frame->hd.type) {
+ case NGHTTP2_HEADERS:
+ if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
+ log_infof("frame recv", "FRAME[HEADERS, stream=%d]",
+ frame->hd.stream_id);
+ }
+ break;
+ case NGHTTP2_DATA:
+ log_infof("frame recv", "FRAME[DATA, stream=%d, len=%lu, eof=%d]",
+ frame->hd.stream_id, frame->hd.length,
+ (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) != 0);
+ break;
+ case NGHTTP2_RST_STREAM:
+ log_infof("frame recv", "FRAME[RST, stream=%d]",
+ frame->hd.stream_id);
+ fprintf(stdout, "[%d] RST\n", frame->hd.stream_id);
+ break;
+ case NGHTTP2_GOAWAY:
+ log_infof("frame recv", "FRAME[GOAWAY]");
+ break;
+ }
+ return 0;
+}
+
+static int h2_session_on_header(nghttp2_session *ngh2,
+ const nghttp2_frame *frame,
+ const uint8_t *name, size_t namelen,
+ const uint8_t *value, size_t valuelen,
+ uint8_t flags, void *user_data)
+{
+ struct h2_session *session = user_data;
+ struct h2_stream *stream;
+ (void)flags;
+ (void)user_data;
+ log_infof("frame recv", "stream=%d, HEADER %.*s: %.*s",
+ frame->hd.stream_id, (int)namelen, name,
+ (int)valuelen, value);
+ stream = h2_session_stream_get(session, frame->hd.stream_id);
+ if (stream) {
+ if (namelen == 7 && !strncmp(":status", (const char *)name, namelen)) {
+ stream->http_status = 0;
+ if (valuelen < 10) {
+ char tmp[10], *endp;
+ memcpy(tmp, value, valuelen);
+ tmp[valuelen] = 0;
+ stream->http_status = (int)strtol(tmp, &endp, 10);
+ }
+ if (stream->http_status < 100 || stream->http_status >= 600) {
+ log_errf("on header recv", "stream=%d, invalid :status: %.*s",
+ frame->hd.stream_id, (int)valuelen, value);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ else {
+ fprintf(stdout, "[%d] :status: %d\n", stream->id,
+ stream->http_status);
+ }
+ }
+ }
+ return 0;
+}
+
+static int h2_session_on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
+ uint32_t error_code, void *user_data)
+{
+ struct h2_session *session = user_data;
+ struct h2_stream *stream;
+
+ stream = h2_session_stream_get(session, stream_id);
+ if (stream) {
+ /* closed known stream */
+ stream->error_code = error_code;
+ stream->closed = 1;
+ if (error_code)
+ stream->reset = 1;
+ if (error_code) {
+ log_errf("stream close", "stream %d closed with error %d",
+ stream_id, error_code);
+ }
+
+ h2_session_stream_remove(session, stream);
+ if (stream->on_close)
+ stream->on_close(stream);
+ /* last one? */
+ if (!session->streams) {
+ int rv;
+ rv = nghttp2_session_terminate_session(ngh2, NGHTTP2_NO_ERROR);
+ if (rv) {
+ log_errf("terminate session", "error %d (%s)",
+ rv, nghttp2_strerror(rv));
+ session->aborted = 1;
+ }
+ }
+ }
+ return 0;
+}
+
+static int h2_session_on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
+ int32_t stream_id, const uint8_t *data,
+ size_t len, void *user_data) {
+ struct h2_session *session = user_data;
+ struct h2_stream *stream;
+
+ stream = h2_session_stream_get(session, stream_id);
+ if (stream && stream->on_recv_data) {
+ stream->on_recv_data(stream, data, len);
+ }
+ return 0;
+}
+
+static int h2_session_open(struct h2_session *session, const char *server_name,
+ const char *host, uint16_t port)
+{
+ nghttp2_session_callbacks *cbs = NULL;
+ int rv = -1;
+
+ memset(session, 0, sizeof(*session));
+ session->server_name = server_name;
+ session->connect_host = host;
+ session->connect_port = port;
+ /* establish socket */
+ session->fd = open_connection(session->connect_host, session->connect_port);
+ if (session->fd < 0) {
+ log_errf(cmd, "could not connect to %s:%u",
+ session->connect_host, session->connect_port);
+ goto leave;
+ }
+ if (sock_nonblock_nodelay(session->fd))
+ goto leave;
+ session->want_io = IO_WANT_NONE;
+
+ log_infof(cmd, "connected to %s via %s:%u", session->server_name,
+ session->connect_host, session->connect_port);
+
+ rv = nghttp2_session_callbacks_new(&cbs);
+ if (rv) {
+ log_errf("setup callbacks", "error_code=%d, msg=%s\n", rv,
+ nghttp2_strerror(rv));
+ rv = -1;
+ goto leave;
+ }
+ /* setup session callbacks */
+ nghttp2_session_callbacks_set_send_callback(cbs, h2_session_send);
+ nghttp2_session_callbacks_set_recv_callback(cbs, h2_session_recv);
+ nghttp2_session_callbacks_set_on_frame_send_callback(
+ cbs, h2_session_on_frame_send);
+ nghttp2_session_callbacks_set_on_frame_recv_callback(
+ cbs, h2_session_on_frame_recv);
+ nghttp2_session_callbacks_set_on_header_callback(
+ cbs, h2_session_on_header);
+ nghttp2_session_callbacks_set_on_stream_close_callback(
+ cbs, h2_session_on_stream_close);
+ nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
+ cbs, h2_session_on_data_chunk_recv);
+ /* create the ngh2 session */
+ rv = nghttp2_session_client_new(&session->ngh2, cbs, session);
+ if (rv) {
+ log_errf("client new", "error_code=%d, msg=%s\n", rv,
+ nghttp2_strerror(rv));
+ rv = -1;
+ goto leave;
+ }
+ /* submit initial settings */
+ rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, NULL, 0);
+ if (rv) {
+ log_errf("submit settings", "error_code=%d, msg=%s\n", rv,
+ nghttp2_strerror(rv));
+ rv = -1;
+ goto leave;
+ }
+ rv = 0;
+
+leave:
+ if (cbs)
+ nghttp2_session_callbacks_del(cbs);
+ return rv;
+}
+
+static int h2_session_io(struct h2_session *session) {
+ int rv;
+ rv = nghttp2_session_recv(session->ngh2);
+ if (rv) {
+ log_errf("session recv", "error_code=%d, msg=%s\n", rv,
+ nghttp2_strerror(rv));
+ return 1;
+ }
+ rv = nghttp2_session_send(session->ngh2);
+ if (rv) {
+ log_errf("session send", "error_code=%d, msg=%s\n", rv,
+ nghttp2_strerror(rv));
+ }
+ return 0;
+}
+
+struct h2_poll_ctx;
+typedef int h2_poll_ev_cb(struct h2_poll_ctx *pctx, struct pollfd *pfd);
+
+struct h2_poll_ctx {
+ struct h2_session *session;
+ struct h2_stream *stream;
+ h2_poll_ev_cb *on_ev;
+};
+
+static int h2_session_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd)
+{
+ if (pfd->revents & (POLLIN | POLLOUT)) {
+ h2_session_io(pctx->session);
+ }
+ else if (pfd->revents & POLLHUP) {
+ log_errf("session run", "connection closed");
+ return -1;
+ }
+ else if (pfd->revents & POLLERR) {
+ log_errf("session run", "connection error");
+ return -1;
+ }
+ return 0;
+}
+
+static int h2_stream_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd)
+{
+ if (pfd->revents & (POLLIN | POLLHUP)) {
+ nghttp2_session_resume_data(pctx->session->ngh2, pctx->stream->id);
+ }
+ else if (pfd->revents & (POLLERR)) {
+ nghttp2_submit_rst_stream(pctx->session->ngh2, NGHTTP2_FLAG_NONE,
+ pctx->stream->id, NGHTTP2_STREAM_CLOSED);
+ }
+ return 0;
+}
+
+static nfds_t h2_session_set_poll(struct h2_session *session,
+ struct h2_poll_ctx *pollctxs,
+ struct pollfd *pfds)
+{
+ nfds_t n = 0;
+ int want_read, want_write;
+ struct h2_stream *stream;
+
+ want_read = (nghttp2_session_want_read(session->ngh2) ||
+ session->want_io == IO_WANT_READ);
+ want_write = (nghttp2_session_want_write(session->ngh2) ||
+ session->want_io == IO_WANT_WRITE);
+ if (want_read || want_write) {
+ pollctxs[n].session = session;
+ pollctxs[n].stream = NULL;
+ pollctxs[n].on_ev = h2_session_ev;
+ pfds[n].fd = session->fd;
+ pfds[n].events = pfds[n].revents = 0;
+ if (want_read)
+ pfds[n].events |= (POLLIN | POLLHUP);
+ if (want_write)
+ pfds[n].events |= (POLLOUT | POLLERR);
+ ++n;
+ }
+
+ for (stream = session->streams; stream; stream = stream->next) {
+ if (stream->fdin >= 0 && !stream->input_closed && !stream->closed) {
+ pollctxs[n].session = session;
+ pollctxs[n].stream = stream;
+ pollctxs[n].on_ev = h2_stream_ev;
+ pfds[n].fd = stream->fdin;
+ pfds[n].revents = 0;
+ pfds[n].events = (POLLIN | POLLHUP);
+ ++n;
+ }
+ }
+ return n;
+}
+
+static void h2_session_run(struct h2_session *session)
+{
+ struct h2_poll_ctx pollctxs[5];
+ struct pollfd pfds[5];
+ nfds_t npollfds, i;
+
+ npollfds = h2_session_set_poll(session, pollctxs, pfds);
+ while (npollfds) {
+ if (poll(pfds, npollfds, -1) == -1) {
+ log_errf("session run", "poll error %d (%s)", errno, strerror(errno));
+ break;
+ }
+ for (i = 0; i < npollfds; ++i) {
+ if (pfds[i].revents) {
+ if (pollctxs[i].on_ev(&pollctxs[i], &pfds[i])) {
+ break;
+ }
+ }
+ }
+ npollfds = h2_session_set_poll(session, pollctxs, pfds);
+ if (!session->streams)
+ break;
+ }
+}
+
+static void h2_session_close(struct h2_session *session)
+{
+ log_infof(cmd, "closed session to %s:%u",
+ session->connect_host, session->connect_port);
+}
+
+/* websocket stream */
+
+struct ws_stream {
+ struct h2_stream s;
+};
+
+static void ws_stream_on_close(struct h2_stream *stream)
+{
+ log_infof("ws stream", "stream %d closed", stream->id);
+ if (!stream->reset)
+ fprintf(stdout, "[%d] EOF\n", stream->id);
+}
+
+static void ws_stream_on_recv_data(struct h2_stream *stream,
+ const uint8_t *data, size_t len)
+{
+ size_t i;
+
+ log_infof("ws stream", "stream %d recv %lu data bytes",
+ stream->id, (unsigned long)len);
+ for (i = 0; i < len; ++i) {
+ fprintf(stdout, "%s%02x", (i&0xf)? " " : (i? "\n" : ""), data[i]);
+ }
+ fprintf(stdout, "\n");
+}
+
+static int ws_stream_create(struct ws_stream **pstream, struct uri *uri)
+{
+ struct ws_stream *stream;
+
+ stream = calloc(1, sizeof(*stream));
+ if (!stream) {
+ log_errf("ws stream create", "out of memory");
+ *pstream = NULL;
+ return -1;
+ }
+ stream->s.uri = uri;
+ stream->s.id = -1;
+ stream->s.on_close = ws_stream_on_close;
+ stream->s.on_recv_data = ws_stream_on_recv_data;
+ *pstream = stream;
+ return 0;
+}
+
+static ssize_t ws_stream_read_req_body(nghttp2_session *ngh2,
+ int32_t stream_id,
+ uint8_t *buf, size_t buflen,
+ uint32_t *pflags,
+ nghttp2_data_source *source,
+ void *user_data)
+{
+ struct h2_session *session = user_data;
+ struct ws_stream *stream;
+ ssize_t nread = 0;
+ int eof = 0;
+
+ stream = (struct ws_stream *)h2_session_stream_get(session, stream_id);
+ if (!stream) {
+ log_errf("stream req body", "stream not known");
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ (void)source;
+ assert(stream->s.fdin >= 0);
+ nread = read(stream->s.fdin, buf, buflen);
+ log_debugf("stream req body", "fread(len=%lu) -> %ld",
+ (unsigned long)buflen, (long)nread);
+
+ if (nread < 0) {
+ if (errno == EAGAIN) {
+ nread = 0;
+ }
+ else {
+ log_errf("stream req body", "error on input");
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ }
+ else if (nread == 0) {
+ eof = 1;
+ stream->s.input_closed = 1;
+ }
+
+ *pflags = stream->s.input_closed? NGHTTP2_DATA_FLAG_EOF : 0;
+ if (nread == 0 && !eof) {
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ return nread;
+}
+
+static int ws_stream_submit(struct ws_stream *stream,
+ struct h2_session *session,
+ const nghttp2_nv *nva, size_t nvalen,
+ int fdin)
+{
+ nghttp2_data_provider provider, *req_body = NULL;
+
+ if (fdin >= 0) {
+ sock_nonblock_nodelay(fdin);
+ stream->s.fdin = fdin;
+ provider.read_callback = ws_stream_read_req_body;
+ provider.source.ptr = NULL;
+ req_body = &provider;
+ }
+ else {
+ stream->s.input_closed = 1;
+ }
+
+ stream->s.id = nghttp2_submit_request(session->ngh2, NULL, nva, nvalen,
+ req_body, stream);
+ if (stream->s.id < 0) {
+ log_errf("ws stream submit", "nghttp2_submit_request: error %d",
+ stream->s.id);
+ return -1;
+ }
+
+ h2_session_stream_add(session, &stream->s);
+ log_infof("ws stream submit", "stream %d opened for %s%s",
+ stream->s.id, stream->s.uri->authority, stream->s.uri->path);
+ return 0;
+}
+
+static void usage(const char *msg)
+{
+ if(msg)
+ fprintf(stderr, "%s\n", msg);
+ fprintf(stderr,
+ "usage: [options] ws-uri scenario\n"
+ " run a websocket scenario to the ws-uri, options:\n"
+ " -c host:port connect to host:port\n"
+ " -v increase verbosity\n"
+ "scenarios are:\n"
+ " * fail-proto: CONNECT using wrong :protocol\n"
+ " * miss-authority: CONNECT without :authority header\n"
+ " * miss-path: CONNECT without :path header\n"
+ " * miss-scheme: CONNECT without :scheme header\n"
+ " * miss-version: CONNECT without sec-webSocket-version header\n"
+ " * ws-empty: open valid websocket, do not send anything\n"
+ );
+}
+
+int main(int argc, char *argv[])
+{
+ const char *host = NULL, *scenario;
+ uint16_t port = 80;
+ struct uri uri;
+ struct h2_session session;
+ struct ws_stream *stream;
+ char ch;
+
+ cmd = argv[0];
+ while((ch = getopt(argc, argv, "c:vh")) != -1) {
+ switch(ch) {
+ case 'c':
+ if (parse_host_port(&host, &port, NULL, NULL,
+ optarg, strlen(optarg), 80)) {
+ log_errf(cmd, "could not parse connect '%s'", optarg);
+ return 1;
+ }
+ break;
+ case 'h':
+ usage(NULL);
+ return 2;
+ break;
+ case 'v':
+ ++verbose;
+ break;
+ default:
+ usage("invalid option");
+ return 1;
+ }
+ }
+ argc -= optind;
+ argv += optind;
+
+ if (argc < 1) {
+ usage("need URL");
+ return 1;
+ }
+ if (argc < 2) {
+ usage("need scenario");
+ return 1;
+ }
+ if (parse_uri(&uri, argv[0], strlen(argv[0]))) {
+ log_errf(cmd, "could not parse uri '%s'", argv[0]);
+ return 1;
+ }
+ log_debugf(cmd, "normalized uri: %s://%s:%u%s", uri.scheme, uri.host,
+ uri.port, uri.path? uri.path : "");
+ scenario = argv[1];
+
+ if (!host) {
+ host = uri.host;
+ port = uri.port;
+ }
+
+ if (h2_session_open(&session, uri.host, host, port))
+ return 1;
+
+ if (ws_stream_create(&stream, &uri))
+ return 1;
+
+ if (!strcmp(scenario, "ws-stdin")) {
+ const nghttp2_nv nva[] = {
+ MAKE_NV(":method", "CONNECT"),
+ MAKE_NV_CS(":path", stream->s.uri->path),
+ MAKE_NV_CS(":scheme", "http"),
+ MAKE_NV_CS(":authority", stream->s.uri->authority),
+ MAKE_NV_CS(":protocol", "websocket"),
+ MAKE_NV("accept", "*/*"),
+ MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+ MAKE_NV("sec-webSocket-version", "13"),
+ MAKE_NV("sec-webSocket-protocol", "chat"),
+ };
+ if (ws_stream_submit(stream, &session,
+ nva, sizeof(nva) / sizeof(nva[0]), 0))
+ return 1;
+ }
+ else if (!strcmp(scenario, "fail-proto")) {
+ const nghttp2_nv nva[] = {
+ MAKE_NV(":method", "CONNECT"),
+ MAKE_NV_CS(":path", stream->s.uri->path),
+ MAKE_NV_CS(":scheme", "http"),
+ MAKE_NV_CS(":authority", stream->s.uri->authority),
+ MAKE_NV_CS(":protocol", "websockets"),
+ MAKE_NV("accept", "*/*"),
+ MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+ MAKE_NV("sec-webSocket-version", "13"),
+ MAKE_NV("sec-webSocket-protocol", "chat"),
+ };
+ if (ws_stream_submit(stream, &session,
+ nva, sizeof(nva) / sizeof(nva[0]), -1))
+ return 1;
+ }
+ else if (!strcmp(scenario, "miss-version")) {
+ const nghttp2_nv nva[] = {
+ MAKE_NV(":method", "CONNECT"),
+ MAKE_NV_CS(":path", stream->s.uri->path),
+ MAKE_NV_CS(":scheme", "http"),
+ MAKE_NV_CS(":authority", stream->s.uri->authority),
+ MAKE_NV_CS(":protocol", "websocket"),
+ MAKE_NV("accept", "*/*"),
+ MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+ MAKE_NV("sec-webSocket-protocol", "chat"),
+ };
+ if (ws_stream_submit(stream, &session,
+ nva, sizeof(nva) / sizeof(nva[0]), -1))
+ return 1;
+ }
+ else if (!strcmp(scenario, "miss-path")) {
+ const nghttp2_nv nva[] = {
+ MAKE_NV(":method", "CONNECT"),
+ MAKE_NV_CS(":scheme", "http"),
+ MAKE_NV_CS(":authority", stream->s.uri->authority),
+ MAKE_NV_CS(":protocol", "websocket"),
+ MAKE_NV("accept", "*/*"),
+ MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+ MAKE_NV("sec-webSocket-version", "13"),
+ MAKE_NV("sec-webSocket-protocol", "chat"),
+ };
+ if (ws_stream_submit(stream, &session,
+ nva, sizeof(nva) / sizeof(nva[0]), -1))
+ return 1;
+ }
+ else if (!strcmp(scenario, "miss-scheme")) {
+ const nghttp2_nv nva[] = {
+ MAKE_NV(":method", "CONNECT"),
+ MAKE_NV_CS(":path", stream->s.uri->path),
+ MAKE_NV_CS(":authority", stream->s.uri->authority),
+ MAKE_NV_CS(":protocol", "websocket"),
+ MAKE_NV("accept", "*/*"),
+ MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+ MAKE_NV("sec-webSocket-version", "13"),
+ MAKE_NV("sec-webSocket-protocol", "chat"),
+ };
+ if (ws_stream_submit(stream, &session,
+ nva, sizeof(nva) / sizeof(nva[0]), -1))
+ return 1;
+ }
+ else if (!strcmp(scenario, "miss-authority")) {
+ const nghttp2_nv nva[] = {
+ MAKE_NV(":method", "CONNECT"),
+ MAKE_NV_CS(":path", stream->s.uri->path),
+ MAKE_NV_CS(":scheme", "http"),
+ MAKE_NV_CS(":protocol", "websocket"),
+ MAKE_NV("accept", "*/*"),
+ MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+ MAKE_NV("sec-webSocket-version", "13"),
+ MAKE_NV("sec-webSocket-protocol", "chat"),
+ };
+ if (ws_stream_submit(stream, &session,
+ nva, sizeof(nva) / sizeof(nva[0]), -1))
+ return 1;
+ }
+ else {
+ log_errf(cmd, "unknown scenario: %s", scenario);
+ return 1;
+ }
+
+ h2_session_run(&session);
+ h2_session_close(&session);
+ return 0;
+}
diff --git a/test/modules/http2/test_800_websockets.py b/test/modules/http2/test_800_websockets.py
new file mode 100644
index 0000000000..58ac4eb4e3
--- /dev/null
+++ b/test/modules/http2/test_800_websockets.py
@@ -0,0 +1,306 @@
+import inspect
+import logging
+import os
+import shutil
+import subprocess
+import time
+from datetime import timedelta, datetime
+from typing import Tuple, Union, List
+import packaging.version
+
+import pytest
+import websockets
+from pyhttpd.result import ExecResult
+from pyhttpd.ws_util import WsFrameReader, WsFrame
+
+from .env import H2Conf, H2TestEnv
+
+
+log = logging.getLogger(__name__)
+
+ws_version = packaging.version.parse(websockets.version.version)
+ws_version_min = packaging.version.Version('10.4')
+
+
+def ws_run(env: H2TestEnv, path, do_input=None,
+ inbytes=None, send_close=True,
+ timeout=5, scenario='ws-stdin',
+ wait_close: float = 0.0) -> Tuple[ExecResult, List[str], Union[List[WsFrame], bytes]]:
+ """ Run the h2ws test client in various scenarios with given input and
+ timings.
+ :param env: the test environment
+ :param path: the path on the Apache server to CONNECt to
+ :param do_input: a Callable for sending input to h2ws
+ :param inbytes: fixed bytes to send to h2ws, unless do_input is given
+ :param send_close: send a CLOSE WebSockets frame at the end
+ :param timeout: timeout for waiting on h2ws to finish
+ :param scenario: name of scenario h2ws should run in
+ :param wait_close: time to wait before closing input
+ :return: ExecResult with exit_code/stdout/stderr of run
+ """
+ h2ws = os.path.join(env.clients_dir, 'h2ws')
+ if not os.path.exists(h2ws):
+ pytest.fail(f'test client not build: {h2ws}')
+ args = [
+ h2ws, '-vv', '-c', f'localhost:{env.http_port}',
+ f'ws://cgi.{env.http_tld}:{env.http_port}{path}',
+ scenario
+ ]
+ # we write all output to files, because we manipulate input timings
+ # and would run in deadlock situations with h2ws blocking operations
+ # because its output is not consumed
+ with open(f'{env.gen_dir}/h2ws.stdout', 'w') as fdout:
+ with open(f'{env.gen_dir}/h2ws.stderr', 'w') as fderr:
+ proc = subprocess.Popen(args=args, stdin=subprocess.PIPE,
+ stdout=fdout, stderr=fderr)
+ if do_input is not None:
+ do_input(proc)
+ elif inbytes is not None:
+ proc.stdin.write(inbytes)
+ proc.stdin.flush()
+
+ if wait_close > 0:
+ time.sleep(wait_close)
+ try:
+ inbytes = WsFrame.client_close(code=1000).to_network() if send_close else None
+ proc.communicate(input=inbytes, timeout=timeout)
+ except subprocess.TimeoutExpired:
+ log.error(f'ws_run: timeout expired')
+ proc.kill()
+ proc.communicate(timeout=timeout)
+ lines = open(f'{env.gen_dir}/h2ws.stdout').read().splitlines()
+ infos = [line for line in lines if line.startswith('[1] ')]
+ hex_content = ' '.join([line for line in lines if not line.startswith('[1] ')])
+ if len(infos) > 0 and infos[0] == '[1] :status: 200':
+ frames = WsFrameReader.parse(bytearray.fromhex(hex_content))
+ else:
+ frames = bytearray.fromhex(hex_content)
+ return ExecResult(args=args, exit_code=proc.returncode,
+ stdout=b'', stderr=b''), infos, frames
+
+
+@pytest.mark.skipif(condition=H2TestEnv.is_unsupported, reason="mod_http2 not supported here")
+@pytest.mark.skipif(condition=ws_version < ws_version_min,
+ reason=f'websockets is {ws_version}, need at least {ws_version_min}')
+class TestWebSockets:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env):
+ # Apache config that CONNECT proxies a WebSocket server for paths starting
+ # with '/ws/'
+ # The WebSocket server is started in pytest fixture 'ws_server' below.
+ conf = H2Conf(env, extras={
+ f'cgi.{env.http_tld}': [
+ f' H2WebSockets on',
+ f' ProxyPass /ws/ http://127.0.0.1:{env.ws_port}/ \\',
+ f' upgrade=websocket timeout=10',
+ ]
+ })
+ conf.add_vhost_cgi(proxy_self=True, h2proxy_self=True).install()
+ assert env.apache_restart() == 0
+
+ def ws_check_alive(self, env, timeout=5):
+ url = f'http://localhost:{env.ws_port}/'
+ end = datetime.now() + timedelta(seconds=timeout)
+ while datetime.now() < end:
+ r = env.curl_get(url, 5)
+ if r.exit_code == 0:
+ return True
+ time.sleep(.1)
+ return False
+
+ def _mkpath(self, path):
+ if not os.path.exists(path):
+ return os.makedirs(path)
+
+ def _rmrf(self, path):
+ if os.path.exists(path):
+ return shutil.rmtree(path)
+
+ @pytest.fixture(autouse=True, scope='class')
+ def ws_server(self, env):
+ # Run our python websockets server that has some special behaviour
+ # for the different path to CONNECT to.
+ run_dir = os.path.join(env.gen_dir, 'ws-server')
+ err_file = os.path.join(run_dir, 'stderr')
+ self._rmrf(run_dir)
+ self._mkpath(run_dir)
+ with open(err_file, 'w') as cerr:
+ cmd = os.path.join(os.path.dirname(inspect.getfile(TestWebSockets)),
+ 'ws_server.py')
+ args = ['python3', cmd, '--port', str(env.ws_port)]
+ p = subprocess.Popen(args=args, cwd=run_dir, stderr=cerr,
+ stdout=cerr)
+ if not self.ws_check_alive(env):
+ p.kill()
+ p.wait()
+ pytest.fail(f'ws_server did not start. stderr={open(err_file).readlines()}')
+ yield
+ p.terminate()
+
+ # a correct CONNECT, send CLOSE, expect CLOSE, basic success
+ def test_h2_800_01_ws_empty(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 1, f'{frames}'
+ assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+
+ # CONNECT with invalid :protocol header, must fail
+ def test_h2_800_02_fail_proto(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='fail-proto')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+ # CONNECT to a URL path that does not exist on the server
+ def test_h2_800_03_not_found(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/does-not-exist')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 404', '[1] EOF'], f'{r}'
+
+ # CONNECT to a URL path that is a normal HTTP file resource
+ # we do not want to receive the body of that
+ def test_h2_800_04_non_ws_resource(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/alive.json')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+ assert frames == b''
+
+ # CONNECT to a URL path that sends a delayed HTTP response body
+ # we do not want to receive the body of that
+ def test_h2_800_05_non_ws_delay_resource(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/h2test/error?body_delay=100ms')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+ assert frames == b''
+
+ # CONNECT missing the sec-webSocket-version header
+ def test_h2_800_06_miss_version(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-version')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+ # CONNECT missing the :path header
+ def test_h2_800_07_miss_path(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-path')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] RST'], f'{r}'
+
+ # CONNECT missing the :scheme header
+ def test_h2_800_08_miss_scheme(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-scheme')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] RST'], f'{r}'
+
+ # CONNECT missing the :authority header
+ def test_h2_800_09_miss_authority(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-authority')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] RST'], f'{r}'
+
+ # CONNECT and exchange a PING
+ def test_h2_800_10_ws_ping(self, env: H2TestEnv, ws_server):
+ ping = WsFrame.client_ping(b'12345')
+ r, infos, frames = ws_run(env, path='/ws/echo/', inbytes=ping.to_network())
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 2, f'{frames}'
+ assert frames[0].opcode == WsFrame.PONG, f'{frames}'
+ assert frames[0].data == ping.data, f'{frames}'
+ assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+ # CONNECT and send several PINGs with a delay of 200ms
+ def test_h2_800_11_ws_timed_pings(self, env: H2TestEnv, ws_server):
+ frame_count = 5
+ ping = WsFrame.client_ping(b'12345')
+
+ def do_send(proc):
+ for _ in range(frame_count):
+ try:
+ proc.stdin.write(ping.to_network())
+ proc.stdin.flush()
+ proc.wait(timeout=0.2)
+ except subprocess.TimeoutExpired:
+ pass
+
+ r, infos, frames = ws_run(env, path='/ws/echo/', do_input=do_send)
+ assert r.exit_code == 0
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == frame_count + 1, f'{frames}'
+ assert frames[-1].opcode == WsFrame.CLOSE, f'{frames}'
+ for i in range(frame_count):
+ assert frames[i].opcode == WsFrame.PONG, f'{frames}'
+ assert frames[i].data == ping.data, f'{frames}'
+
+ # CONNECT to path that closes immediately
+ def test_h2_800_12_ws_unknown(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/unknown')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 1, f'{frames}'
+ # expect a CLOSE with code=4999, reason='path unknown'
+ assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+ assert frames[0].data[2:].decode() == 'path unknown', f'{frames}'
+
+ # CONNECT to a path that sends us 1 TEXT frame
+ def test_h2_800_13_ws_text(self, env: H2TestEnv, ws_server):
+ r, infos, frames = ws_run(env, path='/ws/text/')
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) == 2, f'{frames}'
+ assert frames[0].opcode == WsFrame.TEXT, f'{frames}'
+ assert frames[0].data.decode() == 'hello!', f'{frames}'
+ assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+ # CONNECT to a path that sends us a named file in BINARY frames
+ @pytest.mark.parametrize("fname,flen", [
+ ("data-1k", 1000),
+ ("data-10k", 10000),
+ ("data-100k", 100*1000),
+ ("data-1m", 1000*1000),
+ ])
+ def test_h2_800_14_ws_file(self, env: H2TestEnv, ws_server, fname, flen):
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}', wait_close=0.5)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}'
+
+ # CONNECT to path with 1MB file and trigger varying BINARY frame lengths
+ @pytest.mark.parametrize("frame_len", [
+ 1000 * 1024,
+ 100 * 1024,
+ 10 * 1024,
+ 1 * 1024,
+ 512,
+ ])
+ def test_h2_800_15_ws_frame_len(self, env: H2TestEnv, ws_server, frame_len):
+ fname = "data-1m"
+ flen = 1000*1000
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}', wait_close=0.5)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}'
+
+ # CONNECT to path with 1MB file and trigger delays between BINARY frame writes
+ @pytest.mark.parametrize("frame_delay", [
+ 1,
+ 10,
+ 50,
+ 100,
+ ])
+ def test_h2_800_16_ws_frame_delay(self, env: H2TestEnv, ws_server, frame_delay):
+ fname = "data-1m"
+ flen = 1000*1000
+ # adjust frame_len to allow for 1 second overall duration
+ frame_len = int(flen / (1000 / frame_delay))
+ r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/{frame_delay}',
+ wait_close=1.5)
+ assert r.exit_code == 0, f'{r}'
+ assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+ assert len(frames) > 0
+ total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+ assert total_len == flen, f'{frames}\n{r}'
diff --git a/test/modules/http2/ws_server.py b/test/modules/http2/ws_server.py
new file mode 100644
index 0000000000..bcb8d3b094
--- /dev/null
+++ b/test/modules/http2/ws_server.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import time
+
+import websockets.server as ws_server
+from websockets.exceptions import ConnectionClosedError
+
+log = logging.getLogger(__name__)
+
+logging.basicConfig(
+ format="[%(asctime)s] %(message)s",
+ level=logging.DEBUG,
+)
+
+
+async def echo(websocket):
+ try:
+ async for message in websocket:
+ try:
+ log.info(f'got request {message}')
+ except Exception as e:
+ log.error(f'error {e} getting path from {message}')
+ await websocket.send(message)
+ except ConnectionClosedError:
+ pass
+
+
+async def on_async_conn(conn):
+ rpath = str(conn.path)
+ pcomps = rpath[1:].split('/')
+ if len(pcomps) == 0:
+ pcomps = ['echo'] # default handler
+ log.info(f'connection for {pcomps}')
+ if pcomps[0] == 'echo':
+ log.info(f'/echo endpoint')
+ for message in await conn.recv():
+ await conn.send(message)
+ elif pcomps[0] == 'text':
+ await conn.send('hello!')
+ elif pcomps[0] == 'file':
+ if len(pcomps) < 2:
+ conn.close(code=4999, reason='unknown file')
+ return
+ fpath = os.path.join('../', pcomps[1])
+ if not os.path.exists(fpath):
+ conn.close(code=4999, reason='file not found')
+ return
+ bufsize = 0
+ if len(pcomps) > 2:
+ bufsize = int(pcomps[2])
+ if bufsize <= 0:
+ bufsize = 16*1024
+ delay_ms = 0
+ if len(pcomps) > 3:
+ delay_ms = int(pcomps[3])
+ with open(fpath, 'r+b') as fd:
+ while True:
+ buf = fd.read(bufsize)
+ if buf is None or len(buf) == 0:
+ break
+ await conn.send(buf)
+ if delay_ms > 0:
+ time.sleep(delay_ms/1000)
+ else:
+ log.info(f'unknown endpoint: {rpath}')
+ await conn.close(code=4999, reason='path unknown')
+ await conn.close(code=1000, reason='')
+
+
+async def run_server(port):
+ log.info(f'starting server on port {port}')
+ async with ws_server.serve(ws_handler=on_async_conn,
+ host="localhost", port=port):
+ await asyncio.Future()
+
+
+async def main():
+ parser = argparse.ArgumentParser(prog='scorecard',
+ description="Run a websocket echo server.")
+ parser.add_argument("--port", type=int,
+ default=0, help="port to listen on")
+ args = parser.parse_args()
+
+ if args.port == 0:
+ sys.stderr.write('need --port\n')
+ sys.exit(1)
+
+ logging.basicConfig(
+ format="%(asctime)s %(message)s",
+ level=logging.DEBUG,
+ )
+ await run_server(args.port)
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/test/pyhttpd/config.ini.in b/test/pyhttpd/config.ini.in
index e1ae0707ab..3f42248f65 100644
--- a/test/pyhttpd/config.ini.in
+++ b/test/pyhttpd/config.ini.in
@@ -26,6 +26,7 @@ http_port = 5002
https_port = 5001
proxy_port = 5003
http_port2 = 5004
+ws_port = 5100
http_tld = tests.httpd.apache.org
test_dir = @abs_srcdir@
test_src_dir = @abs_srcdir@
diff --git a/test/pyhttpd/env.py b/test/pyhttpd/env.py
index 842e369cbc..5111883388 100644
--- a/test/pyhttpd/env.py
+++ b/test/pyhttpd/env.py
@@ -250,8 +250,10 @@ class HttpdTestEnv:
self._http_port2 = int(self.config.get('test', 'http_port2'))
self._https_port = int(self.config.get('test', 'https_port'))
self._proxy_port = int(self.config.get('test', 'proxy_port'))
+ self._ws_port = int(self.config.get('test', 'ws_port'))
self._http_tld = self.config.get('test', 'http_tld')
self._test_dir = self.config.get('test', 'test_dir')
+ self._clients_dir = os.path.join(os.path.dirname(self._test_dir), 'clients')
self._gen_dir = self.config.get('test', 'gen_dir')
self._server_dir = os.path.join(self._gen_dir, 'apache')
self._server_conf_dir = os.path.join(self._server_dir, "conf")
@@ -367,6 +369,10 @@ class HttpdTestEnv:
return self._proxy_port
@property
+ def ws_port(self) -> int:
+ return self._ws_port
+
+ @property
def http_tld(self) -> str:
return self._http_tld
@@ -391,6 +397,10 @@ class HttpdTestEnv:
return self._test_dir
@property
+ def clients_dir(self) -> str:
+ return self._clients_dir
+
+ @property
def server_dir(self) -> str:
return self._server_dir
@@ -519,12 +529,14 @@ class HttpdTestEnv:
if not os.path.exists(path):
return os.makedirs(path)
- def run(self, args, stdout_list=False, intext=None, debug_log=True):
+ def run(self, args, stdout_list=False, intext=None, inbytes=None, debug_log=True):
if debug_log:
log.debug(f"run: {args}")
start = datetime.now()
+ if intext is not None:
+ inbytes = intext.encode()
p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
- input=intext.encode() if intext else None)
+ input=inbytes)
stdout_as_list = None
if stdout_list:
try:
diff --git a/test/pyhttpd/ws_util.py b/test/pyhttpd/ws_util.py
new file mode 100644
index 0000000000..38a3cf7bf4
--- /dev/null
+++ b/test/pyhttpd/ws_util.py
@@ -0,0 +1,137 @@
+import logging
+import struct
+
+
+log = logging.getLogger(__name__)
+
+
+class WsFrame:
+
+ CONT = 0
+ TEXT = 1
+ BINARY = 2
+ RSVD3 = 3
+ RSVD4 = 4
+ RSVD5 = 5
+ RSVD6 = 6
+ RSVD7 = 7
+ CLOSE = 8
+ PING = 9
+ PONG = 10
+ RSVD11 = 11
+ RSVD12 = 12
+ RSVD13 = 13
+ RSVD14 = 14
+ RSVD15 = 15
+
+ OP_NAMES = [
+ "CONT",
+ "TEXT",
+ "BINARY",
+ "RSVD3",
+ "RSVD4",
+ "RSVD5",
+ "RSVD6",
+ "RSVD7",
+ "CLOSE",
+ "PING",
+ "PONG",
+ "RSVD11",
+ "RSVD12",
+ "RSVD13",
+ "RSVD14",
+ "RSVD15",
+ ]
+
+ def __init__(self, opcode: int, fin: bool, mask: bytes, data: bytes):
+ self.opcode = opcode
+ self.fin = fin
+ self.mask = mask
+ self.data = data
+ self.length = len(data)
+
+ def __repr__(self):
+ return f'WsFrame[{self.OP_NAMES[self.opcode]} fin={self.fin}, mask={self.mask}, len={len(self.data)}]'
+
+ @property
+ def data_len(self) -> int:
+ return len(self.data) if self.data else 0
+
+ def to_network(self) -> bytes:
+ nd = bytearray()
+ h1 = self.opcode
+ if self.fin:
+ h1 |= 0x80
+ nd.extend(struct.pack("!B", h1))
+ mask_bit = 0x80 if self.mask is not None else 0x0
+ h2 = self.data_len
+ if h2 > 65535:
+ nd.extend(struct.pack("!BQ", 127|mask_bit, h2))
+ elif h2 > 126:
+ nd.extend(struct.pack("!BH", 126|mask_bit, h2))
+ else:
+ nd.extend(struct.pack("!B", h2|mask_bit))
+ if self.mask is not None:
+ nd.extend(self.mask)
+ if self.data is not None:
+ nd.extend(self.data)
+ return nd
+
+ @classmethod
+ def client_ping(cls, data: bytes, mask: bytes = None) -> 'WsFrame':
+ if mask is None:
+ mask = bytes.fromhex('00 00 00 00')
+ return WsFrame(opcode=WsFrame.PING, fin=True, mask=mask, data=data)
+
+ @classmethod
+ def client_close(cls, code: int, reason: str = None,
+ mask: bytes = None) -> 'WsFrame':
+ data = bytearray(struct.pack("!H", code))
+ if reason is not None:
+ data.extend(reason.encode())
+ if mask is None:
+ mask = bytes.fromhex('00 00 00 00')
+ return WsFrame(opcode=WsFrame.CLOSE, fin=True, mask=mask, data=data)
+
+
+class WsFrameReader:
+
+ def __init__(self, data: bytes):
+ self.data = data
+
+ def _read(self, n: int):
+ if len(self.data) < n:
+ raise EOFError(f'have {len(self.data)} bytes left, but {n} requested')
+ elif n == 0:
+ return b''
+ chunk = self.data[:n]
+ del self.data[:n]
+ return chunk
+
+ def next_frame(self):
+ data = self._read(2)
+ h1, h2 = struct.unpack("!BB", data)
+ log.debug(f'parsed h1={h1} h2={h2} from {data}')
+ fin = True if h1 & 0x80 else False
+ opcode = h1 & 0xf
+ has_mask = True if h2 & 0x80 else False
+ mask = None
+ dlen = h2 & 0x7f
+ if dlen == 126:
+ (dlen,) = struct.unpack("!H", self._read(2))
+ elif dlen == 127:
+ (dlen,) = struct.unpack("!Q", self._read(8))
+ if has_mask:
+ mask = self._read(4)
+ return WsFrame(opcode=opcode, fin=fin, mask=mask, data=self._read(dlen))
+
+ def eof(self):
+ return len(self.data) == 0
+
+ @classmethod
+ def parse(cls, data: bytes):
+ frames = []
+ reader = WsFrameReader(data=data)
+ while not reader.eof():
+ frames.append(reader.next_frame())
+ return frames
diff --git a/test/travis_run_linux.sh b/test/travis_run_linux.sh
index 3a2cdbd009..39c331da5c 100755
--- a/test/travis_run_linux.sh
+++ b/test/travis_run_linux.sh
@@ -221,6 +221,8 @@ if ! test -v SKIP_TESTING; then
fi
if test -v TEST_H2 -a $RV -eq 0; then
+ # Build the test clients
+ (cd test/clients && make)
# Run HTTP/2 tests.
MPM=event py.test-3 test/modules/http2
RV=$?