Apache HTTPD
apr_queue.c
Go to the documentation of this file.
1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "apr.h"
18
19#if APR_HAVE_STDIO_H
20#include <stdio.h>
21#endif
22#if APR_HAVE_STDLIB_H
23#include <stdlib.h>
24#endif
25#if APR_HAVE_UNISTD_H
26#include <unistd.h>
27#endif
28
29#include "apu.h"
30#include "apr_portable.h"
31#include "apr_thread_mutex.h"
32#include "apr_thread_cond.h"
33#include "apr_errno.h"
34#include "apr_queue.h"
35
36#if APR_HAS_THREADS
37/*
38 * define this to get debug messages
39 *
40#define QUEUE_DEBUG
41 */
42
43struct apr_queue_t {
44 void **data;
45 unsigned int nelts;
46 unsigned int in;
47 unsigned int out;
48 unsigned int bounds;
49 unsigned int full_waiters;
50 unsigned int empty_waiters;
52 apr_thread_cond_t *not_empty;
53 apr_thread_cond_t *not_full;
54 int terminated;
55};
56
57#ifdef QUEUE_DEBUG
58static void Q_DBG(char*msg, apr_queue_t *q) {
59 fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n",
61 q->nelts, q->in, q->out,
62 msg
63 );
64}
65#else
66#define Q_DBG(x,y)
67#endif
68
73#define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds)
74
79#define apr_queue_empty(queue) ((queue)->nelts == 0)
80
85static apr_status_t queue_destroy(void *data)
86{
88
89 /* Ignore errors here, we can't do anything about them anyway. */
90
93 apr_thread_mutex_destroy(queue->one_big_mutex);
94
95 return APR_SUCCESS;
96}
97
102 unsigned int queue_capacity,
103 apr_pool_t *a)
104{
105 apr_status_t rv;
107 queue = apr_palloc(a, sizeof(apr_queue_t));
108 *q = queue;
109
110 /* nested doesn't work ;( */
111 rv = apr_thread_mutex_create(&queue->one_big_mutex,
113 a);
114 if (rv != APR_SUCCESS) {
115 return rv;
116 }
117
118 rv = apr_thread_cond_create(&queue->not_empty, a);
119 if (rv != APR_SUCCESS) {
120 return rv;
121 }
122
123 rv = apr_thread_cond_create(&queue->not_full, a);
124 if (rv != APR_SUCCESS) {
125 return rv;
126 }
127
128 /* Set all the data in the queue to NULL */
129 queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130 queue->bounds = queue_capacity;
131 queue->nelts = 0;
132 queue->in = 0;
133 queue->out = 0;
134 queue->terminated = 0;
135 queue->full_waiters = 0;
136 queue->empty_waiters = 0;
137
139
140 return APR_SUCCESS;
141}
142
149{
150 apr_status_t rv;
151
152 if (queue->terminated) {
153 return APR_EOF; /* no more elements ever again */
154 }
155
156 rv = apr_thread_mutex_lock(queue->one_big_mutex);
157 if (rv != APR_SUCCESS) {
158 return rv;
159 }
160
161 if (apr_queue_full(queue)) {
162 if (!queue->terminated) {
163 queue->full_waiters++;
164 rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165 queue->full_waiters--;
166 if (rv != APR_SUCCESS) {
167 apr_thread_mutex_unlock(queue->one_big_mutex);
168 return rv;
169 }
170 }
171 /* If we wake up and it's still empty, then we were interrupted */
172 if (apr_queue_full(queue)) {
173 Q_DBG("queue full (intr)", queue);
174 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175 if (rv != APR_SUCCESS) {
176 return rv;
177 }
178 if (queue->terminated) {
179 return APR_EOF; /* no more elements ever again */
180 }
181 else {
182 return APR_EINTR;
183 }
184 }
185 }
186
187 queue->data[queue->in] = data;
188 queue->in++;
189 if (queue->in >= queue->bounds)
190 queue->in -= queue->bounds;
191 queue->nelts++;
192
193 if (queue->empty_waiters) {
194 Q_DBG("sig !empty", queue);
195 rv = apr_thread_cond_signal(queue->not_empty);
196 if (rv != APR_SUCCESS) {
197 apr_thread_mutex_unlock(queue->one_big_mutex);
198 return rv;
199 }
200 }
201
202 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
203 return rv;
204}
205
212{
213 apr_status_t rv;
214
215 if (queue->terminated) {
216 return APR_EOF; /* no more elements ever again */
217 }
218
219 rv = apr_thread_mutex_lock(queue->one_big_mutex);
220 if (rv != APR_SUCCESS) {
221 return rv;
222 }
223
224 if (apr_queue_full(queue)) {
225 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
226 return APR_EAGAIN;
227 }
228
229 queue->data[queue->in] = data;
230 queue->in++;
231 if (queue->in >= queue->bounds)
232 queue->in -= queue->bounds;
233 queue->nelts++;
234
235 if (queue->empty_waiters) {
236 Q_DBG("sig !empty", queue);
237 rv = apr_thread_cond_signal(queue->not_empty);
238 if (rv != APR_SUCCESS) {
239 apr_thread_mutex_unlock(queue->one_big_mutex);
240 return rv;
241 }
242 }
243
244 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
245 return rv;
246}
247
252 return queue->nelts;
253}
254
262{
263 apr_status_t rv;
264
265 if (queue->terminated) {
266 return APR_EOF; /* no more elements ever again */
267 }
268
269 rv = apr_thread_mutex_lock(queue->one_big_mutex);
270 if (rv != APR_SUCCESS) {
271 return rv;
272 }
273
274 /* Keep waiting until we wake up and find that the queue is not empty. */
275 if (apr_queue_empty(queue)) {
276 if (!queue->terminated) {
277 queue->empty_waiters++;
278 rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
279 queue->empty_waiters--;
280 if (rv != APR_SUCCESS) {
281 apr_thread_mutex_unlock(queue->one_big_mutex);
282 return rv;
283 }
284 }
285 /* If we wake up and it's still empty, then we were interrupted */
286 if (apr_queue_empty(queue)) {
287 Q_DBG("queue empty (intr)", queue);
288 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
289 if (rv != APR_SUCCESS) {
290 return rv;
291 }
292 if (queue->terminated) {
293 return APR_EOF; /* no more elements ever again */
294 }
295 else {
296 return APR_EINTR;
297 }
298 }
299 }
300
301 *data = queue->data[queue->out];
302 queue->nelts--;
303
304 queue->out++;
305 if (queue->out >= queue->bounds)
306 queue->out -= queue->bounds;
307 if (queue->full_waiters) {
308 Q_DBG("signal !full", queue);
309 rv = apr_thread_cond_signal(queue->not_full);
310 if (rv != APR_SUCCESS) {
311 apr_thread_mutex_unlock(queue->one_big_mutex);
312 return rv;
313 }
314 }
315
316 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
317 return rv;
318}
319
326{
327 apr_status_t rv;
328
329 if (queue->terminated) {
330 return APR_EOF; /* no more elements ever again */
331 }
332
333 rv = apr_thread_mutex_lock(queue->one_big_mutex);
334 if (rv != APR_SUCCESS) {
335 return rv;
336 }
337
338 if (apr_queue_empty(queue)) {
339 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
340 return APR_EAGAIN;
341 }
342
343 *data = queue->data[queue->out];
344 queue->nelts--;
345
346 queue->out++;
347 if (queue->out >= queue->bounds)
348 queue->out -= queue->bounds;
349 if (queue->full_waiters) {
350 Q_DBG("signal !full", queue);
351 rv = apr_thread_cond_signal(queue->not_full);
352 if (rv != APR_SUCCESS) {
353 apr_thread_mutex_unlock(queue->one_big_mutex);
354 return rv;
355 }
356 }
357
358 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
359 return rv;
360}
361
363{
364 apr_status_t rv;
365 Q_DBG("intr all", queue);
366 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
367 return rv;
368 }
371
372 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
373 return rv;
374 }
375
376 return APR_SUCCESS;
377}
378
380{
381 apr_status_t rv;
382
383 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
384 return rv;
385 }
386
387 /* we must hold one_big_mutex when setting this... otherwise,
388 * we could end up setting it and waking everybody up just after a
389 * would-be popper checks it but right before they block
390 */
391 queue->terminated = 1;
392 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
393 return rv;
394 }
396}
397
398#endif /* APR_HAS_THREADS */
APR Error Codes.
APR Portability Routines.
Thread Safe FIFO bounded queue.
APU_DECLARE(void)
Computes SipHash-2-4, producing a 64bit (APR_SIPHASH_DSIZE) hash from a message and a 128bit (APR_SIP...
Definition apr_sha1.c:206
APR Condition Variable Routines.
APR Thread Mutex Routines.
#define APR_EAGAIN
Definition apr_errno.h:730
#define APR_EOF
Definition apr_errno.h:461
#define APR_EINTR
Definition apr_errno.h:737
apr_bucket apr_bucket_brigade * a
apr_size_t size
#define APR_SUCCESS
Definition apr_errno.h:225
int apr_status_t
Definition apr_errno.h:44
void * data
#define apr_pcalloc(p, size)
Definition apr_pools.h:465
int nelts
Definition apr_tables.h:122
apr_int32_t in
static apr_file_t * out
Definition mod_info.c:85
apr_os_thread_t apr_os_thread_current()
Definition thread.c:142