89#define H2_MPLX_ENTER(m) \
90 do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
94#define H2_MPLX_LEAVE(m) \
95 apr_thread_mutex_unlock(m->lock)
97#define H2_MPLX_ENTER_ALWAYS(m) \
98 apr_thread_mutex_lock(m->lock)
100#define H2_MPLX_ENTER_MAYBE(m, dolock) \
101 if (dolock) apr_thread_mutex_lock(m->lock)
103#define H2_MPLX_LEAVE_MAYBE(m, dolock) \
104 if (dolock) apr_thread_mutex_unlock(m->lock)
141 H2_STRM_MSG(stream,
"cleanup, unsubscribing from beam events"));
155 H2_STRM_MSG(stream,
"cleanup, removing from registries"));
164 H2_STRM_MSG(stream,
"cleanup, c2 is done, move to spurge"));
170 H2_STRM_MSG(stream,
"cleanup, c2 is running, abort"));
174 H2_STRM_MSG(stream,
"cleanup, c2 is done, move to shold"));
181 H2_STRM_MSG(stream,
"cleanup, never started, move to spurge"));
207 APLOGNO(10004)
"h2_mplx: create transit pool");
269 m->stream0 = stream0;
272 m->child_num = child_num;
313 m->processing_limit = 6;
320 "nghttp2: could not create pollset");
337 m->max_spare_transits = 3;
343 (
unsigned int)
m->id),
360 int max_stream_id_started = 0;
364 max_stream_id_started =
m->max_stream_id_started;
369 return max_stream_id_started;
380 return x->
cb(stream, x->
ctx);
427 H2_STRM_MSG(stream,
"started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
433 "[started=%u/done=%u]"),
450 H2_STRM_MSG(stream,
"unexpected, started=%d, scheduled=%d, ready=%d"),
481 m->shutdown =
m->aborted = 1;
494 H2_MPLX_MSG(
m,
"release, %u/%u/%d streams (total/hold/purge), %d streams"),
497 m->spurge->nelts,
m->processing_count);
579 for (
i = 0;
i <
m->spurge->nelts; ++
i) {
608 if (
m->spurge->nelts) {
633 if (
m->spurge->nelts) {
686 r->scheme?
"://" :
"",
687 r->authority,
r->path?
r->path:
"");
737 if ((
m->processing_count <
m->processing_limit) && !
h2_iq_empty(
m->q)) {
757 H2_MPLX_MSG(
m,
"child mem=%ld, mplx mem=%ld, session mem=%ld, c1=%ld"),
794 conn_ctx->mplx->streams_input_read);
818 conn_ctx->mplx->streams_output_written);
826 const char *action =
"init";
832 action =
"create output beam";
848 action =
"create input write pipe";
865 "error %s"), action);
878 while (!
m->aborted && !stream && (
m->processing_count <
m->processing_limit)
884 if (
m->processing_count >=
m->processing_limit && !
h2_iq_empty(
m->q)) {
887 "Current limit is %d and %d workers are in use."),
888 m->processing_limit,
m->processing_count);
893 if (
sid >
m->max_stream_id_started) {
894 m->max_stream_id_started =
sid;
898#if AP_HAS_RESPONSE_BUCKETS
911 ++
m->processing_count;
952 c2->
sbh =
m->c1->sbh;
955 "h2_mplx(%s-%d): request done, %f ms elapsed",
959 if (!
conn_ctx->has_final_response) {
961 "h2_c2(%s-%d): processing finished without final response",
969 "h2_c2(%s-%d): processing finished with incomplete output",
995 for (
i = 0;
i <
m->spurge->nelts; ++
i) {
1005 "h2_mplx(%s-%d): c2_done, stream not found",
1019 --
m->processing_count;
1050 if (
m->processing_limit <
m->processing_max
1051 &&
conn_ctx->started_at >
m->last_mood_change) {
1052 --
m->irritations_since;
1053 if (
m->processing_limit <
m->processing_max
1055 ||
m->irritations_since < -
m->processing_limit)) {
1056 m->processing_limit =
H2MIN(
m->processing_limit * 2,
m->processing_max);
1057 m->last_mood_change =
now;
1058 m->irritations_since = 0;
1060 H2_MPLX_MSG(
m,
"mood update, increasing worker limit to %d"),
1061 m->processing_limit);
1070 if (
m->processing_limit > 2) {
1071 ++
m->irritations_since;
1073 || (
m->irritations_since >=
m->processing_limit)) {
1075 if (
m->processing_limit > 16) {
1076 m->processing_limit = 16;
1078 else if (
m->processing_limit > 8) {
1079 m->processing_limit = 8;
1081 else if (
m->processing_limit > 4) {
1082 m->processing_limit = 4;
1084 else if (
m->processing_limit > 2) {
1085 m->processing_limit = 2;
1087 m->last_mood_change =
now;
1088 m->irritations_since = 0;
1090 H2_MPLX_MSG(
m,
"mood update, decreasing worker limit to %d"),
1091 m->processing_limit);
1116 if (!(stream->
id & 0x01))
return 1;
1133 H2_MPLX_MSG(
m,
"RST on unknown stream %d"), stream_id);
1245 for (
i = 0;
i <
m->streams_ev_in->nelts; ++
i) {
1252 for (
i = 0;
i <
m->streams_ev_out->nelts; ++
i) {
APR Condition Variable Routines.
APR Thread Mutex Routines.
void apr_atomic_set32(volatile apr_uint32_t *mem, apr_uint32_t val)
apr_uint32_t apr_atomic_read32(volatile apr_uint32_t *mem)
ap_vhost_iterate_conn_cb void * baton
apr_uint32_t ap_max_mem_free
#define APR_STATUS_IS_EINTR(s)
#define APR_STATUS_IS_TIMEUP(s)
apr_brigade_flush void * ctx
#define AP_DEBUG_ASSERT(exp)
void ap_abort_on_oom(void) __attribute__((noreturn))
apr_uint32_t apr_uint32_t cmp
const char int apr_pool_t * pool
void const char apr_status_t(* cleanup)(void *))
apr_vformatter_buff_t * c
const char apr_uint32_t * id
apr_abortfunc_t apr_allocator_t * allocator
#define apr_pcalloc(p, size)
#define APR_ARRAY_PUSH(ary, type)
#define APR_ARRAY_IDX(ary, i, type)
#define apr_time_from_msec(msec)
apr_int64_t apr_interval_time_t
#define apr_time_sec(time)
#define apr_time_from_sec(sec)
#define APR_POLLSET_WAKEABLE
struct h2_stream * h2_stream_get_fn(struct h2_session *session, int stream_id)
#define H2_ERR_STREAM_CLOSED
int h2_stream_pri_cmp_fn(int stream_id1, int stream_id2, void *session)
#define H2_ERR_INTERNAL_ERROR
void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c)
void h2_beam_on_received(h2_bucket_beam *beam, h2_beam_ev_callback *recv_cb, void *ctx)
void h2_beam_on_eagain(h2_bucket_beam *beam, h2_beam_ev_callback *eagain_cb, void *ctx)
apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c)
void h2_beam_on_was_empty(h2_bucket_beam *beam, h2_beam_ev_callback *was_empty_cb, void *ctx)
int h2_beam_empty(h2_bucket_beam *beam)
apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from, apr_pool_t *pool, int id, const char *tag, apr_size_t max_buf_size, apr_interval_time_t timeout)
void h2_beam_on_send(h2_bucket_beam *beam, h2_beam_ev_callback *send_cb, void *ctx)
int h2_beam_is_complete(h2_bucket_beam *beam)
void h2_beam_on_consumed(h2_bucket_beam *beam, h2_beam_io_callback *io_cb, void *ctx)
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
static struct h2_workers * workers
void h2_c2_abort(conn_rec *c2, conn_rec *from)
conn_rec * h2_c2_create(conn_rec *c1, apr_pool_t *parent, apr_bucket_alloc_t *buckt_alloc)
void h2_c2_destroy(conn_rec *c2)
int h2_config_sgeti(server_rec *s, h2_config_var_t var)
apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2, struct h2_mplx *mplx, struct h2_stream *stream, struct h2_c2_transit *transit)
#define h2_conn_ctx_get(c)
apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
static void c1_purge_streams(h2_mplx *m)
static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
static apr_status_t c1_process_stream(h2_mplx *m, h2_stream *stream, h2_stream_pri_cmp_fn *cmp, h2_session *session)
static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
static int m_unexpected_stream_iter(void *ctx, void *val)
int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
static apr_pool_t * pchild
#define H2_MPLX_ENTER_ALWAYS(m)
static void c2_prod_done(void *baton, conn_rec *c2)
static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx)
apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream, unsigned int *pstream_count)
static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream)
apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp, h2_session *session)
static void workers_shutdown(void *baton, int graceful)
static int abort_on_oom(int retcode)
void h2_mplx_c1_going_keepalive(h2_mplx *m)
static conn_rec * c2_prod_next(void *baton, int *phas_more)
static int m_stream_cancel_iter(void *ctx, void *val)
int h2_mplx_c1_shutdown(h2_mplx *m)
static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam)
static int stream_is_running(h2_stream *stream)
static int m_report_stream_iter(void *ctx, void *val)
void h2_mplx_c1_destroy(h2_mplx *m)
static void add_stream_poll_event(h2_mplx *m, int stream_id, h2_iqueue *q)
static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit)
static conn_rec * s_next_c2(h2_mplx *m)
static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit)
static int reset_is_acceptable(h2_stream *stream)
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, void *on_ctx)
static int m_stream_iter_wrap(void *ctx, void *stream)
static h2_c2_transit * c2_transit_create(h2_mplx *m)
static apr_status_t mplx_pollset_create(h2_mplx *m)
static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
h2_mplx * h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0, server_rec *s, apr_pool_t *parent, h2_workers *workers)
int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m)
apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, void *on_ctx)
static h2_c2_transit * c2_transit_get(h2_mplx *m)
static void c2_transit_destroy(h2_c2_transit *transit)
static int m_stream_want_send_data(void *ctx, void *stream)
const h2_stream * h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
void h2_mplx_c1_process(h2_mplx *m, h2_iqueue *ready_to_process, h2_stream_get_fn *get_stream, h2_stream_pri_cmp_fn *stream_pri_cmp, h2_session *session, unsigned int *pstream_count)
static void m_be_annoyed(h2_mplx *m)
void stream_ev_callback(void *ctx, struct h2_stream *stream)
int h2_mplx_stream_cb(struct h2_stream *s, void *userdata)
#define H2_MPLX_MSG(m, msg)
static void transit(h2_proxy_session *session, const char *action, h2_proxys_state nstate)
static void on_stream_output(void *ctx, h2_stream *stream)
static void on_stream_input(void *ctx, h2_stream *stream)
static int stream_pri_cmp(int sid1, int sid2, void *ctx)
static h2_stream * get_stream(h2_session *session, int stream_id)
void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
void h2_stream_destroy(h2_stream *stream)
void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
void h2_stream_cleanup(h2_stream *stream)
void h2_stream_rst(h2_stream *stream, int error_code)
apr_status_t h2_stream_prepare_processing(h2_stream *stream)
int h2_stream_is_ready(h2_stream *stream)
int h2_stream_wants_send_data(h2_stream *stream)
#define H2_STRM_MSG(s, msg)
#define H2_STRM_LOG(aplogno, s, msg)
void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx)
void h2_ihash_add(h2_ihash_t *ih, void *val)
int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
h2_iqueue * h2_iq_create(apr_pool_t *pool, int capacity)
void h2_util_drain_pipe(apr_file_t *pipe)
void h2_ihash_remove(h2_ihash_t *ih, int id)
h2_ihash_t * h2_ihash_create(apr_pool_t *pool, size_t offset_of_int)
int h2_iq_empty(h2_iqueue *q)
int h2_iq_append(h2_iqueue *q, int sid)
void * h2_ihash_get(h2_ihash_t *ih, int id)
int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx)
int h2_iq_remove(h2_iqueue *q, int sid)
unsigned int h2_ihash_count(h2_ihash_t *ih)
void h2_iq_clear(h2_iqueue *q)
int h2_ihash_empty(h2_ihash_t *ih)
int h2_iq_shift(h2_iqueue *q)
int h2_iq_count(h2_iqueue *q)
ap_conn_producer_t * h2_workers_register(h2_workers *workers, apr_pool_t *producer_pool, const char *name, ap_conn_producer_next *fn_next, ap_conn_producer_done *fn_done, ap_conn_producer_shutdown *fn_shutdown, void *baton)
apr_uint32_t h2_workers_get_max_workers(h2_workers *workers)
apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
Apache connection library.
Multi-Processing Modules functions.
Structure to store things which are per connection.
struct h2_headers * response
struct h2_bucket_beam * output
struct h2_bucket_beam * input
apr_off_t out_data_frames
const struct h2_request * request
A structure to store information for each virtual server.
apr_interval_time_t timeout