summaryrefslogtreecommitdiffstats
path: root/ssl/quic/quic_stream_map.c
blob: 89b97c0a6b86b1d0593bbe9874749032d04ad330 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
/*
* Copyright 2022-2023 The OpenSSL Project Authors. All Rights Reserved.
*
* Licensed under the Apache License 2.0 (the "License").  You may not use
* this file except in compliance with the License.  You can obtain a copy
* in the file LICENSE in the source distribution or at
* https://www.openssl.org/source/license.html
*/

#include "internal/quic_stream_map.h"
#include "internal/nelem.h"

/*
 * QUIC Stream Map
 * ===============
 */
DEFINE_LHASH_OF_EX(QUIC_STREAM);

static void shutdown_flush_done(QUIC_STREAM_MAP *qsm, QUIC_STREAM *qs);

/* Circular list management. */
static void list_insert_tail(QUIC_STREAM_LIST_NODE *l,
                             QUIC_STREAM_LIST_NODE *n)
{
    /* Must not be in list. */
    assert(n->prev == NULL && n->next == NULL
           && l->prev != NULL && l->next != NULL);

    n->prev = l->prev;
    n->prev->next = n;
    l->prev = n;
    n->next = l;
}

static void list_remove(QUIC_STREAM_LIST_NODE *l,
                        QUIC_STREAM_LIST_NODE *n)
{
    assert(n->prev != NULL && n->next != NULL
           && n->prev != n && n->next != n);

    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->next = n->prev = NULL;
}

static QUIC_STREAM *list_next(QUIC_STREAM_LIST_NODE *l, QUIC_STREAM_LIST_NODE *n,
                              size_t off)
{
    assert(n->prev != NULL && n->next != NULL
           && (n == l || (n->prev != n && n->next != n))
           && l->prev != NULL && l->next != NULL);

    n = n->next;

    if (n == l)
        n = n->next;
    if (n == l)
        return NULL;

    assert(n != NULL);

    return (QUIC_STREAM *)(((char *)n) - off);
}

#define active_next(l, s)       list_next((l), &(s)->active_node, \
                                          offsetof(QUIC_STREAM, active_node))
#define accept_next(l, s)       list_next((l), &(s)->accept_node, \
                                          offsetof(QUIC_STREAM, accept_node))
#define ready_for_gc_next(l, s) list_next((l), &(s)->ready_for_gc_node, \
                                          offsetof(QUIC_STREAM, ready_for_gc_node))
#define accept_head(l)          list_next((l), (l), \
                                          offsetof(QUIC_STREAM, accept_node))
#define ready_for_gc_head(l)    list_next((l), (l), \
                                          offsetof(QUIC_STREAM, ready_for_gc_node))

static unsigned long hash_stream(const QUIC_STREAM *s)
{
    return (unsigned long)s->id;
}

static int cmp_stream(const QUIC_STREAM *a, const QUIC_STREAM *b)
{
    if (a->id < b->id)
        return -1;
    if (a->id > b->id)
        return 1;
    return 0;
}

int ossl_quic_stream_map_init(QUIC_STREAM_MAP *qsm,
                              uint64_t (*get_stream_limit_cb)(int uni, void *arg),
                              void *get_stream_limit_cb_arg,
                              QUIC_RXFC *max_streams_bidi_rxfc,
                              QUIC_RXFC *max_streams_uni_rxfc,
                              int is_server)
{
    qsm->map = lh_QUIC_STREAM_new(hash_stream, cmp_stream);
    qsm->active_list.prev = qsm->active_list.next = &qsm->active_list;
    qsm->accept_list.prev = qsm->accept_list.next = &qsm->accept_list;
    qsm->ready_for_gc_list.prev = qsm->ready_for_gc_list.next
        = &qsm->ready_for_gc_list;
    qsm->rr_stepping = 1;
    qsm->rr_counter  = 0;
    qsm->rr_cur      = NULL;

    qsm->num_accept_bidi    = 0;
    qsm->num_accept_uni     = 0;
    qsm->num_shutdown_flush = 0;

    qsm->get_stream_limit_cb        = get_stream_limit_cb;
    qsm->get_stream_limit_cb_arg    = get_stream_limit_cb_arg;
    qsm->max_streams_bidi_rxfc      = max_streams_bidi_rxfc;
    qsm->max_streams_uni_rxfc       = max_streams_uni_rxfc;
    qsm->is_server                  = is_server;
    return 1;
}

