Apache HTTPD
h2_mplx.c
Go to the documentation of this file.
1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <assert.h>
18#include <stddef.h>
19#include <stdlib.h>
20
21#include <apr_atomic.h>
22#include <apr_thread_mutex.h>
23#include <apr_thread_cond.h>
24#include <apr_strings.h>
25#include <apr_time.h>
26
27#include <httpd.h>
28#include <http_core.h>
29#include <http_connection.h>
30#include <http_log.h>
31#include <http_protocol.h>
32
33#include <mpm_common.h>
34
35#include "mod_http2.h"
36
37#include "h2.h"
38#include "h2_private.h"
39#include "h2_bucket_beam.h"
40#include "h2_config.h"
41#include "h2_c1.h"
42#include "h2_conn_ctx.h"
43#include "h2_protocol.h"
44#include "h2_mplx.h"
45#include "h2_request.h"
46#include "h2_stream.h"
47#include "h2_session.h"
48#include "h2_c2.h"
49#include "h2_workers.h"
50#include "h2_util.h"
51
52
53/* utility for iterating over ihash stream sets */
60
61static conn_rec *c2_prod_next(void *baton, int *phas_more);
62static void c2_prod_done(void *baton, conn_rec *c2);
63static void workers_shutdown(void *baton, int graceful);
64
66static void m_be_annoyed(h2_mplx *m);
67
72 void *on_ctx);
73
75
76/* APR callback invoked if allocation fails. */
77static int abort_on_oom(int retcode)
78{
80 return retcode; /* unreachable, hopefully. */
81}
82
88
/*
 * Locking helpers around the mplx mutex `(m)->lock`.
 *
 * All helpers parenthesize `m`, so any pointer expression (e.g. `&mplx`
 * or `session->mplx`) can be passed safely.
 */

/* Lock `(m)->lock`. If locking fails, the ENCLOSING function returns the
 * failing apr_status_t immediately, so this may only be used inside
 * functions whose return type accepts an apr_status_t. */
#define H2_MPLX_ENTER(m)    \
    do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock((m)->lock)) != APR_SUCCESS) {\
        return rv_lock;\
    } } while(0)

/* Unlock `(m)->lock`; evaluates to the status of apr_thread_mutex_unlock(). */
#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock((m)->lock)

/* Lock `(m)->lock` without bailing out of the caller on failure;
 * evaluates to the status of apr_thread_mutex_lock(). */
#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock((m)->lock)

/* Lock `(m)->lock` only when `dolock` is non-zero. Wrapped in
 * do { } while(0) so it is a single statement and cannot capture a
 * caller's `else` branch. */
#define H2_MPLX_ENTER_MAYBE(m, dolock)    \
    do { if (dolock) apr_thread_mutex_lock((m)->lock); } while(0)

/* Unlock `(m)->lock` only when `dolock` is non-zero. Same single-statement
 * wrapping as H2_MPLX_ENTER_MAYBE. */
#define H2_MPLX_LEAVE_MAYBE(m, dolock)    \
    do { if (dolock) apr_thread_mutex_unlock((m)->lock); } while(0)
