Branch data Line data Source code
1 : : /***
2 : : This file is part of PulseAudio.
3 : :
4 : : Copyright 2006 Lennart Poettering
5 : :
6 : : PulseAudio is free software; you can redistribute it and/or modify
7 : : it under the terms of the GNU Lesser General Public License as
8 : : published by the Free Software Foundation; either version 2.1 of the
9 : : License, or (at your option) any later version.
10 : :
11 : : PulseAudio is distributed in the hope that it will be useful, but
12 : : WITHOUT ANY WARRANTY; without even the implied warranty of
13 : : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : : Lesser General Public License for more details.
15 : :
16 : : You should have received a copy of the GNU Lesser General Public
17 : : License along with PulseAudio; if not, write to the Free Software
18 : : Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 : : USA.
20 : : ***/
21 : :
22 : : #ifdef HAVE_CONFIG_H
23 : : #include <config.h>
24 : : #endif
25 : :
26 : : #include <unistd.h>
27 : : #include <errno.h>
28 : :
29 : : #include <pulse/xmalloc.h>
30 : :
31 : : #include <pulsecore/macro.h>
32 : : #include <pulsecore/log.h>
33 : : #include <pulsecore/semaphore.h>
34 : : #include <pulsecore/macro.h>
35 : : #include <pulsecore/mutex.h>
36 : : #include <pulsecore/flist.h>
37 : :
38 : : #include "asyncmsgq.h"
39 : :
40 [ - + ][ # # ]: 28 : PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
41 [ - + ][ # # ]: 24 : PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
42 : :
43 : : struct asyncmsgq_item {
44 : : int code;
45 : : pa_msgobject *object;
46 : : void *userdata;
47 : : pa_free_cb_t free_cb;
48 : : int64_t offset;
49 : : pa_memchunk memchunk;
50 : : pa_semaphore *semaphore;
51 : : int ret;
52 : : };
53 : :
54 : : struct pa_asyncmsgq {
55 : : PA_REFCNT_DECLARE;
56 : : pa_asyncq *asyncq;
57 : : pa_mutex *mutex; /* only for the writer side */
58 : :
59 : : struct asyncmsgq_item *current;
60 : : };
61 : :
62 : 1 : pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
63 : : pa_asyncmsgq *a;
64 : :
65 : 1 : a = pa_xnew(pa_asyncmsgq, 1);
66 : :
67 : 1 : PA_REFCNT_INIT(a);
68 [ - + ]: 1 : pa_assert_se(a->asyncq = pa_asyncq_new(size));
69 [ - + ]: 1 : pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
70 : 1 : a->current = NULL;
71 : :
72 : 1 : return a;
73 : : }
74 : :
75 : 1 : static void asyncmsgq_free(pa_asyncmsgq *a) {
76 : : struct asyncmsgq_item *i;
77 [ + - ]: 1 : pa_assert(a);
78 : :
79 [ - + ]: 1 : while ((i = pa_asyncq_pop(a->asyncq, FALSE))) {
80 : :
81 [ # # ]: 0 : pa_assert(!i->semaphore);
82 : :
83 [ # # ]: 0 : if (i->object)
84 : 0 : pa_msgobject_unref(i->object);
85 : :
86 [ # # ]: 0 : if (i->memchunk.memblock)
87 : 0 : pa_memblock_unref(i->memchunk.memblock);
88 : :
89 [ # # ]: 0 : if (i->free_cb)
90 : 0 : i->free_cb(i->userdata);
91 : :
92 [ # # ]: 0 : if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
93 : 1 : pa_xfree(i);
94 : : }
95 : :
96 : 1 : pa_asyncq_free(a->asyncq, NULL);
97 : 1 : pa_mutex_free(a->mutex);
98 : 1 : pa_xfree(a);
99 : 1 : }
100 : :
101 : 0 : pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
102 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(q) > 0);
103 : :
104 : 0 : PA_REFCNT_INC(q);
105 : 0 : return q;
106 : : }
107 : :
108 : 1 : void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
109 [ - + ]: 1 : pa_assert(PA_REFCNT_VALUE(q) > 0);
110 : :
111 [ + - ]: 1 : if (PA_REFCNT_DEC(q) <= 0)
112 : 1 : asyncmsgq_free(q);
113 : 1 : }
114 : :
115 : 3 : void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
116 : : struct asyncmsgq_item *i;
117 [ - + ]: 3 : pa_assert(PA_REFCNT_VALUE(a) > 0);
118 : :
119 [ + + ]: 3 : if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
120 : 2 : i = pa_xnew(struct asyncmsgq_item, 1);
121 : :
122 : 3 : i->code = code;
123 [ - + ]: 3 : i->object = object ? pa_msgobject_ref(object) : NULL;
124 : 3 : i->userdata = (void*) userdata;
125 : 3 : i->free_cb = free_cb;
126 : 3 : i->offset = offset;
127 [ - + ]: 3 : if (chunk) {
128 [ # # ]: 0 : pa_assert(chunk->memblock);
129 : 0 : i->memchunk = *chunk;
130 : 0 : pa_memblock_ref(i->memchunk.memblock);
131 : : } else
132 : 3 : pa_memchunk_reset(&i->memchunk);
133 : 3 : i->semaphore = NULL;
134 : :
135 : : /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
136 : 3 : pa_mutex_lock(a->mutex);
137 : 3 : pa_asyncq_post(a->asyncq, i);
138 : 3 : pa_mutex_unlock(a->mutex);
139 : 3 : }
140 : :
141 : 1 : int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
142 : : struct asyncmsgq_item i;
143 [ - + ]: 1 : pa_assert(PA_REFCNT_VALUE(a) > 0);
144 : :
145 : 1 : i.code = code;
146 : 1 : i.object = object;
147 : 1 : i.userdata = (void*) userdata;
148 : 1 : i.free_cb = NULL;
149 : 1 : i.ret = -1;
150 : 1 : i.offset = offset;
151 [ - + ]: 1 : if (chunk) {
152 [ # # ]: 0 : pa_assert(chunk->memblock);
153 : 0 : i.memchunk = *chunk;
154 : : } else
155 : 1 : pa_memchunk_reset(&i.memchunk);
156 : :
157 [ + - ]: 1 : if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
158 : 1 : i.semaphore = pa_semaphore_new(0);
159 : :
160 [ - + ]: 1 : pa_assert_se(i.semaphore);
161 : :
162 : : /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
163 : 1 : pa_mutex_lock(a->mutex);
164 [ - + ]: 1 : pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
165 : 1 : pa_mutex_unlock(a->mutex);
166 : :
167 : 1 : pa_semaphore_wait(i.semaphore);
168 : :
169 [ - + ]: 1 : if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
170 : 0 : pa_semaphore_free(i.semaphore);
171 : :
172 : 1 : return i.ret;
173 : : }
174 : :
175 : 4 : int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait_op) {
176 [ - + ]: 4 : pa_assert(PA_REFCNT_VALUE(a) > 0);
177 [ - + ]: 4 : pa_assert(!a->current);
178 : :
179 [ + - ]: 4 : if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
180 : : /* pa_log("failure"); */
181 : : return -1;
182 : : }
183 : :
184 : : /* pa_log("success"); */
185 : :
186 [ + - ]: 4 : if (code)
187 : 4 : *code = a->current->code;
188 [ - + ]: 4 : if (userdata)
189 : 0 : *userdata = a->current->userdata;
190 [ - + ]: 4 : if (offset)
191 : 0 : *offset = a->current->offset;
192 [ - + ]: 4 : if (object) {
193 [ # # ]: 0 : if ((*object = a->current->object))
194 : 0 : pa_msgobject_assert_ref(*object);
195 : : }
196 [ - + ]: 4 : if (chunk)
197 : 4 : *chunk = a->current->memchunk;
198 : :
199 : : /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
200 : : /* (void*) a, */
201 : : /* (void*) a->current->object, */
202 : : /* a->current->object ? a->current->object->parent.type_name : NULL, */
203 : : /* a->current->code, */
204 : : /* (void*) a->current->userdata, */
205 : : /* (unsigned long) a->current->memchunk.length); */
206 : :
207 : : return 0;
208 : : }
209 : :
210 : 4 : void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
211 [ - + ]: 4 : pa_assert(PA_REFCNT_VALUE(a) > 0);
212 [ - + ]: 4 : pa_assert(a);
213 [ - + ]: 4 : pa_assert(a->current);
214 : :
215 [ + + ]: 4 : if (a->current->semaphore) {
216 : 1 : a->current->ret = ret;
217 : 1 : pa_semaphore_post(a->current->semaphore);
218 : : } else {
219 : :
220 [ - + ]: 3 : if (a->current->free_cb)
221 : 0 : a->current->free_cb(a->current->userdata);
222 : :
223 [ - + ]: 3 : if (a->current->object)
224 : 0 : pa_msgobject_unref(a->current->object);
225 : :
226 [ - + ]: 3 : if (a->current->memchunk.memblock)
227 : 0 : pa_memblock_unref(a->current->memchunk.memblock);
228 : :
229 [ - + ]: 3 : if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
230 : 0 : pa_xfree(a->current);
231 : : }
232 : :
233 : 4 : a->current = NULL;
234 : 4 : }
235 : :
236 : 0 : int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
237 : : int c;
238 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
239 : :
240 : 0 : pa_asyncmsgq_ref(a);
241 : :
242 : : do {
243 : : pa_msgobject *o;
244 : : void *data;
245 : : int64_t offset;
246 : : pa_memchunk chunk;
247 : : int ret;
248 : :
249 [ # # ]: 0 : if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, TRUE) < 0)
250 : : return -1;
251 : :
252 : 0 : ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
253 : 0 : pa_asyncmsgq_done(a, ret);
254 : :
255 [ # # ]: 0 : } while (c != code);
256 : :
257 : 0 : pa_asyncmsgq_unref(a);
258 : :
259 : 0 : return 0;
260 : : }
261 : :
262 : 0 : int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
263 : : pa_msgobject *object;
264 : : int code;
265 : : void *data;
266 : : pa_memchunk chunk;
267 : : int64_t offset;
268 : : int ret;
269 : :
270 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
271 : :
272 [ # # ]: 0 : if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
273 : : return 0;
274 : :
275 : 0 : pa_asyncmsgq_ref(a);
276 : 0 : ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
277 : 0 : pa_asyncmsgq_done(a, ret);
278 : 0 : pa_asyncmsgq_unref(a);
279 : :
280 : 0 : return 1;
281 : : }
282 : :
283 : 0 : int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
284 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
285 : :
286 : 0 : return pa_asyncq_read_fd(a->asyncq);
287 : : }
288 : :
289 : 0 : int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
290 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
291 : :
292 : 0 : return pa_asyncq_read_before_poll(a->asyncq);
293 : : }
294 : :
295 : 0 : void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
296 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
297 : :
298 : 0 : pa_asyncq_read_after_poll(a->asyncq);
299 : 0 : }
300 : :
301 : 0 : int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
302 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
303 : :
304 : 0 : return pa_asyncq_write_fd(a->asyncq);
305 : : }
306 : :
307 : 0 : void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
308 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
309 : :
310 : 0 : pa_asyncq_write_before_poll(a->asyncq);
311 : 0 : }
312 : :
313 : 0 : void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
314 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
315 : :
316 : 0 : pa_asyncq_write_after_poll(a->asyncq);
317 : 0 : }
318 : :
319 : 0 : int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
320 : :
321 [ # # ]: 0 : if (object)
322 [ # # ]: 0 : return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
323 : :
324 : : return 0;
325 : : }
326 : :
327 : 0 : void pa_asyncmsgq_flush(pa_asyncmsgq *a, pa_bool_t run) {
328 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
329 : :
330 : : for (;;) {
331 : : pa_msgobject *object;
332 : : int code;
333 : : void *data;
334 : : int64_t offset;
335 : : pa_memchunk chunk;
336 : : int ret;
337 : :
338 [ # # ]: 0 : if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
339 : 0 : return;
340 : :
341 [ # # ]: 0 : if (!run) {
342 : 0 : pa_asyncmsgq_done(a, -1);
343 : 0 : continue;
344 : : }
345 : :
346 : 0 : pa_asyncmsgq_ref(a);
347 : 0 : ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
348 : 0 : pa_asyncmsgq_done(a, ret);
349 : 0 : pa_asyncmsgq_unref(a);
350 : : }
351 : : }
352 : :
353 : 0 : pa_bool_t pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
354 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(a) > 0);
355 : :
356 : 0 : return !!a->current;
357 : : }
|