static void release_each(QUIC_STREAM *stream, void *arg)
{
    QUIC_STREAM_MAP *qsm = arg;

    ossl_quic_stream_map_release(qsm, stream);
}

void ossl_quic_stream_map_cleanup(QUIC_STREAM_MAP *qsm)
{
    ossl_quic_stream_map_visit(qsm, release_each, qsm);

    lh_QUIC_STREAM_free(qsm->map);
    qsm->map = NULL;
}

void ossl_quic_stream_map_visit(QUIC_STREAM_MAP *qsm,
                                void (*visit_cb)(QUIC_STREAM *stream, void *arg),
                                void *visit_cb_arg)
{
    lh_QUIC_STREAM_doall_arg(qsm->map, visit_cb, visit_cb_arg);
}

QUIC_STREAM *ossl_quic_stream_map_alloc(QUIC_STREAM_MAP *qsm,
                                        uint64_t stream_id,
                                        int type)
{
    QUIC_STREAM *s;
    QUIC_STREAM key;

    key.id = stream_id;

    s = lh_QUIC_STREAM_retrieve(qsm->map, &key);
    if (s != NULL)
        return NULL;

    s = OPENSSL_zalloc(sizeof(*s));
    if (s == NULL)
        return NULL;

    s->id           = stream_id;
    s->type         = type;
    s->as_server    = qsm->is_server;
    s->send_state   = (ossl_quic_stream_is_local_init(s)
                       || ossl_quic_stream_is_bidi(s))
        ? QUIC_SSTREAM_STATE_READY
        : QUIC_SSTREAM_STATE_NONE;
    s->recv_state   = (!ossl_quic_stream_is_local_init(s)
                       || ossl_quic_stream_is_bidi(s))
        ? QUIC_RSTREAM_STATE_RECV
        : QUIC_RSTREAM_STATE_NONE;

    s->send_final_size  = UINT64_MAX;

    lh_QUIC_STREAM_insert(qsm->map, s);
    return s;
}

void ossl_quic_stream_map_release(QUIC_STREAM_MAP *qsm, QUIC_STREAM *stream)
{
    if (stream == NULL)
        return;

    if (stream->active_node.next != NULL)
        list_remove(&qsm->active_list, &stream->active_node);
    if (stream->accept_node.next != NULL)
        list_remove(&qsm->accept_list, &stream->accept_node);
    if (stream->ready_for_gc_node.next != NULL)
        list_remove(&qsm->ready_for_gc_list, &stream->ready_for_gc_node);

    ossl_quic_sstream_free(stream->sstream);
    stream->sstream = NULL;

    ossl_quic_rstream_free(stream->rstream);
    stream->rstream = NULL;

    lh_QUIC_STREAM_delete(qsm->map, stream);
    OPENSSL_free(stream);
}

QUIC_STREAM *ossl_quic_stream_map_get_by_id(QUIC_STREAM_MAP *qsm,
                                            uint64_t stream_id)
{
    QUIC_STREAM key;

    key.id = stream_id;

    return lh_QUIC_STREAM_retrieve(qsm->map, &key);
}

static void stream_map_mark_active(QUIC_STREAM_MAP *qsm, QUIC_STREAM *s)
{
    if (s->active)
        return;

    list_insert_tail(&qsm->active_list, &s->active_node);

    if (qsm->rr_cur == NULL)
        qsm->rr_cur = s;

    s->active = 1;
}

static void stream_map_mark_inactive(QUIC_STREAM_MAP *qsm, QUIC_STREAM *s)
{
    if (!s->active)
        return;

    if (qsm->rr_cur == s)
        qsm->rr_cur = active_next(&qsm->active_list, s);
    if (qsm->rr_cur == s)
        qsm->rr_cur = NULL;

    list_remove(&qsm->active_list, &s->active_node);

    s->active = 0;
}

void ossl_quic_stream_map_set_rr_stepping(QUIC_STREAM_MAP *qsm, size_t stepping)
{
    qsm->rr_stepping = stepping;
    qsm->rr_counter  = 0;
}