105
110
111static int stream_is_running(h2_stream *stream)
112{
114 return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0
115 && apr_atomic_read32(&conn_ctx->done) == 0;
116}
117
119{
120 int rv;
121
123 rv = stream_is_running(stream);
125 return rv;
126}
127
128static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
129{
131
132 h2_ihash_remove(m->shold, stream->id);
133 APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
134}
135
136static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
137{
139
141 H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events"));
142 if (c2_ctx) {
143 if (c2_ctx->beam_out) {
145 }
146 if (c2_ctx->beam_in) {
147 h2_beam_on_send(c2_ctx->beam_in, NULL, NULL);
149 h2_beam_on_eagain(c2_ctx->beam_in, NULL, NULL);
151 }
152 }
153
155 H2_STRM_MSG(stream, "cleanup, removing from registries"));
156 ap_assert(stream->state == H2_SS_CLEANUP);
157 h2_stream_cleanup(stream);
158 h2_ihash_remove(m->streams, stream->id);
159 h2_iq_remove(m->q, stream->id);
160
161 if (c2_ctx) {
162 if (!stream_is_running(stream)) {
164 H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge"));
165 /* processing has finished */
166 APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
167 }
168 else {
170 H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
171 /* c2 is still running */
172 h2_c2_abort(stream->c2, m->c1);
174 H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold"));
175 h2_ihash_add(m->shold, stream);
176 }
177 }
178 else {
179 /* never started */
181 H2_STRM_MSG(stream, "cleanup, never started, move to spurge"));
182 APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
183 }
184}
185
187{
191 apr_status_t rv;
192
193 /* We create a pool with its own allocator to be used for
194 * processing a request. This is the only way to have the processing
195 * independent of its parent pool in the sense that it can work in
196 * another thread.
197 */
198
200 if (rv == APR_SUCCESS) {
202 rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator);
203 }
204 if (rv != APR_SUCCESS) {
205 /* maybe the log goes through, maybe not. */
207 APLOGNO(10004) "h2_mplx: create transit pool");
209 return NULL; /* should never be reached. */
210 }
211
214 apr_pool_tag(ptrans, "h2_c2_transit");
215
216 transit = apr_pcalloc(ptrans, sizeof(*transit));
217 transit->pool = ptrans;
218 transit->bucket_alloc = apr_bucket_alloc_create(ptrans);
219 return transit;
220}
221
226
228{
229 h2_c2_transit **ptransit = apr_array_pop(m->c2_transits);
230 if (ptransit) {
231 return *ptransit;
232 }
233 return c2_transit_create(m);
234}
235
237{
238 if (m->c2_transits->nelts >= APR_INT32_MAX ||
239 (apr_uint32_t)m->c2_transits->nelts >= m->max_spare_transits) {
241 }
242 else {
243 APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit;
244 }
245}
246
258h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
261{
265 apr_thread_mutex_t *mutex = NULL;
266 h2_mplx *m = NULL;
267
268 m = apr_pcalloc(parent, sizeof(h2_mplx));
269 m->stream0 = stream0;
270 m->c1 = stream0->c2;
271 m->s = s;
272 m->child_num = child_num;
273 m->id = id;
274
275 /* We create a pool with its own allocator to be used for
276 * processing secondary connections. This is the only way to have the
277 * processing independent of its parent pool in the sense that it
278 * can work in another thread. Also, the new allocator needs its own
279 * mutex to synchronize sub-pools.
280 */
282 if (status != APR_SUCCESS) {
283 allocator = NULL;
284 goto failure;
285 }
286
289 if (!m->pool) goto failure;
290
291 apr_pool_tag(m->pool, "h2_mplx");
293
295 m->pool);
296 if (APR_SUCCESS != status) goto failure;
298
300 m->pool);
301 if (APR_SUCCESS != status) goto failure;
302
303 m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
304 m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
305
306 m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
307 m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
308 m->spurge = apr_array_make(m->pool, 10, sizeof(h2_stream*));
309 m->q = h2_iq_create(m->pool, m->max_streams);
310
311 m->workers = workers;
312 m->processing_max = H2MIN(h2_workers_get_max_workers(workers), m->max_streams);
313 m->processing_limit = 6; /* the original h1 max parallel connections */
314 m->last_mood_change = apr_time_now();
315 m->mood_update_interval = apr_time_from_msec(100);
316
318 if (APR_SUCCESS != status) {
320 "nghttp2: could not create pollset");
321 goto failure;
322 }
323 m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
324 m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
325
326 m->streams_input_read = h2_iq_create(m->pool, 10);
327 m->streams_output_written = h2_iq_create(m->pool, 10);
329 m->pool);
330 if (APR_SUCCESS != status) goto failure;
331
333 if (conn_ctx->pfd.reqevents) {
334 apr_pollset_add(m->pollset, &conn_ctx->pfd);
335 }
336
337 m->max_spare_transits = 3;
338 m->c2_transits = apr_array_make(m->pool, (int)m->max_spare_transits,
339 sizeof(h2_c2_transit*));
340
341 m->producer = h2_workers_register(workers, m->pool,
342 apr_psprintf(m->pool, "h2-%u",
343 (unsigned int)m->id),
346 return m;
347
348failure:
349 if (m->pool) {
350 apr_pool_destroy(m->pool);
351 }
352 else if (allocator) {
354 }
355 return NULL;
356}
357
359{
360 int max_stream_id_started = 0;
361
363
364 max_stream_id_started = m->max_stream_id_started;
365 /* Clear schedule queue, disabling existing streams from starting */
366 h2_iq_clear(m->q);
367
369 return max_stream_id_started;
370}
371
372typedef struct {
374 void *ctx;
376
377static int m_stream_iter_wrap(void *ctx, void *stream)
378{
380 return x->cb(stream, x->ctx);
381}
382
384{
386
388
389 x.cb = cb;
390 x.ctx = ctx;
391 h2_ihash_iter(m->streams, m_stream_iter_wrap, &x);
392
394 return APR_SUCCESS;
395}
396
401
402static int m_stream_want_send_data(void *ctx, void *stream)
403{
405 ++x->stream_count;
406 if (h2_stream_wants_send_data(stream))
407 ++x->stream_want_send;
408 return 1;
409}
410
421
422static int m_report_stream_iter(void *ctx, void *val) {
423 h2_mplx *m = ctx;
424 h2_stream *stream = val;
427 H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
428 !!stream->c2, stream->scheduled, h2_stream_is_ready(stream),
429 (long)(stream->output? h2_beam_get_buffered(stream->output) : -1));
430 if (conn_ctx) {
431 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
432 H2_STRM_MSG(stream, "->03198: %s %s %s"
433 "[started=%u/done=%u]"),
434 conn_ctx->request->method, conn_ctx->request->authority,
435 conn_ctx->request->path,
436 apr_atomic_read32(&conn_ctx->started),
438 }
439 else {
440 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
441 H2_STRM_MSG(stream, "->03198: not started"));
442 }
443 return 1;
444}
445
446static int m_unexpected_stream_iter(void *ctx, void *val) {
447 h2_mplx *m = ctx;
448 h2_stream *stream = val;
449 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, /* NO APLOGNO */
450 H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"),
451 !!stream->c2, stream->scheduled, h2_stream_is_ready(stream));
452 return 1;
453}
454
455static int m_stream_cancel_iter(void *ctx, void *val) {
456 h2_mplx *m = ctx;
457 h2_stream *stream = val;
458
459 /* take over event monitoring */
461 /* Reset, should transit to CLOSED state */
463 /* All connection data has been sent, simulate cleanup */
465 m_stream_cleanup(m, stream);
466 return 0;
467}
468
469static void c1_purge_streams(h2_mplx *m);
470
472{
474 unsigned int i, wait_secs = 60;
475 int old_aborted;
476
478 H2_MPLX_MSG(m, "start release"));
479 /* How to shut down a h2 connection:
480 * 0. abort and tell the workers that no more work will come from us */
481 m->shutdown = m->aborted = 1;
482
484
485 /* While really terminating any c2 connections, treat the master
486 * connection as aborted. It's not as if we could send any more data
487 * at this point. */
488 old_aborted = m->c1->aborted;
489 m->c1->aborted = 1;
490
491 /* How to shut down a h2 connection:
492 * 1. cancel all streams still active */
494 H2_MPLX_MSG(m, "release, %u/%u/%d streams (total/hold/purge), %d streams"),
495 h2_ihash_count(m->streams),
496 h2_ihash_count(m->shold),
497 m->spurge->nelts, m->processing_count);
498 while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) {
499 /* until empty */
500 }
501
502 /* 2. no more streams should be scheduled or in the active set */
503 ap_assert(h2_ihash_empty(m->streams));
505
506 /* 3. while workers are busy on this connection, meaning they
507 * are processing streams from this connection, wait on them finishing
508 * in order to wake us and let us check again.
509 * Eventually, this has to succeed. */
510 if (!m->join_wait) {
511 apr_thread_cond_create(&m->join_wait, m->pool);
512 }
513
514 for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {
516
518 /* This can happen if we have very long running requests
519 * that do not time out on IO. */
521 H2_MPLX_MSG(m, "waited %u sec for %u streams"),
522 i*wait_secs, h2_ihash_count(m->shold));
524 }
525 }
526
528 h2_workers_join(m->workers, m->producer);
530
531 /* 4. With all workers done, all streams should be in spurge */
532 ap_assert(m->processing_count == 0);
533 if (!h2_ihash_empty(m->shold)) {
535 H2_MPLX_MSG(m, "unexpected %u streams in hold"),
536 h2_ihash_count(m->shold));
538 }
539
541
542 m->c1->aborted = old_aborted;
544
546 H2_MPLX_MSG(m, "released"));
547}
548
550 unsigned int *pstream_count)
551{
553
555 H2_STRM_MSG(stream, "cleanup"));
556 m_stream_cleanup(m, stream);
557 *pstream_count = h2_ihash_count(m->streams);
559 return APR_SUCCESS;
560}
561
563{
564 h2_stream *s = NULL;
565
567 s = h2_ihash_get(m->streams, stream_id);
569
570 return s;
571}
572
573
575{
576 h2_stream *stream;
577 int i;
578
579 for (i = 0; i < m->spurge->nelts; ++i) {
580 stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*);
581 ap_assert(stream->state == H2_SS_CLEANUP);
582
583 if (stream->input) {
584 h2_beam_destroy(stream->input, m->c1);
585 stream->input = NULL;
586 }
587 if (stream->c2) {
588 conn_rec *c2 = stream->c2;
591
592 stream->c2 = NULL;
594 transit = c2_ctx->transit;
595 h2_c2_destroy(c2); /* c2_ctx is gone as well */
596 if (transit) {
598 }
599 }
600 h2_stream_destroy(stream);
601 }
602 apr_array_clear(m->spurge);
603}
604
606{
608 if (m->spurge->nelts) {
610 }
612}
613
617 void *on_ctx)
618{
619 apr_status_t rv;
620
622
623 if (m->aborted) {
624 rv = APR_ECONNABORTED;
625 goto cleanup;
626 }
627 /* Purge (destroy) streams outside of pollset processing.
628 * Streams that are registered in the pollset, will be removed
629 * when they are destroyed, but the pollset works on copies
630 * of these registrations. So, if we destroy streams while
631 * processing pollset events, we might access freed memory.
632 */
633 if (m->spurge->nelts) {
635 }
637
638cleanup:
640 return rv;
641}
642
644 h2_session *session)
645{
647
649
650 if (m->aborted) {
652 }
653 else {
654 h2_iq_sort(m->q, cmp, session);
656 H2_MPLX_MSG(m, "reprioritize streams"));
658 }
659
661 return status;
662}
663
665 h2_stream *stream,
667 h2_session *session)
668{
670
671 if (m->aborted) {
672 rv = APR_ECONNABORTED;
673 goto cleanup;
674 }
675 if (!stream->request) {
676 rv = APR_EINVAL;
677 goto cleanup;
678 }
679 if (APLOGctrace1(m->c1)) {
680 const h2_request *r = stream->request;
682 H2_STRM_MSG(stream, "process %s%s%s %s%s%s%s"),
683 r->protocol? r->protocol : "",
684 r->protocol? " " : "",
685 r->method, r->scheme? r->scheme : "",
686 r->scheme? "://" : "",
687 r->authority, r->path? r->path: "");
688 }
689
690 stream->scheduled = 1;
691 h2_ihash_add(m->streams, stream);
692 if (h2_stream_is_ready(stream)) {
693 /* already have a response */
695 H2_STRM_MSG(stream, "process, ready already"));
696 }
697 else {
698 /* last chance to set anything up before stream is processed
699 * by worker threads. */
700 rv = h2_stream_prepare_processing(stream);
701 if (APR_SUCCESS != rv) goto cleanup;
702 h2_iq_add(m->q, stream->id, cmp, session);
704 H2_STRM_MSG(stream, "process, added to q"));
705 }
706
707cleanup:
708 return rv;
709}
710
712 h2_iqueue *ready_to_process,
715 h2_session *session,
716 unsigned int *pstream_count)
717{
718 apr_status_t rv;
719 int sid;
720
722
723 while ((sid = h2_iq_shift(ready_to_process)) > 0) {
724 h2_stream *stream = get_stream(session, sid);
725 if (stream) {
726 ap_assert(!stream->scheduled);
727 rv = c1_process_stream(session->mplx, stream, stream_pri_cmp, session);
728 if (APR_SUCCESS != rv) {
730 }
731 }
732 else {
734 H2_MPLX_MSG(m, "stream %d not found to process"), sid);
735 }
736 }
737 if ((m->processing_count < m->processing_limit) && !h2_iq_empty(m->q)) {
739 rv = h2_workers_activate(m->workers, m->producer);
741 if (rv != APR_SUCCESS) {
742 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021)
743 H2_MPLX_MSG(m, "activate at workers"));
744 }
745 }
746 *pstream_count = h2_ihash_count(m->streams);
747
748#if APR_POOL_DEBUG
749 do {
751
753 mem_m = apr_pool_num_bytes(m->pool, 1);
754 mem_s = apr_pool_num_bytes(session->pool, 1);
755 mem_c1 = apr_pool_num_bytes(m->c1->pool, 1);
757 H2_MPLX_MSG(m, "child mem=%ld, mplx mem=%ld, session mem=%ld, c1=%ld"),
758 (long)mem_g, (long)mem_m, (long)mem_s, (long)mem_c1);
759
760 } while (0);
761#endif
762
764}
765
767{
768 conn_rec *c = ctx;
770
771 (void)beam;
772 if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in[H2_PIPE_IN]) {
773 apr_file_putc(1, conn_ctx->pipe_in[H2_PIPE_IN]);
774 }
775}
776
777static void add_stream_poll_event(h2_mplx *m, int stream_id, h2_iqueue *q)
778{
779 apr_thread_mutex_lock(m->poll_lock);
780 if (h2_iq_append(q, stream_id) && h2_iq_count(q) == 1) {
781 /* newly added first */
782 apr_pollset_wakeup(m->pollset);
783 }
784 apr_thread_mutex_unlock(m->poll_lock);
785}
786
788{
789 conn_rec *c = ctx;
791
792 if (conn_ctx && conn_ctx->stream_id) {
793 add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
794 conn_ctx->mplx->streams_input_read);
795 }
796}
797
799{
800 conn_rec *c = ctx;
802 /* installed in the input bucket beams when we use pipes.
803 * Drain the pipe just before the beam returns APR_EAGAIN.
804 * A clean state for allowing polling on the pipe to rest
805 * when the beam is empty */
806 if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) {
808 }
809}
810
812{
813 conn_rec *c = ctx;
815
816 if (conn_ctx && conn_ctx->stream_id) {
817 add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
818 conn_ctx->mplx->streams_output_written);
819 }
820}
821
823{
826 const char *action = "init";
827
828 rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit);
829 if (APR_SUCCESS != rv) goto cleanup;
830
831 if (!conn_ctx->beam_out) {
832 action = "create output beam";
833 rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool,
834 stream->id, "output", 0, c2->base_server->timeout);
835 if (APR_SUCCESS != rv) goto cleanup;
836
837 h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem);
839 }
840
841 memset(&conn_ctx->pipe_in, 0, sizeof(conn_ctx->pipe_in));
842 if (stream->input) {
843 conn_ctx->beam_in = stream->input;
847#if H2_USE_PIPES
848 action = "create input write pipe";
850 &conn_ctx->pipe_in[H2_PIPE_IN],
852 c2->pool, c2->pool);
853 if (APR_SUCCESS != rv) goto cleanup;
854#endif
856 if (!h2_beam_empty(stream->input))
858 }
859
860cleanup:
861 stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL;
862 if (APR_SUCCESS != rv) {
864 H2_STRM_LOG(APLOGNO(10309), stream,
865 "error %s"), action);
866 }
867 return rv;
868}
869
871{
872 h2_stream *stream = NULL;
875 conn_rec *c2 = NULL;
877
878 while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
879 && (sid = h2_iq_shift(m->q)) > 0) {
880 stream = h2_ihash_get(m->streams, sid);
881 }
882
883 if (!stream) {
884 if (m->processing_count >= m->processing_limit && !h2_iq_empty(m->q)) {
886 H2_MPLX_MSG(m, "delaying request processing. "
887 "Current limit is %d and %d workers are in use."),
888 m->processing_limit, m->processing_count);
889 }
890 goto cleanup;
891 }
892
893 if (sid > m->max_stream_id_started) {
894 m->max_stream_id_started = sid;
895 }
896
898#if AP_HAS_RESPONSE_BUCKETS
899 c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc);
900#else
901 c2 = h2_c2_create(m->c1, transit->pool, transit->bucket_alloc);
902#endif
903 if (!c2) goto cleanup;
905 H2_STRM_MSG(stream, "created new c2"));
906
907 rv = c2_setup_io(m, c2, stream, transit);
908 if (APR_SUCCESS != rv) goto cleanup;
909
910 stream->c2 = c2;
911 ++m->processing_count;
912
913cleanup:
914 if (APR_SUCCESS != rv && c2) {
915 h2_c2_destroy(c2);
916 c2 = NULL;
917 }
918 if (transit && !c2) {
920 }
921 return c2;
922}
923
925{
926 h2_mplx *m = baton;
927 conn_rec *c = NULL;
928
930 if (!m->aborted) {
931 c = s_next_c2(m);
932 *phas_more = (c != NULL && !h2_iq_empty(m->q));
933 }
935 return c;
936}
937
939{
940 h2_stream *stream;
941
944 "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);
945
947 apr_atomic_set32(&conn_ctx->done, 1);
948 conn_ctx->done_at = apr_time_now();
949 ++c2->keepalives;
950 /* From here on, the final handling of c2 is done by c1 processing.
951 * Which means we can give it c1's scoreboard handle for updates. */
952 c2->sbh = m->c1->sbh;
953
955 "h2_mplx(%s-%d): request done, %f ms elapsed",
956 conn_ctx->id, conn_ctx->stream_id,
957 (conn_ctx->done_at - conn_ctx->started_at) / 1000.0);
958
959 if (!conn_ctx->has_final_response) {
961 "h2_c2(%s-%d): processing finished without final response",
962 conn_ctx->id, conn_ctx->stream_id);
963 c2->aborted = 1;
964 if (conn_ctx->beam_out)
965 h2_beam_abort(conn_ctx->beam_out, c2);
966 }
967 else if (!conn_ctx->beam_out || !h2_beam_is_complete(conn_ctx->beam_out)) {
969 "h2_c2(%s-%d): processing finished with incomplete output",
970 conn_ctx->id, conn_ctx->stream_id);
971 c2->aborted = 1;
972 h2_beam_abort(conn_ctx->beam_out, c2);
973 }
974 else if (!c2->aborted) {
976 }
977
978 stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
979 if (stream) {
980 /* stream not done yet. trigger a potential polling on the output
981 * since nothing more will happen here. */
983 H2_STRM_MSG(stream, "c2_done, stream open"));
985 }
986 else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) {
987 /* stream is done, was just waiting for this. */
989 H2_STRM_MSG(stream, "c2_done, in hold"));
990 c1c2_stream_joined(m, stream);
991 }
992 else {
993 int i;
994
995 for (i = 0; i < m->spurge->nelts; ++i) {
996 if (stream == APR_ARRAY_IDX(m->spurge, i, h2_stream*)) {
998 H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
999 ap_assert("stream should not be in spurge" == NULL);
1000 return;
1001 }
1002 }
1003
1005 "h2_mplx(%s-%d): c2_done, stream not found",
1006 conn_ctx->id, conn_ctx->stream_id);
1007 ap_assert("stream should still be available" == NULL);
1008 }
1009}
1010
1011static void c2_prod_done(void *baton, conn_rec *c2)
1012{
1013 h2_mplx *m = baton;
1015
1018
1019 --m->processing_count;
1020 s_c2_done(m, c2, conn_ctx);
1021 if (m->join_wait) apr_thread_cond_signal(m->join_wait);
1022
1024}
1025
1026static void workers_shutdown(void *baton, int graceful)
1027{
1028 h2_mplx *m = baton;
1029
1030 apr_thread_mutex_lock(m->poll_lock);
1031 /* time to wakeup and assess what to do */
1033 H2_MPLX_MSG(m, "workers shutdown, waking pollset"));
1034 m->shutdown = 1;
1035 if (!graceful) {
1036 m->aborted = 1;
1037 }
1038 apr_pollset_wakeup(m->pollset);
1039 apr_thread_mutex_unlock(m->poll_lock);
1040}
1041
1042/*******************************************************************************
1043 * h2_mplx DoS protection
1044 ******************************************************************************/
1045
1047{
1048 apr_time_t now;
1049
1050 if (m->processing_limit < m->processing_max
1051 && conn_ctx->started_at > m->last_mood_change) {
1052 --m->irritations_since;
1053 if (m->processing_limit < m->processing_max
1054 && ((now = apr_time_now()) - m->last_mood_change >= m->mood_update_interval
1055 || m->irritations_since < -m->processing_limit)) {
1056 m->processing_limit = H2MIN(m->processing_limit * 2, m->processing_max);
1057 m->last_mood_change = now;
1058 m->irritations_since = 0;
1060 H2_MPLX_MSG(m, "mood update, increasing worker limit to %d"),
1061 m->processing_limit);
1062 }
1063 }
1064}
1065
1067{
1069
1070 if (m->processing_limit > 2) {
1071 ++m->irritations_since;
1072 if (((now = apr_time_now()) - m->last_mood_change >= m->mood_update_interval)
1073 || (m->irritations_since >= m->processing_limit)) {
1074
1075 if (m->processing_limit > 16) {
1076 m->processing_limit = 16;
1077 }
1078 else if (m->processing_limit > 8) {
1079 m->processing_limit = 8;
1080 }
1081 else if (m->processing_limit > 4) {
1082 m->processing_limit = 4;
1083 }
1084 else if (m->processing_limit > 2) {
1085 m->processing_limit = 2;
1086 }
1087 m->last_mood_change = now;
1088 m->irritations_since = 0;
1090 H2_MPLX_MSG(m, "mood update, decreasing worker limit to %d"),
1091 m->processing_limit);
1092 }
1093 }
1094}
1095
1096/*******************************************************************************
1097 * mplx master events dispatching
1098 ******************************************************************************/
1099
1101{
1102 /* client may terminate a stream via H2 RST_STREAM message at any time.
1103 * This is annoying when we have committed resources (e.g. worker threads)
1104 * to it, so our mood (e.g. willingness to commit resources on this
1105 * connection in the future) goes down.
1106 *
1107 * This is a DoS protection. We do not want to make it too easy for
1108 * a client to eat up server resources.
1109 *
1110 * However: there are cases where a RST_STREAM is the only way to end
1111 * a request. This includes websockets and server-sent-event streams (SSEs).
1112 * The responses to such requests continue forever otherwise.
1113 *
1114 */
1115 if (!stream_is_running(stream)) return 1;
1116 if (!(stream->id & 0x01)) return 1; /* stream initiated by us. acceptable. */
1117 if (!stream->response) return 0; /* no response headers produced yet. bad. */
1118 if (!stream->out_data_frames) return 0; /* no response body data sent yet. bad. */
1119 return 1; /* otherwise, be forgiving */
1120}
1121
1123{
1125 int registered;
1126
1128 registered = (h2_ihash_get(m->streams, stream_id) != NULL);
1129 if (!stream) {
1130 /* a RST might arrive so late, we have already forgotten
1131 * about it. Seems ok. */
1133 H2_MPLX_MSG(m, "RST on unknown stream %d"), stream_id);
1135 }
1136 else if (!registered) {
1137 /* a RST on a stream that mplx has not been told about, but
1138 * which the session knows. Very early and annoying. */
1140 H2_STRM_MSG(stream, "very early RST, drop"));
1141 h2_stream_set_monitor(stream, NULL);
1144 m_stream_cleanup(m, stream);
1145 m_be_annoyed(m);
1146 }
1147 else if (!reset_is_acceptable(stream)) {
1148 m_be_annoyed(m);
1149 }
1151 return status;
1152}
1153
1155{
1156 /* stream0 output only */
1157 return apr_pollset_create(&m->pollset, 1, m->pool,
1159}
1160
1164 void *on_ctx)
1165{
1166 apr_status_t rv;
1167 const apr_pollfd_t *results, *pfd;
1170 h2_stream *stream;
1171
1172 /* Make sure we are not called recursively. */
1173 ap_assert(!m->polling);
1174 m->polling = 1;
1175 do {
1177 H2_MPLX_MSG(m, "enter polling timeout=%d"),
1178 (int)apr_time_sec(timeout));
1179
1180 apr_array_clear(m->streams_ev_in);
1181 apr_array_clear(m->streams_ev_out);
1182
1183 do {
1184 /* add streams we started processing in the meantime */
1185 apr_thread_mutex_lock(m->poll_lock);
1186 if (!h2_iq_empty(m->streams_input_read)
1187 || !h2_iq_empty(m->streams_output_written)) {
1188 while ((i = h2_iq_shift(m->streams_input_read))) {
1189 stream = h2_ihash_get(m->streams, i);
1190 if (stream) {
1191 APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
1192 }
1193 }
1194 while ((i = h2_iq_shift(m->streams_output_written))) {
1195 stream = h2_ihash_get(m->streams, i);
1196 if (stream) {
1197 APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
1198 }
1199 }
1200 nresults = 0;
1201 rv = APR_SUCCESS;
1202 apr_thread_mutex_unlock(m->poll_lock);
1203 break;
1204 }
1205 apr_thread_mutex_unlock(m->poll_lock);
1206
1208 rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
1210 if (APR_STATUS_IS_EINTR(rv) && m->shutdown) {
1211 if (!m->aborted) {
1212 rv = APR_SUCCESS;
1213 }
1214 goto cleanup;
1215 }
1216 } while (APR_STATUS_IS_EINTR(rv));
1217
1218 if (APR_SUCCESS != rv) {
1219 if (APR_STATUS_IS_TIMEUP(rv)) {
1221 H2_MPLX_MSG(m, "polling timed out "));
1222 }
1223 else {
1224 ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10310) \
1225 H2_MPLX_MSG(m, "polling failed"));
1226 }
1227 goto cleanup;
1228 }
1229
1230 for (i = 0; i < nresults; i++) {
1231 pfd = &results[i];
1232 conn_ctx = pfd->client_data;
1233
1235 if (conn_ctx->stream_id == 0) {
1236 if (on_stream_input) {
1237 APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
1238 }
1239 continue;
1240 }
1241 }
1242
1243 if (on_stream_input && m->streams_ev_in->nelts) {
1245 for (i = 0; i < m->streams_ev_in->nelts; ++i) {
1246 on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*));
1247 }
1249 }
1250 if (on_stream_output && m->streams_ev_out->nelts) {
1252 for (i = 0; i < m->streams_ev_out->nelts; ++i) {
1253 on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*));
1254 }
1256 }
1257 break;
1258 } while(1);
1259
1260cleanup:
1261 m->polling = 0;
1262 return rv;
1263}
1264
APR Atomic Operations.
APR Strings library.
APR Condition Variable Routines.
APR Thread Mutex Routines.
APR Time Library.
void apr_atomic_set32(volatile apr_uint32_t *mem, apr_uint32_t val)
Definition atomic.c:73
apr_uint32_t apr_atomic_read32(volatile apr_uint32_t *mem)
Definition atomic.c:68
request_rec * r
#define APLOGNO(n)
Definition http_log.h:117
#define APLOG_INFO
Definition http_log.h:70
#define APLOG_ERR
Definition http_log.h:67
#define APLOG_TRACE3
Definition http_log.h:74
#define ap_log_cerror
Definition http_log.h:498
#define APLOG_MARK
Definition http_log.h:283
#define APLOG_WARNING
Definition http_log.h:68
#define APLOG_TRACE2
Definition http_log.h:73
#define APLOG_TRACE1
Definition http_log.h:72
#define APLOGctrace1(c)
Definition http_log.h:257
#define APLOG_DEBUG
Definition http_log.h:71
ap_vhost_iterate_conn_cb void * baton
Definition http_vhost.h:87
apr_uint32_t ap_max_mem_free
Definition mpm_common.c:155
#define APR_ECONNABORTED
Definition apr_errno.h:769
#define APR_EINVAL
Definition apr_errno.h:711
#define APR_STATUS_IS_EINTR(s)
Definition apr_errno.h:1281
#define APR_STATUS_IS_TIMEUP(s)
Definition apr_errno.h:534
apr_brigade_flush void * ctx
int apr_off_t * length
#define AP_DEBUG_ASSERT(exp)
Definition httpd.h:2283
void ap_abort_on_oom(void) __attribute__((noreturn))
Definition util.c:3136
#define ap_assert(exp)
Definition httpd.h:2271
apr_size_t size
apr_uint32_t apr_uint32_t cmp
Definition apr_atomic.h:106
apr_uint32_t val
Definition apr_atomic.h:66
const char int apr_pool_t * pool
Definition apr_cstr.h:84
#define APR_SUCCESS
Definition apr_errno.h:225
int apr_status_t
Definition apr_errno.h:44
void const char apr_status_t(* cleanup)(void *))
apr_vformatter_buff_t * c
Definition apr_lib.h:175
const char apr_uint32_t * id
apr_abortfunc_t apr_allocator_t * allocator
Definition apr_pools.h:208
apr_pool_t * parent
Definition apr_pools.h:197
#define apr_pcalloc(p, size)
Definition apr_pools.h:465
const char * s
Definition apr_strings.h:95
const void * m
#define APR_ARRAY_PUSH(ary, type)
Definition apr_tables.h:150
#define APR_ARRAY_IDX(ary, i, type)
Definition apr_tables.h:141
int int status
#define APR_READ_BLOCK
#define apr_time_from_msec(msec)
Definition apr_time.h:75
apr_int64_t apr_interval_time_t
Definition apr_time.h:55
apr_int64_t apr_time_t
Definition apr_time.h:45
#define apr_time_sec(time)
Definition apr_time.h:63
#define apr_time_from_sec(sec)
Definition apr_time.h:78
#define APR_POLLSET_WAKEABLE
Definition apr_poll.h:68
@ H2_SS_CLEANUP
Definition h2.h:149
#define H2_ERR_NO_ERROR
Definition h2.h:58
struct h2_stream * h2_stream_get_fn(struct h2_session *session, int stream_id)
Definition h2.h:195
@ H2_SEV_EOS_SENT
Definition h2.h:157
#define H2_ERR_STREAM_CLOSED
Definition h2.h:63
int h2_stream_pri_cmp_fn(int stream_id1, int stream_id2, void *session)
Definition h2.h:194
#define H2MIN(x, y)
Definition h2.h:101
#define H2_ERR_INTERNAL_ERROR
Definition h2.h:60
void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c)
void h2_beam_on_received(h2_bucket_beam *beam, h2_beam_ev_callback *recv_cb, void *ctx)
void h2_beam_on_eagain(h2_bucket_beam *beam, h2_beam_ev_callback *eagain_cb, void *ctx)
apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c)
void h2_beam_on_was_empty(h2_bucket_beam *beam, h2_beam_ev_callback *was_empty_cb, void *ctx)
int h2_beam_empty(h2_bucket_beam *beam)
apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from, apr_pool_t *pool, int id, const char *tag, apr_size_t max_buf_size, apr_interval_time_t timeout)
void h2_beam_on_send(h2_bucket_beam *beam, h2_beam_ev_callback *send_cb, void *ctx)
int h2_beam_is_complete(h2_bucket_beam *beam)
void h2_beam_on_consumed(h2_bucket_beam *beam, h2_beam_io_callback *io_cb, void *ctx)
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
static struct h2_workers * workers
Definition h2_c1.c:48
void h2_c2_abort(conn_rec *c2, conn_rec *from)
Definition h2_c2.c:157
conn_rec * h2_c2_create(conn_rec *c1, apr_pool_t *parent, apr_bucket_alloc_t *buckt_alloc)
Definition h2_c2.c:808
void h2_c2_destroy(conn_rec *c2)
Definition h2_c2.c:145
int h2_config_sgeti(server_rec *s, h2_config_var_t var)
Definition h2_config.c:506
@ H2_CONF_STREAM_MAX_MEM
Definition h2_config.h:32
@ H2_CONF_MAX_STREAMS
Definition h2_config.h:27
apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2, struct h2_mplx *mplx, struct h2_stream *stream, struct h2_c2_transit *transit)
Definition h2_conn_ctx.c:77
#define H2_PIPE_OUT
Definition h2_conn_ctx.h:29
#define h2_conn_ctx_get(c)
Definition h2_conn_ctx.h:78
#define H2_PIPE_IN
Definition h2_conn_ctx.h:30
apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
Definition h2_mplx.c:83
static void c1_purge_streams(h2_mplx *m)
Definition h2_mplx.c:574
static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
Definition h2_mplx.c:811
static apr_status_t c1_process_stream(h2_mplx *m, h2_stream *stream, h2_stream_pri_cmp_fn *cmp, h2_session *session)
Definition h2_mplx.c:664
static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
Definition h2_mplx.c:766
static int m_unexpected_stream_iter(void *ctx, void *val)
Definition h2_mplx.c:446
int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
Definition h2_mplx.c:118
static apr_pool_t * pchild
Definition h2_mplx.c:74
#define H2_MPLX_ENTER_ALWAYS(m)
Definition h2_mplx.c:97
static void c2_prod_done(void *baton, conn_rec *c2)
Definition h2_mplx.c:1011
static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx)
Definition h2_mplx.c:1046
apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream, unsigned int *pstream_count)
Definition h2_mplx.c:549
static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
Definition h2_mplx.c:136
static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
Definition h2_mplx.c:106
static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
Definition h2_mplx.c:938
apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream)
Definition h2_mplx.c:1122
apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp, h2_session *session)
Definition h2_mplx.c:643
static void workers_shutdown(void *baton, int graceful)
Definition h2_mplx.c:1026
static int abort_on_oom(int retcode)
Definition h2_mplx.c:77
void h2_mplx_c1_going_keepalive(h2_mplx *m)
Definition h2_mplx.c:605
static conn_rec * c2_prod_next(void *baton, int *phas_more)
Definition h2_mplx.c:924
static int m_stream_cancel_iter(void *ctx, void *val)
Definition h2_mplx.c:455
int h2_mplx_c1_shutdown(h2_mplx *m)
Definition h2_mplx.c:358
static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam)
Definition h2_mplx.c:798
static int stream_is_running(h2_stream *stream)
Definition h2_mplx.c:111
static int m_report_stream_iter(void *ctx, void *val)
Definition h2_mplx.c:422
void h2_mplx_c1_destroy(h2_mplx *m)
Definition h2_mplx.c:471
static void add_stream_poll_event(h2_mplx *m, int stream_id, h2_iqueue *q)
Definition h2_mplx.c:777
static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
Definition h2_mplx.c:787
static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit)
Definition h2_mplx.c:236
static conn_rec * s_next_c2(h2_mplx *m)
Definition h2_mplx.c:870
static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit)
Definition h2_mplx.c:822
static int reset_is_acceptable(h2_stream *stream)
Definition h2_mplx.c:1100
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, void *on_ctx)
Definition h2_mplx.c:1161
static int m_stream_iter_wrap(void *ctx, void *stream)
Definition h2_mplx.c:377
static h2_c2_transit * c2_transit_create(h2_mplx *m)
Definition h2_mplx.c:186
static apr_status_t mplx_pollset_create(h2_mplx *m)
Definition h2_mplx.c:1154
static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
Definition h2_mplx.c:128
h2_mplx * h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0, server_rec *s, apr_pool_t *parent, h2_workers *workers)
Definition h2_mplx.c:258
int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m)
Definition h2_mplx.c:411
apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, void *on_ctx)
Definition h2_mplx.c:614
#define H2_MPLX_LEAVE(m)
Definition h2_mplx.c:94
static h2_c2_transit * c2_transit_get(h2_mplx *m)
Definition h2_mplx.c:227
static void c2_transit_destroy(h2_c2_transit *transit)
Definition h2_mplx.c:222
#define H2_MPLX_ENTER(m)
Definition h2_mplx.c:89
static int m_stream_want_send_data(void *ctx, void *stream)
Definition h2_mplx.c:402
const h2_stream * h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
Definition h2_mplx.c:562
apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
Definition h2_mplx.c:383
void h2_mplx_c1_process(h2_mplx *m, h2_iqueue *ready_to_process, h2_stream_get_fn *get_stream, h2_stream_pri_cmp_fn *stream_pri_cmp, h2_session *session, unsigned int *pstream_count)
Definition h2_mplx.c:711
static void m_be_annoyed(h2_mplx *m)
Definition h2_mplx.c:1066
void stream_ev_callback(void *ctx, struct h2_stream *stream)
Definition h2_mplx.h:170
int h2_mplx_stream_cb(struct h2_stream *s, void *userdata)
Definition h2_mplx.h:184
#define H2_MPLX_MSG(m, msg)
Definition h2_mplx.h:227
static void transit(h2_proxy_session *session, const char *action, h2_proxys_state nstate)
static void on_stream_output(void *ctx, h2_stream *stream)
static void on_stream_input(void *ctx, h2_stream *stream)
static int stream_pri_cmp(int sid1, int sid2, void *ctx)
Definition h2_session.c:162
static h2_stream * get_stream(h2_session *session, int stream_id)
Definition h2_session.c:79
void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
Definition h2_stream.c:392
apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
Definition h2_stream.c:1303
void h2_stream_destroy(h2_stream *stream)
Definition h2_stream.c:628
void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
Definition h2_stream.c:387
void h2_stream_cleanup(h2_stream *stream)
Definition h2_stream.c:615
void h2_stream_rst(h2_stream *stream, int error_code)
Definition h2_stream.c:638
apr_status_t h2_stream_prepare_processing(h2_stream *stream)
Definition h2_stream.c:211
int h2_stream_is_ready(h2_stream *stream)
Definition h2_stream.c:1255
int h2_stream_wants_send_data(h2_stream *stream)
Definition h2_stream.c:1268
#define H2_STRM_MSG(s, msg)
Definition h2_stream.h:338
#define H2_STRM_LOG(aplogno, s, msg)
Definition h2_stream.h:342
void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx)
Definition h2_util.c:405
void h2_ihash_add(h2_ihash_t *ih, void *val)
Definition h2_util.c:268
int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
Definition h2_util.c:354
h2_iqueue * h2_iq_create(apr_pool_t *pool, int capacity)
Definition h2_util.c:334
void h2_util_drain_pipe(apr_file_t *pipe)
Definition h2_util.c:1882
void h2_ihash_remove(h2_ihash_t *ih, int id)
Definition h2_util.c:273
h2_ihash_t * h2_ihash_create(apr_pool_t *pool, size_t offset_of_int)
Definition h2_util.c:225
int h2_iq_empty(h2_iqueue *q)
Definition h2_util.c:343
int h2_iq_append(h2_iqueue *q, int sid)
Definition h2_util.c:375
void * h2_ihash_get(h2_ihash_t *ih, int id)
Definition h2_util.c:243
int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx)
Definition h2_util.c:260
int h2_iq_remove(h2_iqueue *q, int sid)
Definition h2_util.c:380
unsigned int h2_ihash_count(h2_ihash_t *ih)
Definition h2_util.c:233
void h2_iq_clear(h2_iqueue *q)
Definition h2_util.c:400
int h2_ihash_empty(h2_ihash_t *ih)
Definition h2_util.c:238
int h2_iq_shift(h2_iqueue *q)
Definition h2_util.c:433
int h2_iq_count(h2_iqueue *q)
Definition h2_util.c:348
ap_conn_producer_t * h2_workers_register(h2_workers *workers, apr_pool_t *producer_pool, const char *name, ap_conn_producer_next *fn_next, ap_conn_producer_done *fn_done, ap_conn_producer_shutdown *fn_shutdown, void *baton)
Definition h2_workers.c:563
apr_uint32_t h2_workers_get_max_workers(h2_workers *workers)
Definition h2_workers.c:538
apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
Definition h2_workers.c:611
apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
Definition h2_workers.c:589
Apache connection library.
CORE HTTP Daemon.
Apache Logging library.
HTTP protocol handling.
HTTP Daemon routines.
return NULL
Definition mod_so.c:359
int i
Definition mod_so.c:347
Multi-Processing Modules functions.
void * client_data
Definition apr_poll.h:114
Structure to store things which are per connection.
Definition httpd.h:1152
apr_pool_t * pool
Definition httpd.h:1154
server_rec * base_server
Definition httpd.h:1156
int keepalives
Definition httpd.h:1226
unsigned aborted
Definition httpd.h:1219
void * sbh
Definition httpd.h:1199
long id
Definition httpd.h:1187
struct h2_mplx * mplx
Definition h2_session.h:72
apr_pool_t * pool
Definition h2_session.h:71
conn_rec * c2
Definition h2_stream.h:118
struct h2_headers * response
Definition h2_stream.h:99
struct h2_bucket_beam * output
Definition h2_stream.h:107
struct h2_bucket_beam * input
Definition h2_stream.h:102
unsigned int scheduled
Definition h2_stream.h:112
h2_stream_state_t state
Definition h2_stream.h:86
apr_off_t out_data_frames
Definition h2_stream.h:123
const struct h2_request * request
Definition h2_stream.h:90
apr_pool_t * pool
Definition h2_workers.c:77
char * protocol
Definition httpd.h:879
const char * method
Definition httpd.h:900
A structure to store information for each virtual server.
Definition httpd.h:1322
apr_interval_time_t timeout
Definition httpd.h:1372
h2_mplx_stream_cb * cb
Definition h2_mplx.c:373
apr_time_t now
Definition h2_mplx.c:57
h2_mplx * m
Definition h2_mplx.c:55
h2_stream * stream
Definition h2_mplx.c:56
apr_size_t count
Definition h2_mplx.c:58
static apr_time_t now
Definition testtime.c:33
IN ULONG IN INT timeout