# test driving the ACMEv2 protocol import base64 import json import os.path import re from datetime import timedelta import pytest from pyhttpd.certs import CertificateSpec from .md_conf import MDConf from .md_cert_util import MDCertUtil from .md_env import MDTestEnv @pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available") @pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(), reason="no ACME test server configured") class TestDrivev2: @pytest.fixture(autouse=True, scope='class') def _class_scope(self, env, acme): acme.start(config='default') env.check_acme() env.APACHE_CONF_SRC = "data/test_drive" MDConf(env).install() assert env.apache_restart() == 0 @pytest.fixture(autouse=True, scope='function') def _method_scope(self, env, request): env.clear_store() MDConf(env).install() self.test_domain = env.get_request_domain(request) # --------- invalid precondition --------- def test_md_502_000(self, env): # test case: md without contact info domain = self.test_domain name = "www." + domain assert env.a2md(["add", name]).exit_code == 0 r = env.a2md(["drive", name]) assert r.exit_code == 1 assert re.search("No contact information", r.stderr) def test_md_502_001(self, env): # test case: md with contact, but without TOS domain = self.test_domain name = "www." + domain assert env.a2md(["add", name]).exit_code == 0 assert env.a2md( ["update", name, "contacts", "admin@test1.not-forbidden.org"] ).exit_code == 0 r = env.a2md(["drive", name]) assert r.exit_code == 1 assert re.search("the CA requires you to accept the terms-of-service as specified in ", r.stderr) # test_102 removed, was based on false assumption def test_md_502_003(self, env): # test case: md with unknown protocol FOO domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.a2md( ["update", name, "ca", env.acme_url, "FOO"] ).exit_code == 0 r = env.a2md(["drive", name]) assert r.exit_code == 1 assert re.search("Unknown CA protocol", r.stderr) # --------- driving OK --------- def test_md_502_100(self, env): # test case: md with one domain domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.apache_restart() == 0 # drive prev_md = env.a2md(["list", name]).json['output'][0] r = env.a2md(["-vv", "drive", "-c", "http-01", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) env.check_md_credentials([name]) self._check_account_key(env, name) # check archive content store_md = json.loads(open(env.store_archived_file(name, 1, 'md.json')).read()) for f in ['name', 'ca', 'domains', 'contacts', 'renew-mode', 'renew-window', 'must-staple']: assert store_md[f] == prev_md[f] # check file system permissions: env.check_file_permissions(name) # check: challenges removed env.check_dir_empty(env.store_challenges()) # check how the challenge resources are answered in sevceral combinations r = env.get_meta(domain, "/.well-known/acme-challenge", False) assert r.exit_code == 0 assert r.response['status'] == 404 r = env.get_meta(domain, "/.well-known/acme-challenge/", False) assert r.exit_code == 0 assert r.response['status'] == 404 r = env.get_meta(domain, "/.well-known/acme-challenge/123", False) assert r.exit_code == 0 assert r.response['status'] == 404 assert r.exit_code == 0 cdir = os.path.join(env.store_challenges(), domain) os.makedirs(cdir) open(os.path.join(cdir, 'acme-http-01.txt'), "w").write("content-of-123") r = env.get_meta(domain, "/.well-known/acme-challenge/123", False) assert r.exit_code == 0 assert r.response['status'] == 200 assert r.response['header']['content-length'] == '14' def test_md_502_101(self, env): # test case: md with 2 domains domain = self.test_domain name = "www." + domain self._prepare_md(env, [name, "test." + domain]) assert env.apache_restart() == 0 # drive r = env.a2md(["-vv", "drive", "-c", "http-01", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) env.check_md_credentials([name, "test." + domain]) # test_502_102 removed, as accounts without ToS are not allowed in ACMEv2 def test_md_502_103(self, env): # test case: md with one domain, ACME account and TOS agreement on server # setup: create md domain = self.test_domain name = "www." + domain assert env.a2md(["add", name]).exit_code == 0 assert env.a2md(["update", name, "contacts", "admin@" + domain]).exit_code == 0 assert env.apache_restart() == 0 # setup: create account on server r = env.a2md(["-t", "accepted", "acme", "newreg", "admin@" + domain], raw=True) assert r.exit_code == 0 acct = re.match("registered: (.*)$", r.stdout).group(1) # setup: link md to account assert env.a2md(["update", name, "account", acct]).exit_code == 0 # drive r = env.a2md(["-vv", "drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) env.check_md_credentials([name]) # test_502_104 removed, order are created differently in ACMEv2 def test_md_502_105(self, env): # test case: md with one domain, local TOS agreement and ACME account that is deleted (!) on server # setup: create md domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.apache_restart() == 0 # setup: create account on server r = env.a2md(["-t", "accepted", "acme", "newreg", "test@" + domain], raw=True) assert r.exit_code == 0 acct = re.match("registered: (.*)$", r.stdout).group(1) # setup: link md to account assert env.a2md(["update", name, "account", acct]).exit_code == 0 # setup: delete account on server assert env.a2md(["acme", "delreg", acct]).exit_code == 0 # drive r = env.a2md(["drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) env.check_md_credentials([name]) def test_md_502_107(self, env): # test case: drive again on COMPLETE md, then drive --force # setup: prepare md in store domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.apache_restart() == 0 # drive r = env.a2md(["-vv", "drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) env.check_md_credentials([name]) orig_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) # drive again assert env.a2md(["-vv", "drive", name]).exit_code == 0 env.check_md_credentials([name]) cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) # check: cert not changed assert cert.same_serial_as(orig_cert) # drive --force assert env.a2md(["-vv", "drive", "--force", name]).exit_code == 0 env.check_md_credentials([name]) cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) # check: cert not changed assert not cert.same_serial_as(orig_cert) # check: previous cert was archived cert = MDCertUtil(env.store_archived_file(name, 2, 'pubcert.pem')) assert cert.same_serial_as(orig_cert) def test_md_502_108(self, env): # test case: drive via HTTP proxy domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) conf = MDConf(env, proxy=True) conf.add('LogLevel proxy:trace8') conf.install() assert env.apache_restart() == 0 # drive it, with wrong proxy url -> FAIL r = env.a2md(["-p", "http://localhost:1", "drive", name]) assert r.exit_code == 1 assert "Connection refused" in r.stderr # drive it, working proxy url -> SUCCESS proxy_url = f"http://localhost:{env.proxy_port}" r = env.a2md(["-vv", "-p", proxy_url, "drive", name]) assert 0 == r.exit_code, "a2md failed: {0}".format(r.stderr) env.check_md_credentials([name]) def test_md_502_109(self, env): # test case: redirect on SSL-only domain # setup: prepare config domain = self.test_domain name = "www." + domain conf = MDConf(env, admin="admin@" + domain) conf.add_drive_mode("manual") conf.add_md([name]) conf.add_vhost(name, port=env.http_port, doc_root="htdocs/test") conf.add_vhost(name, doc_root="htdocs/test") conf.install() # setup: create resource files self._write_res_file(os.path.join(env.server_docs_dir, "test"), "name.txt", name) self._write_res_file(os.path.join(env.server_docs_dir), "name.txt", "not-forbidden.org") assert env.apache_restart() == 0 # drive it assert env.a2md(["drive", name]).exit_code == 0 assert env.apache_restart() == 0 # test HTTP access - no redirect jdata = env.get_json_content(f"test1.{env.http_tld}", "/alive.json", use_https=False) assert jdata['host']== "test1" assert env.get_content(name, "/name.txt", use_https=False) == name r = env.get_meta(name, "/name.txt", use_https=False) assert int(r.response['header']['content-length']) == len(name) assert "Location" not in r.response['header'] # test HTTPS access assert env.get_content(name, "/name.txt", use_https=True) == name # test HTTP access again -> redirect to default HTTPS port conf.add("MDRequireHttps temporary") conf.install() assert env.apache_restart() == 0 r = env.get_meta(name, "/name.txt", use_https=False) assert r.response['status'] == 302 exp_location = "https://%s/name.txt" % name assert r.response['header']['location'] == exp_location # should not see this assert 'strict-transport-security' not in r.response['header'] # test default HTTP vhost -> still no redirect jdata = env.get_json_content(f"test1.{env.http_tld}", "/alive.json", use_https=False) assert jdata['host']== "test1" r = env.get_meta(name, "/name.txt", use_https=True) # also not for this assert 'strict-transport-security' not in r.response['header'] # test HTTP access again -> redirect permanent conf.add("MDRequireHttps permanent") conf.install() assert env.apache_restart() == 0 r = env.get_meta(name, "/name.txt", use_https=False) assert r.response['status'] == 301 exp_location = "https://%s/name.txt" % name assert r.response['header']['location'] == exp_location assert 'strict-transport-security' not in r.response['header'] # should see this r = env.get_meta(name, "/name.txt", use_https=True) assert r.response['header']['strict-transport-security'] == 'max-age=15768000' def test_md_502_110(self, env): # test case: SSL-only domain, override headers generated by mod_md # setup: prepare config domain = self.test_domain name = "www." + domain conf = MDConf(env, admin="admin@" + domain) conf.add_drive_mode("manual") conf.add("MDRequireHttps permanent") conf.add_md([name]) conf.add_vhost(name, port=env.http_port) conf.add_vhost(name) conf.install() assert env.apache_restart() == 0 # drive it assert env.a2md(["drive", name]).exit_code == 0 assert env.apache_restart() == 0 # test override HSTS header conf.add('Header set Strict-Transport-Security "max-age=10886400; includeSubDomains; preload"') conf.install() assert env.apache_restart() == 0 r = env.get_meta(name, "/name.txt", use_https=True) assert 'strict-transport-security' in r.response['header'], r.response['header'] assert r.response['header']['strict-transport-security'] == \ 'max-age=10886400; includeSubDomains; preload' # test override Location header conf.add(' Redirect /a /name.txt') conf.add(' Redirect seeother /b /name.txt') conf.install() assert env.apache_restart() == 0 # check: default redirect by mod_md still works exp_location = "https://%s/name.txt" % name r = env.get_meta(name, "/name.txt", use_https=False) assert r.response['status'] == 301 assert r.response['header']['location'] == exp_location # check: redirect as given by mod_alias exp_location = "https://%s/a" % name r = env.get_meta(name, "/a", use_https=False) assert r.response['status'] == 301 # FAIL: mod_alias generates Location header instead of mod_md assert r.response['header']['location'] == exp_location def test_md_502_111(self, env): # test case: vhost with parallel HTTP/HTTPS, check mod_alias redirects # setup: prepare config domain = self.test_domain name = "www." + domain conf = MDConf(env, admin="admin@" + domain) conf.add_drive_mode("manual") conf.add_md([name]) conf.add(" LogLevel alias:debug") conf.add_vhost(name, port=env.http_port) conf.add_vhost(name) conf.install() assert env.apache_restart() == 0 # drive it r = env.a2md(["-v", "drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) assert env.apache_restart() == 0 # setup: place redirect rules conf.add(' Redirect /a /name.txt') conf.add(' Redirect seeother /b /name.txt') conf.install() assert env.apache_restart() == 0 # check: redirects on HTTP exp_location = "http://%s:%s/name.txt" % (name, env.http_port) r = env.get_meta(name, "/a", use_https=False) assert r.response['status'] == 302 assert r.response['header']['location'] == exp_location r = env.get_meta(name, "/b", use_https=False) assert r.response['status'] == 303 assert r.response['header']['location'] == exp_location # check: redirects on HTTPS exp_location = "https://%s:%s/name.txt" % (name, env.https_port) r = env.get_meta(name, "/a", use_https=True) assert r.response['status'] == 302 assert r.response['header']['location'] == exp_location # FAIL: expected 'https://...' but found 'http://...' r = env.get_meta(name, "/b", use_https=True) assert r.response['status'] == 303 assert r.response['header']['location'] == exp_location def test_md_502_120(self, env): # test case: NP dereference reported by Daniel Caminada domain = self.test_domain name = "www." + domain conf = MDConf(env, admin="admin@" + domain) conf.add_drive_mode("manual") conf.add_md([name]) conf.add_vhost(name) conf.install() assert env.apache_restart() == 0 env.run(["openssl", "s_client", f"-connect", "localhost:{env.https_port}", "-servername", "example.com", "-crlf" ], intext="GET https:// HTTP/1.1\nHost: example.com\n\n") assert env.apache_restart() == 0 # --------- critical state change -> drive again --------- def test_md_502_200(self, env): # test case: add dns name on existing valid md # setup: create md in store domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.apache_restart() == 0 # setup: drive it r = env.a2md(["drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) old_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) # setup: add second domain assert env.a2md(["update", name, "domains", name, "test." + domain]).exit_code == 0 # drive r = env.a2md(["-vv", "drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) # check new cert env.check_md_credentials([name, "test." + domain]) new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) assert not old_cert.same_serial_as(new_cert.get_serial) @pytest.mark.parametrize("renew_window,test_data_list", [ ("14d", [ {"valid": {"notBefore": -5, "notAfter": 180}, "renew": False}, {"valid": {"notBefore": -200, "notAfter": 15}, "renew": False}, {"valid": {"notBefore": -200, "notAfter": 13}, "renew": True}, ]), ("30%", [ {"valid": {"notBefore": -0, "notAfter": 180}, "renew": False}, {"valid": {"notBefore": -120, "notAfter": 60}, "renew": False}, {"valid": {"notBefore": -126, "notAfter": 53}, "renew": True}, ]) ]) def test_md_502_201(self, env, renew_window, test_data_list): # test case: trigger cert renew when entering renew window # setup: prepare COMPLETE md domain = self.test_domain name = "www." + domain conf = MDConf(env, admin="admin@" + domain) conf.add_drive_mode("manual") conf.add_renew_window(renew_window) conf.add_md([name]) conf.install() assert env.apache_restart() == 0 assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_INCOMPLETE # setup: drive it r = env.a2md(["drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) cert1 = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_COMPLETE # replace cert by self-signed one -> check md status print("TRACE: start testing renew window: %s" % renew_window) for tc in test_data_list: print("TRACE: create self-signed cert: %s" % tc["valid"]) creds = env.create_self_signed_cert(CertificateSpec(domains=[name]), valid_from=timedelta(days=tc["valid"]["notBefore"]), valid_to=timedelta(days=tc["valid"]["notAfter"])) assert creds.certificate.serial_number != cert1.get_serial_number() # copy it over, assess status again creds.save_cert_pem(env.store_domain_file(name, 'pubcert.pem')) md = env.a2md(["list", name]).json['output'][0] assert md["renew"] == tc["renew"], \ "Expected renew == {} indicator in {}, test case {}".format(tc["renew"], md, tc) @pytest.mark.parametrize("key_type,key_params,exp_key_length", [ ("RSA", [2048], 2048), ("RSA", [3072], 3072), ("RSA", [4096], 4096), ("Default", [], 2048) ]) def test_md_502_202(self, env, key_type, key_params, exp_key_length): # test case: specify RSA key length and verify resulting cert key # setup: prepare md domain = self.test_domain name = "www." + domain conf = MDConf(env, admin="admin@" + domain) conf.add_drive_mode("manual") conf.add_private_key(key_type, key_params) conf.add_md([name]) conf.install() assert env.apache_restart() == 0 assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_INCOMPLETE # setup: drive it r = env.a2md(["-vv", "drive", name]) assert r.exit_code == 0, "drive for MDPrivateKeys {} {}: {}".format(key_type, key_params, r.stderr) assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_COMPLETE # check cert key length cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) assert cert.get_key_length() == exp_key_length # test_502_203 removed, as ToS agreement is not really checked in ACMEv2 # --------- non-critical state change -> keep data --------- def test_md_502_300(self, env): # test case: remove one domain name from existing valid md # setup: create md in store domain = self.test_domain name = "www." + domain self._prepare_md(env, [name, "test." + domain, "xxx." + domain]) assert env.apache_restart() == 0 # setup: drive it r = env.a2md(["drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) old_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) # setup: remove one domain assert env.a2md(["update", name, "domains"] + [name, "test." + domain]).exit_code == 0 # drive assert env.a2md(["-vv", "drive", name]).exit_code == 0 # compare cert serial new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) assert old_cert.same_serial_as(new_cert) def test_md_502_301(self, env): # test case: change contact info on existing valid md # setup: create md in store domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.apache_restart() == 0 # setup: drive it r = env.a2md(["drive", name]) assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr) old_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) # setup: add second domain assert env.a2md(["update", name, "contacts", "test@" + domain]).exit_code == 0 # drive assert env.a2md(["drive", name]).exit_code == 0 # compare cert serial new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) assert old_cert.same_serial_as(new_cert) # --------- network problems --------- def test_md_502_400(self, env): # test case: server not reachable domain = self.test_domain name = "www." + domain self._prepare_md(env, [name]) assert env.a2md( ["update", name, "ca", "http://localhost:4711/directory"] ).exit_code == 0 # drive r = env.a2md(["drive", name]) assert r.exit_code == 1 assert r.json['status'] != 0 assert r.json['description'] == 'Connection refused' # --------- _utils_ --------- def _prepare_md(self, env, domains): assert env.a2md(["add"] + domains).exit_code == 0 assert env.a2md( ["update", domains[0], "contacts", "admin@" + domains[0]] ).exit_code == 0 assert env.a2md( ["update", domains[0], "agreement", env.acme_tos] ).exit_code == 0 def _write_res_file(self, doc_root, name, content): if not os.path.exists(doc_root): os.makedirs(doc_root) open(os.path.join(doc_root, name), "w").write(content) RE_MSG_OPENSSL_BAD_DECRYPT = re.compile('.*\'bad decrypt\'.*') def _check_account_key(self, env, name): # read encryption key md_store = json.loads(open(env.path_store_json(), 'r').read()) encrypt_key = base64.urlsafe_b64decode(str(md_store['key'])) # check: key file is encrypted PEM md = env.a2md(["list", name]).json['output'][0] acc = md['ca']['account'] MDCertUtil.validate_privkey(env.path_account_key(acc), lambda *args: encrypt_key)