static int stream_has_data_to_send(QUIC_STREAM *s)
{
    OSSL_QUIC_FRAME_STREAM shdr;
    OSSL_QTX_IOVEC iov[2];
    size_t num_iov;
    uint64_t fc_credit, fc_swm, fc_limit;

    switch (s->send_state) {
    case QUIC_SSTREAM_STATE_READY:
    case QUIC_SSTREAM_STATE_SEND:
    case QUIC_SSTREAM_STATE_DATA_SENT:
        /*
         * We can still have data to send in DATA_SENT due to retransmissions,
         * etc.
         */
        break;
    default:
        return 0; /* Nothing to send. */
    }

    /*
     * We cannot determine if we have data to send simply by checking if
     * ossl_quic_txfc_get_credit() is zero, because we may also have older
     * stream data we need to retransmit. The SSTREAM returns older data first,
     * so we do a simple comparison of the next chunk the SSTREAM wants to send
     * against the TXFC CWM.
     */
    num_iov = OSSL_NELEM(iov);
    if (!ossl_quic_sstream_get_stream_frame(s->sstream, 0, &shdr, iov,
                                            &num_iov))
        return 0;

    fc_credit = ossl_quic_txfc_get_credit(&s->txfc, 0);
    fc_swm    = ossl_quic_txfc_get_swm(&s->txfc);
    fc_limit  = fc_swm + fc_credit;

    return (shdr.is_fin && shdr.len == 0) || shdr.offset < fc_limit;
}

static ossl_unused int qsm_send_part_permits_gc(const QUIC_STREAM *qs)
{
    switch (qs->send_state) {
    case QUIC_SSTREAM_STATE_NONE:
    case QUIC_SSTREAM_STATE_DATA_RECVD:
    case QUIC_SSTREAM_STATE_RESET_RECVD:
        return 1;
    default:
        return 0;
    }
}

static int qsm_ready_for_gc(QUIC_STREAM_MAP *qsm, QUIC_STREAM *qs)
{
    int recv_stream_fully_drained = 0; /* TODO(QUIC FUTURE): Optimisation */

    /*
     * If sstream has no FIN, we auto-reset it at marked-for-deletion time, so
     * we don't need to worry about that here.
     */
    assert(!qs->deleted
           || !ossl_quic_stream_has_send(qs)
           || ossl_quic_stream_send_is_reset(qs)
           || ossl_quic_stream_send_get_final_size(qs, NULL));

    return
        qs->deleted
        && (!ossl_quic_stream_has_recv(qs)
            || recv_stream_fully_drained
            || qs->acked_stop_sending)
        && (!ossl_quic_stream_has_send(qs)
            || qs->send_state == QUIC_SSTREAM_STATE_DATA_RECVD
            || qs->send_state == QUIC_SSTREAM_STATE_RESET_RECVD);
}

int ossl_quic_stream_map_is_local_allowed_by_stream_limit(QUIC_STREAM_MAP *qsm,
                                                          uint64_t stream_ordinal,
                                                          int is_uni)
{
    uint64_t stream_limit;

    if (qsm->get_stream_limit_cb == NULL)
        return 1;

    stream_limit = qsm->get_stream_limit_cb(is_uni, qsm->get_stream_limit_cb_arg);
    return stream_ordinal < stream_limit;
}

void ossl_quic_stream_map_update_state(QUIC_STREAM_MAP *qsm, QUIC_STREAM *s)
{
    int should_be_active, allowed_by_stream_limit = 1;

    if (ossl_quic_stream_is_server_init(s) == qsm->is_server) {
        int is_uni = !ossl_quic_stream_is_bidi(s);
        uint64_t stream_ordinal = s->id >> 2;

        allowed_by_stream_limit
            = ossl_quic_stream_map_is_local_allowed_by_stream_limit(qsm,
                                                                    stream_ordinal,
                                                                    is_uni);
    }

    if (s->send_state == QUIC_SSTREAM_STATE_DATA_SENT
        && ossl_quic_sstream_is_totally_acked(s->sstream))
        ossl_quic_stream_map_notify_totally_acked(qsm, s);
    else if (s->shutdown_flush
             && s->send_state == QUIC_SSTREAM_STATE_SEND
             && ossl_quic_sstream_is_totally_acked(s->sstream))
        shutdown_flush_done(qsm, s);

    if (!s->ready_for_gc) {
        s->ready_for_gc = qsm_ready_for_gc(qsm, s);
        if (s->ready_for_gc)
            list_insert_tail(&qsm->ready_for_gc_list, &s->ready_for_gc_node);
    }

    should_be_active
        = allowed_by_stream_limit
        && !s->ready_for_gc
        && ((ossl_quic_stream_has_recv(s)
             && !ossl_quic_stream_recv_is_reset(s)
             && (s->recv_state == QUIC_RSTREAM_STATE_RECV
                 && (s->want_max_stream_data
                     || ossl_quic_rxfc_has_cwm_changed(&s->rxfc, 0))))
            || s->want_stop_sending
            || s->want_reset_stream
            || (!s->peer_stop_sending && stream_has_data_to_send(s)));

    if (should_be_active)
        stream_map_mark_active(qsm, s);
    else
        stream_map_mark_inactive(qsm, s);
}

