import inspect import logging import os import re import subprocess from typing import Dict, Any from pyhttpd.certs import CertificateSpec from pyhttpd.conf import HttpdConf from pyhttpd.env import HttpdTestEnv, HttpdTestSetup log = logging.getLogger(__name__) class H2TestSetup(HttpdTestSetup): def __init__(self, env: 'HttpdTestEnv'): super().__init__(env=env) self.add_source_dir(os.path.dirname(inspect.getfile(H2TestSetup))) self.add_modules(["http2", "proxy_http2", "cgid", "autoindex", "ssl"]) def make(self): super().make() self._add_h2test() self._setup_data_1k_1m() def _add_h2test(self): local_dir = os.path.dirname(inspect.getfile(H2TestSetup)) p = subprocess.run([self.env.apxs, '-c', 'mod_h2test.c'], capture_output=True, cwd=os.path.join(local_dir, 'mod_h2test')) rv = p.returncode if rv != 0: log.error(f"compiling md_h2test failed: {p.stderr}") raise Exception(f"compiling md_h2test failed: {p.stderr}") modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf') with open(modules_conf, 'a') as fd: # load our test module which is not installed fd.write(f"LoadModule h2test_module \"{local_dir}/mod_h2test/.libs/mod_h2test.so\"\n") def _setup_data_1k_1m(self): s90 = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n" with open(os.path.join(self.env.gen_dir, "data-1k"), 'w') as f: for i in range(10): f.write(f"{i:09d}-{s90}") with open(os.path.join(self.env.gen_dir, "data-10k"), 'w') as f: for i in range(100): f.write(f"{i:09d}-{s90}") with open(os.path.join(self.env.gen_dir, "data-100k"), 'w') as f: for i in range(1000): f.write(f"{i:09d}-{s90}") with open(os.path.join(self.env.gen_dir, "data-1m"), 'w') as f: for i in range(10000): f.write(f"{i:09d}-{s90}") class H2TestEnv(HttpdTestEnv): @classmethod @property def is_unsupported(cls): mpm_module = f"mpm_{os.environ['MPM']}" if 'MPM' in os.environ else 'mpm_event' return mpm_module == 'mpm_prefork' def __init__(self, pytestconfig=None): super().__init__(pytestconfig=pytestconfig) self.add_httpd_conf([ "H2MinWorkers 1", "H2MaxWorkers 64", "Protocols h2 http/1.1 h2c", ]) self.add_httpd_log_modules(["http2", "proxy_http2", "h2test", "proxy", "proxy_http"]) self.add_cert_specs([ CertificateSpec(domains=[ f"push.{self._http_tld}", f"hints.{self._http_tld}", f"ssl.{self._http_tld}", f"pad0.{self._http_tld}", f"pad1.{self._http_tld}", f"pad2.{self._http_tld}", f"pad3.{self._http_tld}", f"pad8.{self._http_tld}", ]), CertificateSpec(domains=[f"noh2.{self.http_tld}"], key_type='rsa2048'), ]) self.httpd_error_log.set_ignored_lognos([ 'AH02032', 'AH01276', 'AH01630', 'AH00135', 'AH02261', # Re-negotiation handshake failed (our test_101) 'AH03490', # scoreboard full, happens on limit tests 'AH02429', # invalid chars in response header names, see test_h2_200 'AH02430', # invalid chars in response header values, see test_h2_200 'AH10373', # SSL errors on uncompleted handshakes, see test_h2_105 ]) self.httpd_error_log.add_ignored_patterns([ re.compile(r'.*malformed header from script \'hecho.py\': Bad header: x.*'), re.compile(r'.*:tls_post_process_client_hello:.*'), # OSSL 3 dropped the function name from the error description. Use the code instead: # 0A0000C1 = no shared cipher -- Too restrictive SSLCipherSuite or using DSA server certificate? re.compile(r'.*SSL Library Error: error:0A0000C1:.*'), re.compile(r'.*:tls_process_client_certificate:.*'), # OSSL 3 dropped the function name from the error description. Use the code instead: # 0A0000C7 = peer did not return a certificate -- No CAs known to server for verification? re.compile(r'.*SSL Library Error: error:0A0000C7:.*'), re.compile(r'.*have incompatible TLS configurations.'), ]) def setup_httpd(self, setup: HttpdTestSetup = None): super().setup_httpd(setup=H2TestSetup(env=self)) class H2Conf(HttpdConf): def __init__(self, env: HttpdTestEnv, extras: Dict[str, Any] = None): super().__init__(env=env, extras=HttpdConf.merge_extras(extras, { f"cgi.{env.http_tld}": [ "SSLOptions +StdEnvVars", "AddHandler cgi-script .py", "", " SetHandler h2test-echo", "", "", " SetHandler h2test-delay", "", ] })) def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None, ssl_module=None, with_certificates=None): super().start_vhost(domains=domains, port=port, doc_root=doc_root, with_ssl=with_ssl, ssl_module=ssl_module, with_certificates=with_certificates) if f"noh2.{self.env.http_tld}" in domains: protos = ["http/1.1"] elif port == self.env.https_port or with_ssl is True: protos = ["h2", "http/1.1"] else: protos = ["h2c", "http/1.1"] if f"test2.{self.env.http_tld}" in domains: protos = reversed(protos) self.add(f"Protocols {' '.join(protos)}") return self def add_vhost_noh2(self): domains = [f"noh2.{self.env.http_tld}", f"noh2-alias.{self.env.http_tld}"] self.start_vhost(domains=domains, port=self.env.https_port, doc_root="htdocs/noh2") self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"]) self.end_vhost() self.start_vhost(domains=domains, port=self.env.http_port, doc_root="htdocs/noh2") self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"]) self.end_vhost() return self def add_vhost_test1(self, proxy_self=False, h2proxy_self=False): return super().add_vhost_test1(proxy_self=proxy_self, h2proxy_self=h2proxy_self) def add_vhost_test2(self): return super().add_vhost_test2()