1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
|
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef h2_bucket_beam_h
#define h2_bucket_beam_h
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
/*******************************************************************************
* apr_bucket list without bells and whistles
******************************************************************************/
/**
* h2_blist can hold a list of buckets just like apr_bucket_brigade, but
* does not to any allocations or related features.
*/
typedef struct {
APR_RING_HEAD(h2_bucket_list, apr_bucket) list;
} h2_blist;
#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link);
#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link)
#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list)
#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list)
#define H2_BLIST_INSERT_HEAD(b, e) do { \
apr_bucket *ap__b = (e); \
APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \
} while (0)
#define H2_BLIST_INSERT_TAIL(b, e) do { \
apr_bucket *ap__b = (e); \
APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \
} while (0)
#define H2_BLIST_CONCAT(a, b) do { \
APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \
} while (0)
#define H2_BLIST_PREPEND(a, b) do { \
APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
} while (0)
/*******************************************************************************
* h2_bucket_beam
******************************************************************************/
/**
* A h2_bucket_beam solves the task of transferring buckets, esp. their data,
* across threads with zero buffer copies.
*
* When a thread, let's call it the sender thread, wants to send buckets to
* another, the green thread, it creates a h2_bucket_beam and adds buckets
* via the h2_beam_send(). It gives the beam to the green thread which then
* can receive buckets into its own brigade via h2_beam_receive().
*
* Sending and receiving can happen concurrently.
*
* The beam can limit the amount of data it accepts via the buffer_size. This
* can also be adjusted during its lifetime. Sends and receives can be done blocking.
* A timeout can be set for such blocks.
*
* Care needs to be taken when terminating the beam. The beam registers at
* the pool it was created with and will cleanup after itself. However, if
* received buckets do still exist, already freed memory might be accessed.
* The beam does a assertion on this condition.
*
* The proper way of shutting down a beam is to first make sure there are no
* more green buckets out there, then cleanup the beam to purge eventually
* still existing sender buckets and then, possibly, terminate the beam itself
* (or the pool it was created with).
*
* The following restrictions apply to bucket transport:
* - only EOS and FLUSH meta buckets are copied through. All other meta buckets
* are kept in the beams hold.
* - all kind of data buckets are transported through:
* - transient buckets are converted to heap ones on send
* - heap and pool buckets require no extra handling
* - buckets with indeterminate length are read on send
* - file buckets will transfer the file itself into a new bucket, if allowed
* - all other buckets are read on send to make sure data is present
*
* This assures that when the sender thread sends its sender buckets, the data
* is made accessible while still on the sender side. The sender bucket then enters
* the beams hold storage.
* When the green thread calls receive, sender buckets in the hold are wrapped
* into special beam buckets. Beam buckets on read present the data directly
* from the internal sender one, but otherwise live on the green side. When a
* beam bucket gets destroyed, it notifies its beam that the corresponding
* sender bucket from the hold may be destroyed.
* Since the destruction of green buckets happens in the green thread, any
* corresponding sender bucket can not immediately be destroyed, as that would
* result in race conditions.
* Instead, the beam transfers such sender buckets from the hold to the purge
* storage. Next time there is a call from the sender side, the buckets in
* purge will be deleted.
*
* There are callbacks that can be registesender with a beam:
* - a "consumed" callback that gets called on the sender side with the
* amount of data that has been received by the green side. The amount
* is a delta from the last callback invocation. The sender side can trigger
* these callbacks by calling h2_beam_send() with a NULL brigade.
* - a "can_beam_file" callback that can prohibit the transfer of file handles
* through the beam. This will cause file buckets to be read on send and
* its data buffer will then be transports just like a heap bucket would.
* When no callback is registered, no restrictions apply and all files are
* passed through.
* File handles transfersender to the green side will stay there until the
* receiving brigade's pool is destroyed/cleared. If the pool lives very
* long or if many different files are beamed, the process might run out
* of available file handles.
*
* The name "beam" of course is inspired by good old transporter
* technology where humans are kept inside the transporter's memory
* buffers until the transmission is complete. Star gates use a similar trick.
*/
typedef void h2_beam_mutex_leave(struct apr_thread_mutex_t *lock);
typedef struct {
apr_thread_mutex_t *mutex;
h2_beam_mutex_leave *leave;
} h2_beam_lock;
typedef struct h2_bucket_beam h2_bucket_beam;
typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
apr_off_t bytes);
typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);
typedef struct h2_beam_proxy h2_beam_proxy;
typedef struct {
APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list;
} h2_bproxy_list;
typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
apr_file_t *file);
typedef enum {
H2_BEAM_OWNER_SEND,
H2_BEAM_OWNER_RECV
} h2_beam_owner_t;
/**
* Will deny all transfer of apr_file_t across the beam and force
* a data copy instead.
*/
int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file);
struct h2_bucket_beam {
int id;
const char *tag;
apr_pool_t *pool;
h2_beam_owner_t owner;
h2_blist send_list;
h2_blist hold_list;
h2_blist purge_list;
apr_bucket_brigade *recv_buffer;
h2_bproxy_list proxies;
apr_pool_t *send_pool;
apr_pool_t *recv_pool;
apr_size_t max_buf_size;
apr_interval_time_t timeout;
apr_off_t sent_bytes; /* amount of bytes send */
apr_off_t received_bytes; /* amount of bytes received */
apr_size_t buckets_sent; /* # of beam buckets sent */
apr_size_t files_beamed; /* how many file handles have been set aside */
unsigned int aborted : 1;
unsigned int closed : 1;
unsigned int close_sent : 1;
unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */
struct apr_thread_mutex_t *lock;
struct apr_thread_cond_t *change;
apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
h2_beam_ev_callback *cons_ev_cb;
h2_beam_io_callback *cons_io_cb;
void *cons_ctx;
apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */
h2_beam_io_callback *prod_io_cb;
void *prod_ctx;
h2_beam_can_beam_callback *can_beam_fn;
void *can_beam_ctx;
};
/**
* Creates a new bucket beam for transfer of buckets across threads.
*
* The pool the beam is created with will be protected by the given
* mutex and will be used in multiple threads. It needs a pool allocator
* that is only used inside that same mutex.
*
* @param pbeam will hold the created beam on return
* @param pool pool owning the beam, beam will cleanup when pool released
* @param id identifier of the beam
* @param tag tag identifying beam for logging
* @param owner if the beam is owned by the sender or receiver, e.g. if
* the pool owner is using this beam for sending or receiving
* @param buffer_size maximum memory footprint of buckets buffered in beam, or
* 0 for no limitation
* @param timeout timeout for blocking operations
*/
apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
apr_pool_t *pool,
int id, const char *tag,
h2_beam_owner_t owner,
apr_size_t buffer_size,
apr_interval_time_t timeout);
/**
* Destroys the beam immediately without cleanup.
*/
apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
/**
* Send buckets from the given brigade through the beam. Will hold buckets
* internally as long as they have not been processed by the receiving side.
* All accepted buckets are removed from the given brigade. Will return with
* APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
*
* Call from the sender side only.
*/
apr_status_t h2_beam_send(h2_bucket_beam *beam,
apr_bucket_brigade *bb,
apr_read_type_e block);
/**
* Register the pool from which future buckets are send. This defines
* the lifetime of the buckets, e.g. the pool should not be cleared/destroyed
* until the data is no longer needed (or has been received).
*/
void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p);
/**
* Receive buckets from the beam into the given brigade. Will return APR_EOF
* when reading past an EOS bucket. Reads can be blocking until data is
* available or the beam has been closed. Non-blocking calls return APR_EAGAIN
* if no data is available.
*
* Call from the receiver side only.
*/
apr_status_t h2_beam_receive(h2_bucket_beam *beam,
apr_bucket_brigade *green_buckets,
apr_read_type_e block,
apr_off_t readbytes);
/**
* Determine if beam is empty.
*/
int h2_beam_empty(h2_bucket_beam *beam);
/**
* Determine if beam has handed out proxy buckets that are not destroyed.
*/
int h2_beam_holds_proxies(h2_bucket_beam *beam);
/**
* Abort the beam. Will cleanup any buffered buckets and answer all send
* and receives with APR_ECONNABORTED.
*
* Call from the sender side only.
*/
void h2_beam_abort(h2_bucket_beam *beam);
/**
* Close the beam. Sending an EOS bucket serves the same purpose.
*
* Call from the sender side only.
*/
apr_status_t h2_beam_close(h2_bucket_beam *beam);
/**
* Receives leaves the beam, e.g. will no longer read. This will
* interrupt any sender blocked writing and fail future send.
*
* Call from the receiver side only.
*/
apr_status_t h2_beam_leave(h2_bucket_beam *beam);
int h2_beam_is_closed(h2_bucket_beam *beam);
/**
* Return APR_SUCCESS when all buckets in transit have been handled.
* When called with APR_BLOCK_READ and a mutex set, will wait until the green
* side has consumed all data. Otherwise APR_EAGAIN is returned.
* With clear_buffers set, any queued data is discarded.
* If a timeout is set on the beam, waiting might also time out and
* return APR_ETIMEUP.
*
* Call from the sender side only.
*/
apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);
/**
* Set/get the timeout for blocking read/write operations. Only works
* if a mutex has been set for the beam.
*/
void h2_beam_timeout_set(h2_bucket_beam *beam,
apr_interval_time_t timeout);
apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam);
/**
* Set/get the maximum buffer size for beam data (memory footprint).
*/
void h2_beam_buffer_size_set(h2_bucket_beam *beam,
apr_size_t buffer_size);
apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
/**
* Register a callback to be invoked on the sender side with the
* amount of bytes that have been consumed by the receiver, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
* @param ev_cb the callback or NULL, called when bytes are consumed
* @param io_cb the callback or NULL, called on sender with bytes consumed
* @param ctx the context to use in callback invocation
*
* Call from the sender side, io callbacks invoked on sender side, ev callback
* from any side.
*/
void h2_beam_on_consumed(h2_bucket_beam *beam,
h2_beam_ev_callback *ev_cb,
h2_beam_io_callback *io_cb, void *ctx);
/**
* Call any registered consumed handler, if any changes have happened
* since the last invocation.
* @return !=0 iff a handler has been called
*
* Needs to be invoked from the sending side.
*/
int h2_beam_report_consumption(h2_bucket_beam *beam);
/**
* Register a callback to be invoked on the receiver side with the
* amount of bytes that have been produces by the sender, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
* @param io_cb the callback or NULL, called on receiver with bytes produced
* @param ctx the context to use in callback invocation
*
* Call from the receiver side, callbacks invoked on either side.
*/
void h2_beam_on_produced(h2_bucket_beam *beam,
h2_beam_io_callback *io_cb, void *ctx);
/**
* Register a callback that may prevent a file from being beam as
* file handle, forcing the file content to be copied. Then no callback
* is set (NULL), file handles are transferred directly.
* @param beam the beam to set the callback on
* @param io_cb the callback or NULL, called on receiver with bytes produced
* @param ctx the context to use in callback invocation
*
* Call from the receiver side, callbacks invoked on either side.
*/
void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx);
/**
* Get the amount of bytes currently buffered in the beam (unread).
*/
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
/**
* Get the memory used by the buffered buckets, approximately.
*/
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
/**
* Return != 0 iff (some) data from the beam has been received.
*/
int h2_beam_was_received(h2_bucket_beam *beam);
apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam);
typedef apr_bucket *h2_bucket_beamer(h2_bucket_beam *beam,
apr_bucket_brigade *dest,
const apr_bucket *src);
void h2_register_bucket_beamer(h2_bucket_beamer *beamer);
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg);
#endif /* h2_bucket_beam_h */
|