/*
 * Stream Send Part State Management
 * =================================
 */

int ossl_quic_stream_map_ensure_send_part_id(QUIC_STREAM_MAP *qsm,
                                             QUIC_STREAM *qs)
{
    switch (qs->send_state) {
    case QUIC_SSTREAM_STATE_NONE:
        /* Stream without send part - caller error. */
        return 0;

    case QUIC_SSTREAM_STATE_READY:
        /*
         * We always allocate a stream ID upfront, so we don't need to do it
         * here.
         */
        qs->send_state = QUIC_SSTREAM_STATE_SEND;
        return 1;

    default:
        /* Nothing to do. */
        return 1;
    }
}

int ossl_quic_stream_map_notify_all_data_sent(QUIC_STREAM_MAP *qsm,
                                              QUIC_STREAM *qs)
{
    switch (qs->send_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_SSTREAM_STATE_NONE:
        /* Stream without send part - caller error. */
        return 0;

    case QUIC_SSTREAM_STATE_SEND:
        if (!ossl_quic_sstream_get_final_size(qs->sstream, &qs->send_final_size))
            return 0;

        qs->send_state = QUIC_SSTREAM_STATE_DATA_SENT;
        return 1;
    }
}

static void shutdown_flush_done(QUIC_STREAM_MAP *qsm, QUIC_STREAM *qs)
{
    if (!qs->shutdown_flush)
        return;

    assert(qsm->num_shutdown_flush > 0);
    qs->shutdown_flush = 0;
    --qsm->num_shutdown_flush;
}

int ossl_quic_stream_map_notify_totally_acked(QUIC_STREAM_MAP *qsm,
                                              QUIC_STREAM *qs)
{
    switch (qs->send_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_SSTREAM_STATE_NONE:
        /* Stream without send part - caller error. */
        return 0;

    case QUIC_SSTREAM_STATE_DATA_SENT:
        qs->send_state = QUIC_SSTREAM_STATE_DATA_RECVD;
        /* We no longer need a QUIC_SSTREAM in this state. */
        ossl_quic_sstream_free(qs->sstream);
        qs->sstream = NULL;

        shutdown_flush_done(qsm, qs);
        return 1;
    }
}

int ossl_quic_stream_map_reset_stream_send_part(QUIC_STREAM_MAP *qsm,
                                                QUIC_STREAM *qs,
                                                uint64_t aec)
{
    switch (qs->send_state) {
    default:
    case QUIC_SSTREAM_STATE_NONE:
        /*
         * RESET_STREAM pertains to sending part only, so we cannot reset a
         * receive-only stream.
         */
    case QUIC_SSTREAM_STATE_DATA_RECVD:
        /*
         * RFC 9000 s. 3.3: A sender MUST NOT [...] send RESET_STREAM from a
         * terminal state. If the stream has already finished normally and the
         * peer has acknowledged this, we cannot reset it.
         */
        return 0;

    case QUIC_SSTREAM_STATE_READY:
        if (!ossl_quic_stream_map_ensure_send_part_id(qsm, qs))
            return 0;

        /* FALLTHROUGH */
    case QUIC_SSTREAM_STATE_SEND:
        /*
         * If we already have a final size (e.g. because we are coming from
         * DATA_SENT), we have to be consistent with that, so don't change it.
         * If we don't already have a final size, determine a final size value.
         * This is the value which we will end up using for a RESET_STREAM frame
         * for flow control purposes. We could send the stream size (total
         * number of bytes appended to QUIC_SSTREAM by the application), but it
         * is in our interest to exclude any bytes we have not actually
         * transmitted yet, to avoid unnecessarily consuming flow control
         * credit. We can get this from the TXFC.
         */
        qs->send_final_size = ossl_quic_txfc_get_swm(&qs->txfc);

        /* FALLTHROUGH */
    case QUIC_SSTREAM_STATE_DATA_SENT:
        qs->reset_stream_aec    = aec;
        qs->want_reset_stream   = 1;
        qs->send_state          = QUIC_SSTREAM_STATE_RESET_SENT;

        ossl_quic_sstream_free(qs->sstream);
        qs->sstream = NULL;

        shutdown_flush_done(qsm, qs);
        ossl_quic_stream_map_update_state(qsm, qs);
        return 1;

    case QUIC_SSTREAM_STATE_RESET_SENT:
    case QUIC_SSTREAM_STATE_RESET_RECVD:
        /*
         * Idempotent - no-op. In any case, do not send RESET_STREAM again - as
         * mentioned, we must not send it from a terminal state.
         */
        return 1;
    }
}

