Branch data Line data Source code
1 : : /***
2 : : This file is part of PulseAudio.
3 : :
4 : : Copyright 2004-2006 Lennart Poettering
5 : : Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6 : :
7 : : PulseAudio is free software; you can redistribute it and/or modify
8 : : it under the terms of the GNU Lesser General Public License as
9 : : published by the Free Software Foundation; either version 2.1 of the
10 : : License, or (at your option) any later version.
11 : :
12 : : PulseAudio is distributed in the hope that it will be useful, but
13 : : WITHOUT ANY WARRANTY; without even the implied warranty of
14 : : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 : : Lesser General Public License for more details.
16 : :
17 : : You should have received a copy of the GNU Lesser General Public
18 : : License along with PulseAudio; if not, write to the Free Software
19 : : Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 : : USA.
21 : : ***/
22 : :
23 : : #ifdef HAVE_CONFIG_H
24 : : #include <config.h>
25 : : #endif
26 : :
27 : : #include <stdio.h>
28 : : #include <stdlib.h>
29 : : #include <unistd.h>
30 : :
31 : : #ifdef HAVE_NETINET_IN_H
32 : : #include <netinet/in.h>
33 : : #endif
34 : :
35 : : #include <pulse/xmalloc.h>
36 : :
37 : : #include <pulsecore/socket.h>
38 : : #include <pulsecore/queue.h>
39 : : #include <pulsecore/log.h>
40 : : #include <pulsecore/creds.h>
41 : : #include <pulsecore/refcnt.h>
42 : : #include <pulsecore/flist.h>
43 : : #include <pulsecore/macro.h>
44 : :
45 : : #include "pstream.h"
46 : :
47 : : /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
48 : : #define PA_FLAG_SHMDATA 0x80000000LU
49 : : #define PA_FLAG_SHMRELEASE 0x40000000LU
50 : : #define PA_FLAG_SHMREVOKE 0xC0000000LU
51 : : #define PA_FLAG_SHMMASK 0xFF000000LU
52 : : #define PA_FLAG_SEEKMASK 0x000000FFLU
53 : :
54 : : /* The sequence descriptor header consists of 5 32bit integers: */
55 : : enum {
56 : : PA_PSTREAM_DESCRIPTOR_LENGTH,
57 : : PA_PSTREAM_DESCRIPTOR_CHANNEL,
58 : : PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
59 : : PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
60 : : PA_PSTREAM_DESCRIPTOR_FLAGS,
61 : : PA_PSTREAM_DESCRIPTOR_MAX
62 : : };
63 : :
64 : : /* If we have an SHM block, this info follows the descriptor */
65 : : enum {
66 : : PA_PSTREAM_SHM_BLOCKID,
67 : : PA_PSTREAM_SHM_SHMID,
68 : : PA_PSTREAM_SHM_INDEX,
69 : : PA_PSTREAM_SHM_LENGTH,
70 : : PA_PSTREAM_SHM_MAX
71 : : };
72 : :
73 : : typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
74 : :
75 : : #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
76 : :
77 : : /* To allow uploading a single sample in one frame, this value should be the
78 : : * same size (16 MB) as PA_SCACHE_ENTRY_SIZE_MAX from pulsecore/core-scache.h.
79 : : */
80 : : #define FRAME_SIZE_MAX_ALLOW (1024*1024*16)
81 : :
82 [ - + ][ # # ]: 27 : PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
83 : :
84 : : struct item_info {
85 : : enum {
86 : : PA_PSTREAM_ITEM_PACKET,
87 : : PA_PSTREAM_ITEM_MEMBLOCK,
88 : : PA_PSTREAM_ITEM_SHMRELEASE,
89 : : PA_PSTREAM_ITEM_SHMREVOKE
90 : : } type;
91 : :
92 : : /* packet info */
93 : : pa_packet *packet;
94 : : #ifdef HAVE_CREDS
95 : : pa_bool_t with_creds;
96 : : pa_creds creds;
97 : : #endif
98 : :
99 : : /* memblock info */
100 : : pa_memchunk chunk;
101 : : uint32_t channel;
102 : : int64_t offset;
103 : : pa_seek_mode_t seek_mode;
104 : :
105 : : /* release/revoke info */
106 : : uint32_t block_id;
107 : : };
108 : :
109 : : struct pa_pstream {
110 : : PA_REFCNT_DECLARE;
111 : :
112 : : pa_mainloop_api *mainloop;
113 : : pa_defer_event *defer_event;
114 : : pa_iochannel *io;
115 : :
116 : : pa_queue *send_queue;
117 : :
118 : : pa_bool_t dead;
119 : :
120 : : struct {
121 : : pa_pstream_descriptor descriptor;
122 : : struct item_info* current;
123 : : uint32_t shm_info[PA_PSTREAM_SHM_MAX];
124 : : void *data;
125 : : size_t index;
126 : : pa_memchunk memchunk;
127 : : } write;
128 : :
129 : : struct {
130 : : pa_pstream_descriptor descriptor;
131 : : pa_memblock *memblock;
132 : : pa_packet *packet;
133 : : uint32_t shm_info[PA_PSTREAM_SHM_MAX];
134 : : void *data;
135 : : size_t index;
136 : : } read;
137 : :
138 : : pa_bool_t use_shm;
139 : : pa_memimport *import;
140 : : pa_memexport *export;
141 : :
142 : : pa_pstream_packet_cb_t receive_packet_callback;
143 : : void *receive_packet_callback_userdata;
144 : :
145 : : pa_pstream_memblock_cb_t receive_memblock_callback;
146 : : void *receive_memblock_callback_userdata;
147 : :
148 : : pa_pstream_notify_cb_t drain_callback;
149 : : void *drain_callback_userdata;
150 : :
151 : : pa_pstream_notify_cb_t die_callback;
152 : : void *die_callback_userdata;
153 : :
154 : : pa_pstream_block_id_cb_t revoke_callback;
155 : : void *revoke_callback_userdata;
156 : :
157 : : pa_pstream_block_id_cb_t release_callback;
158 : : void *release_callback_userdata;
159 : :
160 : : pa_mempool *mempool;
161 : :
162 : : #ifdef HAVE_CREDS
163 : : pa_creds read_creds, write_creds;
164 : : pa_bool_t read_creds_valid, send_creds_now;
165 : : #endif
166 : : };
167 : :
168 : : static int do_write(pa_pstream *p);
169 : : static int do_read(pa_pstream *p);
170 : :
171 : 0 : static void do_something(pa_pstream *p) {
172 [ # # ]: 0 : pa_assert(p);
173 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
174 : :
175 : 0 : pa_pstream_ref(p);
176 : :
177 : 0 : p->mainloop->defer_enable(p->defer_event, 0);
178 : :
179 [ # # ][ # # ]: 0 : if (!p->dead && pa_iochannel_is_readable(p->io)) {
180 [ # # ]: 0 : if (do_read(p) < 0)
181 : : goto fail;
182 [ # # ][ # # ]: 0 : } else if (!p->dead && pa_iochannel_is_hungup(p->io))
183 : : goto fail;
184 : :
185 [ # # ][ # # ]: 0 : if (!p->dead && pa_iochannel_is_writable(p->io)) {
186 [ # # ]: 0 : if (do_write(p) < 0)
187 : : goto fail;
188 : : }
189 : :
190 : 0 : pa_pstream_unref(p);
191 : 0 : return;
192 : :
193 : : fail:
194 : :
195 [ # # ]: 0 : if (p->die_callback)
196 : 0 : p->die_callback(p, p->die_callback_userdata);
197 : :
198 : 0 : pa_pstream_unlink(p);
199 : 0 : pa_pstream_unref(p);
200 : : }
201 : :
202 : 0 : static void io_callback(pa_iochannel*io, void *userdata) {
203 : 0 : pa_pstream *p = userdata;
204 : :
205 [ # # ]: 0 : pa_assert(p);
206 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
207 [ # # ]: 0 : pa_assert(p->io == io);
208 : :
209 : 0 : do_something(p);
210 : 0 : }
211 : :
212 : 0 : static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
213 : 0 : pa_pstream *p = userdata;
214 : :
215 [ # # ]: 0 : pa_assert(p);
216 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
217 [ # # ]: 0 : pa_assert(p->defer_event == e);
218 [ # # ]: 0 : pa_assert(p->mainloop == m);
219 : :
220 : 0 : do_something(p);
221 : 0 : }
222 : :
223 : : static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
224 : :
225 : 0 : pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
226 : : pa_pstream *p;
227 : :
228 [ # # ]: 0 : pa_assert(m);
229 [ # # ]: 0 : pa_assert(io);
230 [ # # ]: 0 : pa_assert(pool);
231 : :
232 : 0 : p = pa_xnew(pa_pstream, 1);
233 : 0 : PA_REFCNT_INIT(p);
234 : 0 : p->io = io;
235 : 0 : pa_iochannel_set_callback(io, io_callback, p);
236 : 0 : p->dead = FALSE;
237 : :
238 : 0 : p->mainloop = m;
239 : 0 : p->defer_event = m->defer_new(m, defer_callback, p);
240 : 0 : m->defer_enable(p->defer_event, 0);
241 : :
242 : 0 : p->send_queue = pa_queue_new();
243 : :
244 : 0 : p->write.current = NULL;
245 : 0 : p->write.index = 0;
246 : 0 : pa_memchunk_reset(&p->write.memchunk);
247 : 0 : p->read.memblock = NULL;
248 : 0 : p->read.packet = NULL;
249 : 0 : p->read.index = 0;
250 : :
251 : 0 : p->receive_packet_callback = NULL;
252 : 0 : p->receive_packet_callback_userdata = NULL;
253 : 0 : p->receive_memblock_callback = NULL;
254 : 0 : p->receive_memblock_callback_userdata = NULL;
255 : 0 : p->drain_callback = NULL;
256 : 0 : p->drain_callback_userdata = NULL;
257 : 0 : p->die_callback = NULL;
258 : 0 : p->die_callback_userdata = NULL;
259 : 0 : p->revoke_callback = NULL;
260 : 0 : p->revoke_callback_userdata = NULL;
261 : 0 : p->release_callback = NULL;
262 : 0 : p->release_callback_userdata = NULL;
263 : :
264 : 0 : p->mempool = pool;
265 : :
266 : 0 : p->use_shm = FALSE;
267 : 0 : p->export = NULL;
268 : :
269 : : /* We do importing unconditionally */
270 : 0 : p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
271 : :
272 : 0 : pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
273 : 0 : pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
274 : :
275 : : #ifdef HAVE_CREDS
276 : 0 : p->send_creds_now = FALSE;
277 : 0 : p->read_creds_valid = FALSE;
278 : : #endif
279 : 0 : return p;
280 : : }
281 : :
282 : 0 : static void item_free(void *item) {
283 : 0 : struct item_info *i = item;
284 [ # # ]: 0 : pa_assert(i);
285 : :
286 [ # # ]: 0 : if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
287 [ # # ]: 0 : pa_assert(i->chunk.memblock);
288 : 0 : pa_memblock_unref(i->chunk.memblock);
289 [ # # ]: 0 : } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
290 [ # # ]: 0 : pa_assert(i->packet);
291 : 0 : pa_packet_unref(i->packet);
292 : : }
293 : :
294 [ # # ]: 0 : if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
295 : 0 : pa_xfree(i);
296 : 0 : }
297 : :
298 : 0 : static void pstream_free(pa_pstream *p) {
299 [ # # ]: 0 : pa_assert(p);
300 : :
301 : 0 : pa_pstream_unlink(p);
302 : :
303 : 0 : pa_queue_free(p->send_queue, item_free);
304 : :
305 [ # # ]: 0 : if (p->write.current)
306 : 0 : item_free(p->write.current);
307 : :
308 [ # # ]: 0 : if (p->write.memchunk.memblock)
309 : 0 : pa_memblock_unref(p->write.memchunk.memblock);
310 : :
311 [ # # ]: 0 : if (p->read.memblock)
312 : 0 : pa_memblock_unref(p->read.memblock);
313 : :
314 [ # # ]: 0 : if (p->read.packet)
315 : 0 : pa_packet_unref(p->read.packet);
316 : :
317 : 0 : pa_xfree(p);
318 : 0 : }
319 : :
320 : 0 : void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
321 : : struct item_info *i;
322 : :
323 [ # # ]: 0 : pa_assert(p);
324 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
325 [ # # ]: 0 : pa_assert(packet);
326 : :
327 [ # # ]: 0 : if (p->dead)
328 : 0 : return;
329 : :
330 [ # # ]: 0 : if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
331 : 0 : i = pa_xnew(struct item_info, 1);
332 : :
333 : 0 : i->type = PA_PSTREAM_ITEM_PACKET;
334 : 0 : i->packet = pa_packet_ref(packet);
335 : :
336 : : #ifdef HAVE_CREDS
337 [ # # ]: 0 : if ((i->with_creds = !!creds))
338 : 0 : i->creds = *creds;
339 : : #endif
340 : :
341 : 0 : pa_queue_push(p->send_queue, i);
342 : :
343 : 0 : p->mainloop->defer_enable(p->defer_event, 1);
344 : : }
345 : :
346 : 0 : void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
347 : : size_t length, idx;
348 : : size_t bsm;
349 : :
350 [ # # ]: 0 : pa_assert(p);
351 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
352 [ # # ]: 0 : pa_assert(channel != (uint32_t) -1);
353 [ # # ]: 0 : pa_assert(chunk);
354 : :
355 [ # # ]: 0 : if (p->dead)
356 : 0 : return;
357 : :
358 : 0 : idx = 0;
359 : 0 : length = chunk->length;
360 : :
361 : 0 : bsm = pa_mempool_block_size_max(p->mempool);
362 : :
363 [ # # ]: 0 : while (length > 0) {
364 : : struct item_info *i;
365 : : size_t n;
366 : :
367 [ # # ]: 0 : if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
368 : 0 : i = pa_xnew(struct item_info, 1);
369 : 0 : i->type = PA_PSTREAM_ITEM_MEMBLOCK;
370 : :
371 : 0 : n = PA_MIN(length, bsm);
372 : 0 : i->chunk.index = chunk->index + idx;
373 : 0 : i->chunk.length = n;
374 : 0 : i->chunk.memblock = pa_memblock_ref(chunk->memblock);
375 : :
376 : 0 : i->channel = channel;
377 : 0 : i->offset = offset;
378 : 0 : i->seek_mode = seek_mode;
379 : : #ifdef HAVE_CREDS
380 : 0 : i->with_creds = FALSE;
381 : : #endif
382 : :
383 : 0 : pa_queue_push(p->send_queue, i);
384 : :
385 : 0 : idx += n;
386 : 0 : length -= n;
387 : : }
388 : :
389 : 0 : p->mainloop->defer_enable(p->defer_event, 1);
390 : : }
391 : :
392 : 0 : void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
393 : : struct item_info *item;
394 [ # # ]: 0 : pa_assert(p);
395 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
396 : :
397 [ # # ]: 0 : if (p->dead)
398 : 0 : return;
399 : :
400 : : /* pa_log("Releasing block %u", block_id); */
401 : :
402 [ # # ]: 0 : if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
403 : 0 : item = pa_xnew(struct item_info, 1);
404 : 0 : item->type = PA_PSTREAM_ITEM_SHMRELEASE;
405 : 0 : item->block_id = block_id;
406 : : #ifdef HAVE_CREDS
407 : 0 : item->with_creds = FALSE;
408 : : #endif
409 : :
410 : 0 : pa_queue_push(p->send_queue, item);
411 : 0 : p->mainloop->defer_enable(p->defer_event, 1);
412 : : }
413 : :
414 : : /* might be called from thread context */
415 : 0 : static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
416 : 0 : pa_pstream *p = userdata;
417 : :
418 [ # # ]: 0 : pa_assert(p);
419 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
420 : :
421 [ # # ]: 0 : if (p->dead)
422 : 0 : return;
423 : :
424 [ # # ]: 0 : if (p->release_callback)
425 : 0 : p->release_callback(p, block_id, p->release_callback_userdata);
426 : : else
427 : 0 : pa_pstream_send_release(p, block_id);
428 : : }
429 : :
430 : 0 : void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
431 : : struct item_info *item;
432 [ # # ]: 0 : pa_assert(p);
433 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
434 : :
435 [ # # ]: 0 : if (p->dead)
436 : 0 : return;
437 : : /* pa_log("Revoking block %u", block_id); */
438 : :
439 [ # # ]: 0 : if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
440 : 0 : item = pa_xnew(struct item_info, 1);
441 : 0 : item->type = PA_PSTREAM_ITEM_SHMREVOKE;
442 : 0 : item->block_id = block_id;
443 : : #ifdef HAVE_CREDS
444 : 0 : item->with_creds = FALSE;
445 : : #endif
446 : :
447 : 0 : pa_queue_push(p->send_queue, item);
448 : 0 : p->mainloop->defer_enable(p->defer_event, 1);
449 : : }
450 : :
451 : : /* might be called from thread context */
452 : 0 : static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
453 : 0 : pa_pstream *p = userdata;
454 : :
455 [ # # ]: 0 : pa_assert(p);
456 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
457 : :
458 [ # # ]: 0 : if (p->revoke_callback)
459 : 0 : p->revoke_callback(p, block_id, p->revoke_callback_userdata);
460 : : else
461 : 0 : pa_pstream_send_revoke(p, block_id);
462 : 0 : }
463 : :
464 : 0 : static void prepare_next_write_item(pa_pstream *p) {
465 [ # # ]: 0 : pa_assert(p);
466 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
467 : :
468 : 0 : p->write.current = pa_queue_pop(p->send_queue);
469 : :
470 [ # # ]: 0 : if (!p->write.current)
471 : 0 : return;
472 : :
473 : 0 : p->write.index = 0;
474 : 0 : p->write.data = NULL;
475 : 0 : pa_memchunk_reset(&p->write.memchunk);
476 : :
477 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
478 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
479 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
480 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
481 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
482 : :
483 [ # # ]: 0 : if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
484 : :
485 [ # # ]: 0 : pa_assert(p->write.current->packet);
486 : 0 : p->write.data = p->write.current->packet->data;
487 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
488 : :
489 [ # # ]: 0 : } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
490 : :
491 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
492 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
493 : :
494 [ # # ]: 0 : } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
495 : :
496 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
497 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
498 : :
499 : : } else {
500 : : uint32_t flags;
501 : 0 : pa_bool_t send_payload = TRUE;
502 : :
503 [ # # ]: 0 : pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
504 [ # # ]: 0 : pa_assert(p->write.current->chunk.memblock);
505 : :
506 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
507 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
508 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
509 : :
510 : 0 : flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
511 : :
512 [ # # ]: 0 : if (p->use_shm) {
513 : : uint32_t block_id, shm_id;
514 : : size_t offset, length;
515 : :
516 [ # # ]: 0 : pa_assert(p->export);
517 : :
518 [ # # ]: 0 : if (pa_memexport_put(p->export,
519 : 0 : p->write.current->chunk.memblock,
520 : : &block_id,
521 : : &shm_id,
522 : : &offset,
523 : : &length) >= 0) {
524 : :
525 : 0 : flags |= PA_FLAG_SHMDATA;
526 : 0 : send_payload = FALSE;
527 : :
528 [ # # ]: 0 : p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
529 [ # # ]: 0 : p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
530 [ # # ]: 0 : p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
531 [ # # ]: 0 : p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
532 : :
533 : 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
534 : 0 : p->write.data = p->write.shm_info;
535 : : }
536 : : /* else */
537 : : /* pa_log_warn("Failed to export memory block."); */
538 : : }
539 : :
540 [ # # ]: 0 : if (send_payload) {
541 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
542 : 0 : p->write.memchunk = p->write.current->chunk;
543 : 0 : pa_memblock_ref(p->write.memchunk.memblock);
544 : 0 : p->write.data = NULL;
545 : : }
546 : :
547 [ # # ]: 0 : p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
548 : : }
549 : :
550 : : #ifdef HAVE_CREDS
551 [ # # ]: 0 : if ((p->send_creds_now = p->write.current->with_creds))
552 : 0 : p->write_creds = p->write.current->creds;
553 : : #endif
554 : : }
555 : :
556 : 0 : static int do_write(pa_pstream *p) {
557 : : void *d;
558 : : size_t l;
559 : : ssize_t r;
560 : 0 : pa_memblock *release_memblock = NULL;
561 : :
562 [ # # ]: 0 : pa_assert(p);
563 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
564 : :
565 [ # # ]: 0 : if (!p->write.current)
566 : 0 : prepare_next_write_item(p);
567 : :
568 [ # # ]: 0 : if (!p->write.current)
569 : : return 0;
570 : :
571 [ # # ]: 0 : if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
572 : 0 : d = (uint8_t*) p->write.descriptor + p->write.index;
573 : 0 : l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
574 : : } else {
575 [ # # ][ # # ]: 0 : pa_assert(p->write.data || p->write.memchunk.memblock);
576 : :
577 [ # # ]: 0 : if (p->write.data)
578 : 0 : d = p->write.data;
579 : : else {
580 : 0 : d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
581 : 0 : release_memblock = p->write.memchunk.memblock;
582 : : }
583 : :
584 : 0 : d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
585 [ # # ]: 0 : l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
586 : : }
587 : :
588 [ # # ]: 0 : pa_assert(l > 0);
589 : :
590 : : #ifdef HAVE_CREDS
591 [ # # ]: 0 : if (p->send_creds_now) {
592 : :
593 [ # # ]: 0 : if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
594 : : goto fail;
595 : :
596 : 0 : p->send_creds_now = FALSE;
597 : : } else
598 : : #endif
599 : :
600 [ # # ]: 0 : if ((r = pa_iochannel_write(p->io, d, l)) < 0)
601 : : goto fail;
602 : :
603 [ # # ]: 0 : if (release_memblock)
604 : 0 : pa_memblock_release(release_memblock);
605 : :
606 : 0 : p->write.index += (size_t) r;
607 : :
608 [ # # ][ # # ]: 0 : if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
609 [ # # ]: 0 : pa_assert(p->write.current);
610 : 0 : item_free(p->write.current);
611 : 0 : p->write.current = NULL;
612 : :
613 [ # # ]: 0 : if (p->write.memchunk.memblock)
614 : 0 : pa_memblock_unref(p->write.memchunk.memblock);
615 : :
616 : 0 : pa_memchunk_reset(&p->write.memchunk);
617 : :
618 [ # # ][ # # ]: 0 : if (p->drain_callback && !pa_pstream_is_pending(p))
619 : 0 : p->drain_callback(p, p->drain_callback_userdata);
620 : : }
621 : :
622 : : return 0;
623 : :
624 : : fail:
625 : :
626 [ # # ]: 0 : if (release_memblock)
627 : 0 : pa_memblock_release(release_memblock);
628 : :
629 : : return -1;
630 : : }
631 : :
632 : 0 : static int do_read(pa_pstream *p) {
633 : : void *d;
634 : : size_t l;
635 : : ssize_t r;
636 : 0 : pa_memblock *release_memblock = NULL;
637 [ # # ]: 0 : pa_assert(p);
638 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
639 : :
640 [ # # ]: 0 : if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
641 : 0 : d = (uint8_t*) p->read.descriptor + p->read.index;
642 : 0 : l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
643 : : } else {
644 [ # # ][ # # ]: 0 : pa_assert(p->read.data || p->read.memblock);
645 : :
646 [ # # ]: 0 : if (p->read.data)
647 : 0 : d = p->read.data;
648 : : else {
649 : 0 : d = pa_memblock_acquire(p->read.memblock);
650 : 0 : release_memblock = p->read.memblock;
651 : : }
652 : :
653 : 0 : d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
654 [ # # ]: 0 : l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
655 : : }
656 : :
657 : : #ifdef HAVE_CREDS
658 : : {
659 : 0 : pa_bool_t b = 0;
660 : :
661 [ # # ]: 0 : if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
662 : : goto fail;
663 : :
664 [ # # ][ # # ]: 0 : p->read_creds_valid = p->read_creds_valid || b;
665 : : }
666 : : #else
667 : : if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
668 : : goto fail;
669 : : #endif
670 : :
671 [ # # ]: 0 : if (release_memblock)
672 : 0 : pa_memblock_release(release_memblock);
673 : :
674 : 0 : p->read.index += (size_t) r;
675 : :
676 [ # # ]: 0 : if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
677 : : uint32_t flags, length, channel;
678 : : /* Reading of frame descriptor complete */
679 : :
680 [ # # ]: 0 : flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
681 : :
682 [ # # ][ # # ]: 0 : if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
683 : 0 : pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
684 : 0 : return -1;
685 : : }
686 : :
687 [ # # ]: 0 : if (flags == PA_FLAG_SHMRELEASE) {
688 : :
689 : : /* This is a SHM memblock release frame with no payload */
690 : :
691 : : /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
692 : :
693 [ # # ]: 0 : pa_assert(p->export);
694 [ # # ]: 0 : pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
695 : :
696 : 0 : goto frame_done;
697 : :
698 [ # # ]: 0 : } else if (flags == PA_FLAG_SHMREVOKE) {
699 : :
700 : : /* This is a SHM memblock revoke frame with no payload */
701 : :
702 : : /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
703 : :
704 [ # # ]: 0 : pa_assert(p->import);
705 [ # # ]: 0 : pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
706 : :
707 : 0 : goto frame_done;
708 : : }
709 : :
710 [ # # ]: 0 : length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
711 : :
712 [ # # ]: 0 : if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
713 : 0 : pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
714 : 0 : return -1;
715 : : }
716 : :
717 [ # # ][ # # ]: 0 : pa_assert(!p->read.packet && !p->read.memblock);
718 : :
719 [ # # ]: 0 : channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
720 : :
721 [ # # ]: 0 : if (channel == (uint32_t) -1) {
722 : :
723 [ # # ]: 0 : if (flags != 0) {
724 : 0 : pa_log_warn("Received packet frame with invalid flags value.");
725 : 0 : return -1;
726 : : }
727 : :
728 : : /* Frame is a packet frame */
729 : 0 : p->read.packet = pa_packet_new(length);
730 : 0 : p->read.data = p->read.packet->data;
731 : :
732 : : } else {
733 : :
734 [ # # ]: 0 : if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
735 : 0 : pa_log_warn("Received memblock frame with invalid seek mode.");
736 : 0 : return -1;
737 : : }
738 : :
739 [ # # ]: 0 : if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
740 : :
741 [ # # ]: 0 : if (length != sizeof(p->read.shm_info)) {
742 : 0 : pa_log_warn("Received SHM memblock frame with Invalid frame length.");
743 : 0 : return -1;
744 : : }
745 : :
746 : : /* Frame is a memblock frame referencing an SHM memblock */
747 : 0 : p->read.data = p->read.shm_info;
748 : :
749 [ # # ]: 0 : } else if ((flags & PA_FLAG_SHMMASK) == 0) {
750 : :
751 : : /* Frame is a memblock frame */
752 : :
753 : 0 : p->read.memblock = pa_memblock_new(p->mempool, length);
754 : 0 : p->read.data = NULL;
755 : : } else {
756 : :
757 : 0 : pa_log_warn("Received memblock frame with invalid flags value.");
758 : 0 : return -1;
759 : : }
760 : : }
761 : :
762 [ # # ]: 0 : } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
763 : : /* Frame payload available */
764 : :
765 [ # # ][ # # ]: 0 : if (p->read.memblock && p->receive_memblock_callback) {
766 : :
767 : : /* Is this memblock data? Than pass it to the user */
768 [ # # ]: 0 : l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
769 : :
770 [ # # ]: 0 : if (l > 0) {
771 : : pa_memchunk chunk;
772 : :
773 : 0 : chunk.memblock = p->read.memblock;
774 : 0 : chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
775 : 0 : chunk.length = l;
776 : :
777 [ # # ]: 0 : if (p->receive_memblock_callback) {
778 : : int64_t offset;
779 : :
780 : 0 : offset = (int64_t) (
781 [ # # ]: 0 : (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
782 [ # # ]: 0 : (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
783 : :
784 : 0 : p->receive_memblock_callback(
785 : : p,
786 [ # # ]: 0 : ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
787 : : offset,
788 [ # # ]: 0 : ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
789 : : &chunk,
790 : : p->receive_memblock_callback_userdata);
791 : : }
792 : :
793 : : /* Drop seek info for following callbacks */
794 : 0 : p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
795 : 0 : p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
796 : 0 : p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
797 : : }
798 : : }
799 : :
800 : : /* Frame complete */
801 [ # # ][ # # ]: 0 : if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
802 : :
803 [ # # ]: 0 : if (p->read.memblock) {
804 : :
805 : : /* This was a memblock frame. We can unref the memblock now */
806 : 0 : pa_memblock_unref(p->read.memblock);
807 : :
808 [ # # ]: 0 : } else if (p->read.packet) {
809 : :
810 [ # # ]: 0 : if (p->receive_packet_callback)
811 : : #ifdef HAVE_CREDS
812 [ # # ]: 0 : p->receive_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->receive_packet_callback_userdata);
813 : : #else
814 : : p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
815 : : #endif
816 : :
817 : 0 : pa_packet_unref(p->read.packet);
818 : : } else {
819 : : pa_memblock *b;
820 : :
821 [ # # ][ # # ]: 0 : pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
822 : :
823 [ # # ]: 0 : pa_assert(p->import);
824 : :
825 [ # # ]: 0 : if (!(b = pa_memimport_get(p->import,
826 [ # # ]: 0 : ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
827 [ # # ]: 0 : ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
828 [ # # ]: 0 : ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
829 [ # # ]: 0 : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
830 : :
831 [ # # ]: 0 : if (pa_log_ratelimit(PA_LOG_DEBUG))
832 : 0 : pa_log_debug("Failed to import memory block.");
833 : : }
834 : :
835 [ # # ]: 0 : if (p->receive_memblock_callback) {
836 : : int64_t offset;
837 : : pa_memchunk chunk;
838 : :
839 : 0 : chunk.memblock = b;
840 : 0 : chunk.index = 0;
841 [ # # ][ # # ]: 0 : chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
842 : :
843 : 0 : offset = (int64_t) (
844 [ # # ]: 0 : (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
845 [ # # ]: 0 : (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
846 : :
847 : 0 : p->receive_memblock_callback(
848 : : p,
849 [ # # ]: 0 : ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
850 : : offset,
851 [ # # ]: 0 : ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
852 : : &chunk,
853 : : p->receive_memblock_callback_userdata);
854 : : }
855 : :
856 [ # # ]: 0 : if (b)
857 : 0 : pa_memblock_unref(b);
858 : : }
859 : :
860 : : goto frame_done;
861 : : }
862 : : }
863 : :
864 : : return 0;
865 : :
866 : : frame_done:
867 : 0 : p->read.memblock = NULL;
868 : 0 : p->read.packet = NULL;
869 : 0 : p->read.index = 0;
870 : 0 : p->read.data = NULL;
871 : :
872 : : #ifdef HAVE_CREDS
873 : 0 : p->read_creds_valid = FALSE;
874 : : #endif
875 : :
876 : 0 : return 0;
877 : :
878 : : fail:
879 [ # # ]: 0 : if (release_memblock)
880 : 0 : pa_memblock_release(release_memblock);
881 : :
882 : : return -1;
883 : : }
884 : :
885 : 0 : void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
886 [ # # ]: 0 : pa_assert(p);
887 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
888 : :
889 : 0 : p->die_callback = cb;
890 : 0 : p->die_callback_userdata = userdata;
891 : 0 : }
892 : :
893 : 0 : void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
894 [ # # ]: 0 : pa_assert(p);
895 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
896 : :
897 : 0 : p->drain_callback = cb;
898 : 0 : p->drain_callback_userdata = userdata;
899 : 0 : }
900 : :
901 : 0 : void pa_pstream_set_receive_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
902 [ # # ]: 0 : pa_assert(p);
903 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
904 : :
905 : 0 : p->receive_packet_callback = cb;
906 : 0 : p->receive_packet_callback_userdata = userdata;
907 : 0 : }
908 : :
909 : 0 : void pa_pstream_set_receive_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
910 [ # # ]: 0 : pa_assert(p);
911 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
912 : :
913 : 0 : p->receive_memblock_callback = cb;
914 : 0 : p->receive_memblock_callback_userdata = userdata;
915 : 0 : }
916 : :
917 : 0 : void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
918 [ # # ]: 0 : pa_assert(p);
919 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
920 : :
921 : 0 : p->release_callback = cb;
922 : 0 : p->release_callback_userdata = userdata;
923 : 0 : }
924 : :
925 : 0 : void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
926 [ # # ]: 0 : pa_assert(p);
927 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
928 : :
929 : 0 : p->release_callback = cb;
930 : 0 : p->release_callback_userdata = userdata;
931 : 0 : }
932 : :
933 : 0 : pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
934 : : pa_bool_t b;
935 : :
936 [ # # ]: 0 : pa_assert(p);
937 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
938 : :
939 [ # # ]: 0 : if (p->dead)
940 : : b = FALSE;
941 : : else
942 [ # # ][ # # ]: 0 : b = p->write.current || !pa_queue_isempty(p->send_queue);
943 : :
944 : 0 : return b;
945 : : }
946 : :
947 : 0 : void pa_pstream_unref(pa_pstream*p) {
948 [ # # ]: 0 : pa_assert(p);
949 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
950 : :
951 [ # # ]: 0 : if (PA_REFCNT_DEC(p) <= 0)
952 : 0 : pstream_free(p);
953 : 0 : }
954 : :
955 : 0 : pa_pstream* pa_pstream_ref(pa_pstream*p) {
956 [ # # ]: 0 : pa_assert(p);
957 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
958 : :
959 : 0 : PA_REFCNT_INC(p);
960 : 0 : return p;
961 : : }
962 : :
963 : 0 : void pa_pstream_unlink(pa_pstream *p) {
964 [ # # ]: 0 : pa_assert(p);
965 : :
966 [ # # ]: 0 : if (p->dead)
967 : 0 : return;
968 : :
969 : 0 : p->dead = TRUE;
970 : :
971 [ # # ]: 0 : if (p->import) {
972 : 0 : pa_memimport_free(p->import);
973 : 0 : p->import = NULL;
974 : : }
975 : :
976 [ # # ]: 0 : if (p->export) {
977 : 0 : pa_memexport_free(p->export);
978 : 0 : p->export = NULL;
979 : : }
980 : :
981 [ # # ]: 0 : if (p->io) {
982 : 0 : pa_iochannel_free(p->io);
983 : 0 : p->io = NULL;
984 : : }
985 : :
986 [ # # ]: 0 : if (p->defer_event) {
987 : 0 : p->mainloop->defer_free(p->defer_event);
988 : 0 : p->defer_event = NULL;
989 : : }
990 : :
991 : 0 : p->die_callback = NULL;
992 : 0 : p->drain_callback = NULL;
993 : 0 : p->receive_packet_callback = NULL;
994 : 0 : p->receive_memblock_callback = NULL;
995 : : }
996 : :
997 : 0 : void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
998 [ # # ]: 0 : pa_assert(p);
999 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
1000 : :
1001 : 0 : p->use_shm = enable;
1002 : :
1003 [ # # ]: 0 : if (enable) {
1004 : :
1005 [ # # ]: 0 : if (!p->export)
1006 : 0 : p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1007 : :
1008 : : } else {
1009 : :
1010 [ # # ]: 0 : if (p->export) {
1011 : 0 : pa_memexport_free(p->export);
1012 : 0 : p->export = NULL;
1013 : : }
1014 : : }
1015 : 0 : }
1016 : :
1017 : 0 : pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1018 [ # # ]: 0 : pa_assert(p);
1019 [ # # ]: 0 : pa_assert(PA_REFCNT_VALUE(p) > 0);
1020 : :
1021 : 0 : return p->use_shm;
1022 : : }
|