summaryrefslogtreecommitdiffstats
path: root/test/modules/http2/test_200_header_invalid.py
blob: cbc4b6c9fa8721b873039553b54d0413487abb55 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import re
import pytest

from .env import H2Conf, H2TestEnv


@pytest.mark.skipif(condition=H2TestEnv.is_unsupported, reason="mod_http2 not supported here")
class TestInvalidHeaders:

    @pytest.fixture(autouse=True, scope='class')
    def _class_scope(self, env):
        """Install a configuration with the CGI vhost and restart httpd.

        Runs automatically once per test class (autouse, class scope); the
        restart is required to return 0.
        """
        conf = H2Conf(env).add_vhost_cgi()
        conf.install()
        restart_rc = env.apache_restart()
        assert restart_rc == 0

    # let the hecho.py CGI echo chars < 0x20 in field name
    # for almost all such characters, the stream gets aborted with a h2 error
    # or in httpd >= 2.5.0 returns a 500
    # cr and lf are handled special
    def test_h2_200_01(self, env):
        """Echo each byte 0x01-0x1f inside a response header name via hecho.py.

        Expected outcome per character, as asserted below:
          - CR (0x0d): curl succeeds and the response status is 200.
          - LF (0x0a), or any character on httpd >= 2.5.0: curl succeeds
            and the response status is 500.
          - any other character on older httpd: curl exits non-zero.
        """
        url = env.mkurl("https", "cgi", "/hecho.py")
        for x in range(1, 32):
            # format as hex so failure messages agree with their "0x" prefix
            char = f'0x{x:02x}'
            r = env.curl_post_data(url, f'name=x%{x:02x}x&value=yz')
            if x == 13:
                assert 0 == r.exit_code, f'unexpected exit code for char {char}'
                assert 200 == r.response["status"], f'unexpected status for char {char}'
            elif x == 10 or env.httpd_is_at_least('2.5.0'):
                assert 0 == r.exit_code, f'unexpected exit code for char {char}'
                assert 500 == r.response["status"], f'unexpected status for char {char}'
            else:
                assert 0 != r.exit_code, f'unexpected exit code for char {char}'
        # these error log entries are expected here; have the log helper ignore them
        env.httpd_error_log.ignore_recent(
            lognos = [
                "AH02429"   # Response header name contains invalid characters
            ],
            matches = [
                r'.*malformed header from script \'hecho.py\': Bad header: x.*'
            ]
        )

    # let the hecho.py CGI echo chars < 0x20 in field value
    # for almost all such characters, the stream gets aborted with a h2 error
    # or in httpd >= 2.5.0 returns a 500
    # tab (0x09) is skipped, cr and lf are handled special
    def test_h2_200_02(self, env):
        """Echo each byte 0x01-0x1f (except tab) inside a response header value.

        Expected outcome per character, as asserted below:
          - LF (0x0a) and CR (0x0d): curl succeeds and the status is 200.
          - any other character on httpd >= 2.5.0: curl succeeds and the
            status is 500.
          - any other character on older httpd: curl exits non-zero.
        """
        url = env.mkurl("https", "cgi", "/hecho.py")
        for x in range(1, 32):
            if x == 9:
                # tab is not exercised by this test
                continue
            # format as hex so failure messages agree with their "0x" prefix
            char = f'0x{x:02x}'
            r = env.curl_post_data(url, f'name=x&value=y%{x:02x}')
            if x in (10, 13):
                assert 0 == r.exit_code, f'unexpected exit code for char {char}'
                assert 200 == r.response["status"], f'unexpected status for char {char}'
            elif env.httpd_is_at_least('2.5.0'):
                assert 0 == r.exit_code, f'unexpected exit code for char {char}'
                assert 500 == r.response["status"], f'unexpected status for char {char}'
            else:
                assert 0 != r.exit_code, f'unexpected exit code for char {char}'
        # these error log entries are expected here; have the log helper ignore them
        env.httpd_error_log.ignore_recent(
            lognos = [
                "AH02430"   # Response header value contains invalid characters
            ]
        )

    # let the hecho.py CGI echo 0x10 and 0x7f in field name and value
    def test_h2_200_03(self, env):
        """Echo bytes 0x10 and 0x7f in a response header name, then in its value.

        On httpd >= 2.5.0 each request must complete with status 500; on
        older versions curl must exit non-zero.
        """
        url = env.mkurl("https", "cgi", "/hecho.py")
        for h in ["10", "7f"]:
            # first the char inside the field name, then inside the field value
            for data in (f"name=x%{h}&value=yz", f"name=x&value=y%{h}z"):
                r = env.curl_post_data(url, data)
                if env.httpd_is_at_least('2.5.0'):
                    # h is already a hex string; a numeric format spec on it
                    # would raise on Python < 3.10 instead of reporting
                    assert 0 == r.exit_code, f"unexpected exit code for char 0x{h}"
                    assert 500 == r.response["status"], f"unexpected status for char 0x{h}"
                else:
                    assert 0 != r.exit_code, f"unexpected exit code for char 0x{h}"
        # these error log entries are expected here; have the log helper ignore them
        env.httpd_error_log.ignore_recent(
            lognos = [
                "AH02429",  # Response header name contains invalid characters
                "AH02430"   # Response header value contains invalid characters
            ]
        )

    # test header field lengths check, LimitRequestLine
    def test_h2_200_10(self, env):
        """Check the request line length limit with LimitRequestLine 1024.

        A query string of 1022 characters must be served with 200, one of
        1023 characters must be rejected with 414 (URI too long).
        """
        h2conf = H2Conf(env)
        h2conf.add("""
            LimitRequestLine 1024
            """)
        h2conf.add_vhost_cgi()
        h2conf.install()
        assert env.apache_restart() == 0
        digits = "1234567890" * 200
        for query_len, expected_status in ((1022, 200), (1023, 414)):
            target = env.mkurl("https", "cgi", f'/?{digits[:query_len]}')
            result = env.curl_get(target)
            assert result.response["status"] == expected_status

    # test header field lengths check, LimitRequestFieldSize (default 8190)
    def test_h2_200_11(self, env):
        """Check the header field size limit with LimitRequestFieldSize 1024.

        Each request sends two 'x' headers (the original note says they are
        concatenated with ', '). The first value is always 500 characters;
        the length of the second decides the expected status.
        """
        h2conf = H2Conf(env)
        h2conf.add("""
            LimitRequestFieldSize 1024
            """)
        h2conf.add_vhost_cgi()
        h2conf.install()
        assert env.apache_restart() == 0
        target = env.mkurl("https", "cgi", "/")
        digits = "1234567890" * 200

        def get_with_two_x_fields(second_len, *leading_args):
            # leading_args are curl options placed before the two -H pairs
            return env.curl_get(target, options=[
                *leading_args,
                '-H', f'x: {digits[:500]}', '-H', f'x: {digits[:second_len]}'
            ])

        # still within the limit
        result = get_with_two_x_fields(519)
        assert result.exit_code == 0, f'{result}'
        assert result.response["status"] == 200, f'{result}'
        # LimitRequestFieldSize, one more char -> 400 in HTTP/1.1
        result = get_with_two_x_fields(523, '--http1.1')
        assert result.response["status"] == 400
        # over the limit on the default protocol -> 431
        result = get_with_two_x_fields(520)
        assert result.response["status"] == 431

    # test header field count, LimitRequestFields (default 100)
    # see #201: several headers with the same name are merged and count only once
    def test_h2_200_12(self, env):
        """Check the header field count when many headers share one name.

        97 identical 'x' headers are sent, then the same set plus one 'y'
        header; both requests must be answered with 200.
        """
        target = env.mkurl("https", "cgi", "/")
        # curl sends 3 headers itself (user-agent, accept, and our AP-Test-Name)
        same_name_headers = ["-H", "x: 1"] * 97
        result = env.curl_get(target, options=same_name_headers)
        assert 200 == result.response["status"]
        with_extra = same_name_headers + ["-H", "y: 2"]
        result = env.curl_get(target, options=with_extra)
        assert 200 == result.response["status"]

    # test header field count, LimitRequestFields (default 100)
    # different header names count each
    def test_h2_200_13(self, env):
        """Check the header field count when every header has a distinct name.

        97 differently named headers must still yield 200; adding one more
        header ('y') must yield 431.
        """
        target = env.mkurl("https", "cgi", "/")
        # curl sends 3 headers itself (user-agent, accept, and our AP-Test-Name)
        distinct_headers = []
        for idx in range(97):
            distinct_headers.extend(["-H", f"x{idx}: 1"])
        result = env.curl_get(target, options=distinct_headers)
        assert 200 == result.response["status"]
        one_too_many = distinct_headers + ["-H", "y: 2"]
        result = env.curl_get(target, options=one_too_many)
        assert result.response["status"] == 431

    # test "LimitRequestFields 0" setting, see #200
    def test_h2_200_14(self, env):
        """Check LimitRequestFields with the values 20 and 0 (see #200).

        With the limit at 20, a request carrying 21 distinct headers must get
        431. With the limit at 0, a request carrying 100 distinct headers
        must get 200.
        """
        # (LimitRequestFields value, number of headers sent, expected status)
        phases = (
            ("20", 21, 431),
            ("0", 100, 200),
        )
        for limit, header_count, expected_status in phases:
            h2conf = H2Conf(env)
            h2conf.add(f"""
            LimitRequestFields {limit}
            """)
            h2conf.add_vhost_cgi()
            h2conf.install()
            assert env.apache_restart() == 0
            target = env.mkurl("https", "cgi", "/")
            headers = []
            for idx in range(header_count):
                headers.extend(["-H", f"x{idx}: 1"])
            result = env.curl_get(target, options=headers)
            assert result.response["status"] == expected_status

    # the uri limits
    def test_h2_200_15(self, env):
        """Check URI and method length handling with LimitRequestLine 48.

        A plain '/' request must get 200, a path of 48 'x' characters must get
        414, and an nghttp request with a 100 character :method value must
        exit with code 0 but without any response.
        """
        h2conf = H2Conf(env)
        h2conf.add("""
            LimitRequestLine 48
            """)
        h2conf.add_vhost_cgi()
        h2conf.install()
        assert env.apache_restart() == 0

        root_url = env.mkurl("https", "cgi", "/")
        result = env.curl_get(root_url)
        assert 200 == result.response["status"]

        long_path = "/" + "x" * 48
        result = env.curl_get(env.mkurl("https", "cgi", long_path))
        assert result.response["status"] == 414

        # nghttp sends the :method: header first (so far)
        # trigger a too long request line on it
        # the stream will RST and we get no response
        root_url = env.mkurl("https", "cgi", "/")
        long_method = "x" * 100
        nghttp_opts = [f"-H:method: {long_method}"]
        result = env.nghttp().get(root_url, options=nghttp_opts)
        assert result.exit_code == 0, result
        assert not result.response

    # invalid chars in method
    def test_h2_200_16(self, env):
        """Send a :method pseudo-header containing invalid characters via nghttp.

        The request is made against both /hello.py and /proxy/hello.py; in
        each case nghttp must exit with code 0 and no response is expected.
        Skipped when env.h2load_is_at_least('1.45.0') is false.
        """
        if not env.h2load_is_at_least('1.45.0'):
            pytest.skip('nghttp2 version too old')
        conf = H2Conf(env)
        conf.add_vhost_cgi()
        conf.install()
        assert env.apache_restart() == 0
        # the method value carries a space and a path, which is not a valid method
        opt = ["-H:method: GET /hello.py"]
        for path in ("/hello.py", "/proxy/hello.py"):
            url = env.mkurl("https", "cgi", path)
            r = env.nghttp().get(url, options=opt)
            assert r.exit_code == 0, r
            assert r.response is None

    # test few failed headers, should give response
    def test_h2_200_17(self, env):
        """Send a few over-long headers and expect a regular error response.

        With LimitRequestFieldSize 20, ten headers carrying 30 character
        values are sent. The request must be answered with 431 and the error
        log must contain an AH03401 shutdown line with remote.emitted=1.
        """
        conf = H2Conf(env)
        # http2:debug is enabled so the AH03401 line scanned below gets logged
        # NOTE(review): assumes AH03401 is only emitted at debug level - confirm
        conf.add("""
            LimitRequestFieldSize 20
            LogLevel http2:debug
            """)
        conf.add_vhost_cgi()
        conf.install()
        assert env.apache_restart() == 0
        re_emitted = re.compile(r'.* AH03401: .* shutdown, remote.emitted=1')
        url = env.mkurl("https", "cgi", "/")
        opt = []
        for i in range(10):
            opt += ["-H", f"x{i}: 012345678901234567890123456789"]
        r = env.curl_get(url, options=opt)
        assert r.response
        assert r.response["status"] == 431
        assert env.httpd_error_log.scan_recent(re_emitted)

    # test too many failed headers, should give RST
    def test_h2_200_18(self, env):
        """Send many over-long headers and expect no response at all.

        With LimitRequestFieldSize 20, one hundred headers carrying 30
        character values are sent. No response may come back, and the error
        log must contain an AH03401 shutdown line with remote.emitted=1.
        """
        h2conf = H2Conf(env)
        h2conf.add("""
            LimitRequestFieldSize 20
            LogLevel http2:debug
            """)
        h2conf.add_vhost_cgi()
        h2conf.install()
        assert env.apache_restart() == 0
        shutdown_pattern = re.compile(r'.* AH03401: .* shutdown, remote.emitted=1')
        target = env.mkurl("https", "cgi", "/")
        oversized_headers = []
        for idx in range(100):
            oversized_headers.extend(["-H", f"x{idx}: 012345678901234567890123456789"])
        result = env.curl_get(target, options=oversized_headers)
        assert result.response is None
        assert env.httpd_error_log.scan_recent(shutdown_pattern)

    # test 10 invalid headers, should trigger stream RST
    def test_h2_200_19(self, env):
        """Send ten headers whose value is the single byte 0x7f.

        The request is expected to end without any response.
        """
        target = env.mkurl("https", "cgi", "/")
        bad_value = '\x7f'
        bad_headers = []
        for idx in range(10):
            bad_headers.extend(["-H", f"x{idx}: {bad_value}"])
        result = env.curl_get(target, options=bad_headers)
        assert result.response is None