int ossl_quic_stream_map_notify_reset_stream_acked(QUIC_STREAM_MAP *qsm,
                                                   QUIC_STREAM *qs)
{
    switch (qs->send_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_SSTREAM_STATE_NONE:
        /* Stream without send part - caller error. */
         return 0;

    case QUIC_SSTREAM_STATE_RESET_SENT:
        qs->send_state = QUIC_SSTREAM_STATE_RESET_RECVD;
        return 1;

    case QUIC_SSTREAM_STATE_RESET_RECVD:
        /* Already in the correct state. */
        return 1;
    }
}

/*
 * Stream Receive Part State Management
 * ====================================
 */

int ossl_quic_stream_map_notify_size_known_recv_part(QUIC_STREAM_MAP *qsm,
                                                     QUIC_STREAM *qs,
                                                     uint64_t final_size)
{
    switch (qs->recv_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_RSTREAM_STATE_NONE:
        /* Stream without receive part - caller error. */
        return 0;

    case QUIC_RSTREAM_STATE_RECV:
        qs->recv_state = QUIC_RSTREAM_STATE_SIZE_KNOWN;
        return 1;
    }
}

int ossl_quic_stream_map_notify_totally_received(QUIC_STREAM_MAP *qsm,
                                                 QUIC_STREAM *qs)
{
    switch (qs->recv_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_RSTREAM_STATE_NONE:
        /* Stream without receive part - caller error. */
        return 0;

    case QUIC_RSTREAM_STATE_SIZE_KNOWN:
        qs->recv_state          = QUIC_RSTREAM_STATE_DATA_RECVD;
        qs->want_stop_sending   = 0;
        return 1;
    }
}

int ossl_quic_stream_map_notify_totally_read(QUIC_STREAM_MAP *qsm,
                                             QUIC_STREAM *qs)
{
    switch (qs->recv_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_RSTREAM_STATE_NONE:
        /* Stream without receive part - caller error. */
        return 0;

    case QUIC_RSTREAM_STATE_DATA_RECVD:
        qs->recv_state = QUIC_RSTREAM_STATE_DATA_READ;

        /* QUIC_RSTREAM is no longer needed */
        ossl_quic_rstream_free(qs->rstream);
        qs->rstream = NULL;
        return 1;
    }
}

int ossl_quic_stream_map_notify_reset_recv_part(QUIC_STREAM_MAP *qsm,
                                                QUIC_STREAM *qs,
                                                uint64_t app_error_code,
                                                uint64_t final_size)
{
    uint64_t prev_final_size;

    switch (qs->recv_state) {
    default:
    case QUIC_RSTREAM_STATE_NONE:
        /* Stream without receive part - caller error. */
        return 0;

    case QUIC_RSTREAM_STATE_RECV:
    case QUIC_RSTREAM_STATE_SIZE_KNOWN:
    case QUIC_RSTREAM_STATE_DATA_RECVD:
        if (ossl_quic_stream_recv_get_final_size(qs, &prev_final_size)
            && prev_final_size != final_size)
            /* Cannot change previous final size. */
            return 0;

        qs->recv_state              = QUIC_RSTREAM_STATE_RESET_RECVD;
        qs->peer_reset_stream_aec   = app_error_code;

        /* RFC 9000 s. 3.3: No point sending STOP_SENDING if already reset. */
        qs->want_stop_sending       = 0;

        /* QUIC_RSTREAM is no longer needed */
        ossl_quic_rstream_free(qs->rstream);
        qs->rstream = NULL;

        ossl_quic_stream_map_update_state(qsm, qs);
        return 1;

    case QUIC_RSTREAM_STATE_DATA_READ:
        /*
         * If we already retired the FIN to the application this is moot
         * - just ignore.
         */
    case QUIC_RSTREAM_STATE_RESET_RECVD:
    case QUIC_RSTREAM_STATE_RESET_READ:
        /* Could be a reordered/retransmitted frame - just ignore. */
        return 1;
    }
}

int ossl_quic_stream_map_notify_app_read_reset_recv_part(QUIC_STREAM_MAP *qsm,
                                                         QUIC_STREAM *qs)
{
    switch (qs->recv_state) {
    default:
        /* Wrong state - caller error. */
    case QUIC_RSTREAM_STATE_NONE:
        /* Stream without receive part - caller error. */
        return 0;

    case QUIC_RSTREAM_STATE_RESET_RECVD:
        qs->recv_state = QUIC_RSTREAM_STATE_RESET_READ;
        return 1;
    }
}

int ossl_quic_stream_map_stop_sending_recv_part(QUIC_STREAM_MAP *qsm,
                                                QUIC_STREAM *qs,
                                                uint64_t aec)
{
    if (qs->stop_sending)
        return 0;

    switch (qs->recv_state) {
    default:
    case QUIC_RSTREAM_STATE_NONE:
        /* Send-only stream, so this makes no sense. */
    case QUIC_RSTREAM_STATE_DATA_RECVD:
    case QUIC_RSTREAM_STATE_DATA_READ:
        /*
         * Not really any point in STOP_SENDING if we already received all data.
         */
    case QUIC_RSTREAM_STATE_RESET_RECVD:
    case QUIC_RSTREAM_STATE_RESET_READ:
        /*
         * RFC 9000 s. 3.5: "STOP_SENDING SHOULD only be sent for a stream that
         * has not been reset by the peer."
         *
         * No point in STOP_SENDING if the peer already reset their send part.
         */
        return 0;

    case QUIC_RSTREAM_STATE_RECV:
    case QUIC_RSTREAM_STATE_SIZE_KNOWN:
        /*
         * RFC 9000 s. 3.5: "If the stream is in the Recv or Size Known state,
         * the transport SHOULD signal this by sending a STOP_SENDING frame to
         * prompt closure of the stream in the opposite direction."
         *
         * Note that it does make sense to send STOP_SENDING for a receive part
         * of a stream which has a known size (because we have received a FIN)
         * but which still has other (previous) stream data yet to be received.
         */
        break;
    }

    qs->stop_sending        = 1;
    qs->stop_sending_aec    = aec;
    return ossl_quic_stream_map_schedule_stop_sending(qsm, qs);
}

/* Called to mark STOP_SENDING for generation, or regeneration after loss. */
int ossl_quic_stream_map_schedule_stop_sending(QUIC_STREAM_MAP *qsm, QUIC_STREAM *qs)
{
    if (!qs->stop_sending)
        return 0;

    /*
     * Ignore the call as a no-op if already scheduled, or in a state
     * where it makes no sense to send STOP_SENDING.
     */
    if (qs->want_stop_sending)
        return 1;

    switch (qs->recv_state) {
    default:
        return 1; /* ignore */
    case QUIC_RSTREAM_STATE_RECV:
    case QUIC_RSTREAM_STATE_SIZE_KNOWN:
        /*
         * RFC 9000 s. 3.5: "An endpoint is expected to send another
         * STOP_SENDING frame if a packet containing a previous STOP_SENDING is
         * lost. However, once either all stream data or a RESET_STREAM frame
         * has been received for the stream -- that is, the stream is in any
         * state other than "Recv" or "Size Known" -- sending a STOP_SENDING
         * frame is unnecessary."
         */
        break;
    }

    qs->want_stop_sending = 1;
    ossl_quic_stream_map_update_state(qsm, qs);
    return 1;
}

QUIC_STREAM *ossl_quic_stream_map_peek_accept_queue(QUIC_STREAM_MAP *qsm)
{
    return accept_head(&qsm->accept_list);
}

void ossl_quic_stream_map_push_accept_queue(QUIC_STREAM_MAP *qsm,
                                            QUIC_STREAM *s)
{
    list_insert_tail(&qsm->accept_list, &s->accept_node);
    if (ossl_quic_stream_is_bidi(s))
        ++qsm->num_accept_bidi;
    else
        ++qsm->num_accept_uni;
}

static QUIC_RXFC *qsm_get_max_streams_rxfc(QUIC_STREAM_MAP *qsm, QUIC_STREAM *s)
{
    return ossl_quic_stream_is_bidi(s)
        ? qsm->max_streams_bidi_rxfc
        : qsm->max_streams_uni_rxfc;
}

void ossl_quic_stream_map_remove_from_accept_queue(QUIC_STREAM_MAP *qsm,
                                                   QUIC_STREAM *s,
                                                   OSSL_TIME rtt)
{
    QUIC_RXFC *max_streams_rxfc;

    list_remove(&qsm->accept_list, &s->accept_node);
    if (ossl_quic_stream_is_bidi(s))
        --qsm->num_accept_bidi;
    else
        --qsm->num_accept_uni;

    if ((max_streams_rxfc = qsm_get_max_streams_rxfc(qsm, s)) != NULL)
        ossl_quic_rxfc_on_retire(max_streams_rxfc, 1, rtt);
}

size_t ossl_quic_stream_map_get_accept_queue_len(QUIC_STREAM_MAP *qsm, int is_uni)
{
    return is_uni ? qsm->num_accept_uni : qsm->num_accept_bidi;
}

size_t ossl_quic_stream_map_get_total_accept_queue_len(QUIC_STREAM_MAP *qsm)
{
    return ossl_quic_stream_map_get_accept_queue_len(qsm, /*is_uni=*/0)
        + ossl_quic_stream_map_get_accept_queue_len(qsm, /*is_uni=*/1);
}

void ossl_quic_stream_map_gc(QUIC_STREAM_MAP *qsm)
{
    QUIC_STREAM *qs, *qs_head, *qsn = NULL;

    for (qs = qs_head = ready_for_gc_head(&qsm->ready_for_gc_list);
         qs != NULL && qs != qs_head;
         qs = qsn)
    {
         qsn = ready_for_gc_next(&qsm->ready_for_gc_list, qs);

         ossl_quic_stream_map_release(qsm, qs);
    }
}

static int eligible_for_shutdown_flush(QUIC_STREAM *qs)
{
    /*
     * We only care about servicing the send part of a stream (if any) during
     * shutdown flush. We make sure we flush a stream if it is either
     * non-terminated or was terminated normally such as via
     * SSL_stream_conclude. A stream which was terminated via a reset is not
     * flushed, and we will have thrown away the send buffer in that case
     * anyway.
     */
    switch (qs->send_state) {
    case QUIC_SSTREAM_STATE_SEND:
    case QUIC_SSTREAM_STATE_DATA_SENT:
        return !ossl_quic_sstream_is_totally_acked(qs->sstream);
    default:
        return 0;
    }
}

static void begin_shutdown_flush_each(QUIC_STREAM *qs, void *arg)
{
    QUIC_STREAM_MAP *qsm = arg;

    if (!eligible_for_shutdown_flush(qs) || qs->shutdown_flush)
        return;

    qs->shutdown_flush = 1;
    ++qsm->num_shutdown_flush;
}

void ossl_quic_stream_map_begin_shutdown_flush(QUIC_STREAM_MAP *qsm)
{
    qsm->num_shutdown_flush = 0;

    ossl_quic_stream_map_visit(qsm, begin_shutdown_flush_each, qsm);
}

int ossl_quic_stream_map_is_shutdown_flush_finished(QUIC_STREAM_MAP *qsm)
{
    return qsm->num_shutdown_flush == 0;
}

/*
 * QUIC Stream Iterator
 * ====================
 */
void ossl_quic_stream_iter_init(QUIC_STREAM_ITER *it, QUIC_STREAM_MAP *qsm,
                                int advance_rr)
{
    it->qsm    = qsm;
    it->stream = it->first_stream = qsm->rr_cur;
    if (advance_rr && it->stream != NULL
        && ++qsm->rr_counter >= qsm->rr_stepping) {
        qsm->rr_counter = 0;
        qsm->rr_cur     = active_next(&qsm->active_list, qsm->rr_cur);
    }
}

void ossl_quic_stream_iter_next(QUIC_STREAM_ITER *it)
{
    if (it->stream == NULL)
        return;

    it->stream = active_next(&it->qsm->active_list, it->stream);
    if (it->stream == it->first_stream)
        it->stream = NULL;
}