What are pstreams?
Whenever a client or a server wants to send a message to the other end, it first builds the message using the PulseAudio (PA) tagstruct mechanism. Once the tagstruct is built, it is sent over a pstream.
Here's an example from our Protocol Implementation section, where a server sends an ACK for a client request using the REPLY opcode:
void pa_pstream_send_simple_ack(pa_pstream *p, uint32_t tag) {
    pa_tagstruct *t;
    pa_assert_se(t = pa_tagstruct_new());
    pa_tagstruct_putu32(t, PA_COMMAND_REPLY);
    pa_tagstruct_putu32(t, tag);
    pa_pstream_send_tagstruct(p, t);
}
This build-then-send pattern is the core communication mechanism between clients and servers.
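To make the "build the message, then send it" idea more concrete, here is a minimal sketch of what building such an ACK payload could look like at the byte level. It is not the PulseAudio implementation: the sketch_* names are hypothetical, and it assumes that each value is stored as a one-byte type tag ('L' for a 32-bit unsigned integer) followed by the value in network byte order.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for pa_tagstruct: a fixed-size byte buffer. */
#define SKETCH_MAX 64
#define SKETCH_TAG_U32 'L'   /* assumed type tag for a 32-bit unsigned value */

typedef struct {
    uint8_t data[SKETCH_MAX];
    size_t length;
} sketch_tagstruct;

static void sketch_init(sketch_tagstruct *t) {
    assert(t);
    t->length = 0;
}

/* Append one tagged u32: a type byte, then the value in big-endian order. */
static void sketch_putu32(sketch_tagstruct *t, uint32_t v) {
    assert(t);
    assert(t->length + 5 <= SKETCH_MAX);
    t->data[t->length++] = SKETCH_TAG_U32;
    t->data[t->length++] = (uint8_t) (v >> 24);
    t->data[t->length++] = (uint8_t) (v >> 16);
    t->data[t->length++] = (uint8_t) (v >> 8);
    t->data[t->length++] = (uint8_t) v;
}

/* Build the payload of a simple ACK: the reply command, then the request tag.
 * Returns the number of bytes that would be handed to the pstream. */
static size_t sketch_build_ack(sketch_tagstruct *t, uint32_t reply_command, uint32_t tag) {
    sketch_init(t);
    sketch_putu32(t, reply_command);
    sketch_putu32(t, tag);
    return t->length;
}
```

Calling sketch_build_ack(&t, reply_command, tag) leaves two tagged values in t.data, in the same order as the two pa_tagstruct_putu32() calls above.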
Another example is setting the default sink, from src/pulse/context.c:
pa_operation* pa_context_set_default_sink(pa_context *c, const char *name, pa_context_success_cb_t cb, void *userdata) {
    pa_tagstruct *t;
    pa_operation *o;
    uint32_t tag;
    ...
    t = pa_tagstruct_command(c, PA_COMMAND_SET_DEFAULT_SINK, &tag);
    pa_tagstruct_puts(t, name);
    pa_pstream_send_tagstruct(c->pstream, t);
    pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, pa_context_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
    return o;
}
Other client operations follow the same pattern.
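The interesting detail in the client example is the tag: the request is sent with a tag, and a callback is registered for the reply carrying that same tag. The following is a minimal sketch of that matching step only. The sketch_* names are hypothetical, the table size is arbitrary, and the timeout handling seen in the real call (DEFAULT_TIMEOUT) is left out entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_PENDING 4

typedef void (*sketch_reply_cb)(uint32_t tag, void *userdata);

/* One outstanding request that is still waiting for its reply. */
typedef struct {
    int used;
    uint32_t tag;
    sketch_reply_cb cb;
    void *userdata;
} sketch_pending;

/* Hypothetical stand-in for the reply dispatcher. */
typedef struct {
    sketch_pending pending[SKETCH_MAX_PENDING];
} sketch_dispatch;

static void sketch_dispatch_init(sketch_dispatch *d) {
    size_t i;
    assert(d);
    for (i = 0; i < SKETCH_MAX_PENDING; i++)
        d->pending[i].used = 0;
}

/* Remember which callback wants the reply for `tag`.
 * Returns 0 on success, -1 if the table is full. */
static int sketch_register_reply(sketch_dispatch *d, uint32_t tag, sketch_reply_cb cb, void *userdata) {
    size_t i;
    assert(d);
    assert(cb);
    for (i = 0; i < SKETCH_MAX_PENDING; i++) {
        if (!d->pending[i].used) {
            d->pending[i].used = 1;
            d->pending[i].tag = tag;
            d->pending[i].cb = cb;
            d->pending[i].userdata = userdata;
            return 0;
        }
    }
    return -1;
}

/* A reply with `tag` arrived: forget the entry, then run its callback.
 * Returns 1 if a callback was found, 0 if nobody was waiting for this tag. */
static int sketch_dispatch_reply(sketch_dispatch *d, uint32_t tag) {
    size_t i;
    assert(d);
    for (i = 0; i < SKETCH_MAX_PENDING; i++) {
        if (d->pending[i].used && d->pending[i].tag == tag) {
            d->pending[i].used = 0;
            d->pending[i].cb(tag, d->pending[i].userdata);
            return 1;
        }
    }
    return 0;
}

/* Example callback: count how many requests were acknowledged. */
static void sketch_count_ack(uint32_t tag, void *userdata) {
    (void) tag;
    (*(int *) userdata)++;
}
```

Each registered entry fires at most once: after the matching reply is dispatched, a second reply with the same tag finds nothing.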
What is behind a pstream?
Behind a pstream is the raw Unix-socket/pipe connection.
What can be sent over a pstream?
Several types of items can be sent over the pipe, including:
enum {
    PA_PSTREAM_ITEM_PACKET,
    PA_PSTREAM_ITEM_MEMBLOCK,
    PA_PSTREAM_ITEM_SHMRELEASE,
    PA_PSTREAM_ITEM_SHMREVOKE
} type;
Sending packets (ITEM_PACKET) is discussed in this document, since this is how "tagstructs" (small commands and their parameters) are sent to the other end.
The subdocument "Details of Pstreams Shared Memory Blocks Transfer" covers in detail how memblock transfers (ITEM_MEMBLOCK) occur over the pipe, for both private blocks and POSIX SHM blocks.
Sending a tagstruct
#define pa_pstream_send_tagstruct(p, t) \
    pa_pstream_send_tagstruct_with_creds((p), (t), NULL)
void pa_pstream_send_tagstruct_with_creds(pa_pstream *p, pa_tagstruct *t, const pa_creds *creds) {
    if (creds) {
        pa_cmsg_ancil_data a;
        a.nfd = 0;
        a.creds_valid = true;
        a.creds = *creds;
        pa_pstream_send_tagstruct_with_ancil_data(p, t, &a);
    }
    else
        pa_pstream_send_tagstruct_with_ancil_data(p, t, NULL);
}
static void pa_pstream_send_tagstruct_with_ancil_data(pa_pstream *p, pa_tagstruct *t, const pa_cmsg_ancil_data *ancil_data) {
    size_t length;
    const uint8_t *data;
    pa_packet *packet;
    pa_assert(p);
    pa_assert(t);
    pa_assert_se(data = pa_tagstruct_data(t, &length));
    pa_assert_se(packet = pa_packet_new_data(data, length));
    pa_tagstruct_free(t);
    pa_pstream_send_packet(p, packet, ancil_data);
    pa_packet_unref(packet);
}
Note that in the above, pa_pstream_send_tagstruct_with_ancil_data() extracts the raw data from the tagstruct, creates a packet out of it, and then frees the tagstruct.
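One detail worth spelling out is why the packet can be unreferenced right after it is handed to the pstream: the packet is reference counted, and whoever queues it takes a reference of its own. Below is a minimal sketch of that ownership pattern. The sketch_packet type and its helpers are hypothetical stand-ins, not the real pa_packet implementation.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for pa_packet: a reference-counted copy of the data. */
typedef struct {
    int refcnt;
    size_t length;
    uint8_t *data;
} sketch_packet;

/* Copy `length` bytes into a new packet owned by the caller (refcnt == 1). */
static sketch_packet *sketch_packet_new_data(const uint8_t *data, size_t length) {
    sketch_packet *p;
    assert(data);
    assert(length > 0);
    p = malloc(sizeof(*p));
    if (!p)
        abort();
    p->data = malloc(length);
    if (!p->data)
        abort();
    memcpy(p->data, data, length);
    p->length = length;
    p->refcnt = 1;
    return p;
}

/* Take an additional reference, e.g. on behalf of a send queue. */
static sketch_packet *sketch_packet_ref(sketch_packet *p) {
    assert(p);
    assert(p->refcnt > 0);
    p->refcnt++;
    return p;
}

/* Drop one reference. Returns 1 if the packet was freed, 0 otherwise. */
static int sketch_packet_unref(sketch_packet *p) {
    assert(p);
    assert(p->refcnt > 0);
    if (--p->refcnt > 0)
        return 0;
    free(p->data);
    free(p);
    return 1;
}
```

In this model the creator calls sketch_packet_unref() as soon as the packet is queued, and the memory is only released once the queue drops its own reference after writing.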
Sending packets
As seen above, the tagstruct data gets fully extracted and a pa_packet is created. The packet is sent using pa_pstream_send_packet(), shown below:
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data) {
    struct item_info *i;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);
    if (p->dead)
        return;
    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);
    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);
#ifdef HAVE_CREDS
    if ((i->with_ancil_data = !!ancil_data)) {
        i->ancil_data = *ancil_data;
        if (ancil_data->creds_valid)
            pa_assert(ancil_data->nfd == 0);
        else
            pa_assert(ancil_data->nfd > 0);
    }
#endif
    pa_queue_push(p->send_queue, i);
    p->mainloop->defer_enable(p->defer_event, 1);
}
Note that this just pushes the packet onto the pstream send queue. PA is asynchronous by nature, so we only inform the event loop that new data is pending by enabling the defer event. The mainloop does its work in its own context later.
How are the packets actually sent?
So, we've transformed the tagstruct into raw data, packed the data into a packet, pushed the packet onto the pstream send queue, and notified the mainloop about our changes.
But where is the data actually sent? To find out, we'll trace the call chain from the bottom up.
In the last section, we saw that the data was just pushed onto the send queue. So a starting point is: where is this send queue popped? We can see that it's popped in prepare_next_write_item(), and we can follow its callers upwards from there:
static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    p->write.current = pa_queue_pop(p->send_queue);
    ...
}
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    if (!p->write.current)
        prepare_next_write_item(p);
    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }
    ...
}
static void do_pstream_read_write(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_pstream_ref(p);
    p->mainloop->defer_enable(p->defer_event, 0);
    if (!p->dead && p->srb) {
        do_write(p);
        while (!p->dead && do_read(p, &p->readsrb) == 0);
    }
    ...
}
static bool srb_callback(pa_srbchannel *srb, void *userdata) {
    pa_pstream *p = userdata;
    ...
    do_pstream_read_write(p);
    return p->srb != NULL;
}
static void io_callback(pa_iochannel*io, void *userdata) {
    pa_pstream *p = userdata;
    ...
    do_pstream_read_write(p);
}
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
    pa_pstream *p = userdata;
    ...
    do_pstream_read_write(p);
}
As we can see from the above, on the send path everything originates from defer_callback, which the mainloop triggers once the defer event has been enabled. The callback goes through several steps, up to popping items off the send queue and sending the actual packets in do_write(). Note that io_callback and srb_callback funnel into the same do_pstream_read_write() function.
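The whole path, from pushing an item onto the send queue to the deferred callback picking it up later, can be modelled in a few lines. This is only a sketch under simplifying assumptions: sketch_stream and its helpers are hypothetical, the "mainloop" is a single function call, items are plain integers, and the callback drains the whole queue in one go, which the real code is not claimed to do.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_QUEUE_MAX 8

/* Hypothetical stand-in for a pstream: a FIFO of pending items plus a
 * flag that plays the role of the enabled/disabled defer event. */
typedef struct {
    int items[SKETCH_QUEUE_MAX];
    size_t head, count;
    int defer_enabled;
    int sent[SKETCH_QUEUE_MAX];   /* what the "wire" received, in order */
    size_t n_sent;
} sketch_stream;

static void sketch_stream_init(sketch_stream *s) {
    assert(s);
    s->head = s->count = s->n_sent = 0;
    s->defer_enabled = 0;
}

/* Caller side: queue the item and ask the loop to call us back later.
 * Nothing is written here. */
static void sketch_send(sketch_stream *s, int item) {
    assert(s);
    assert(s->count < SKETCH_QUEUE_MAX);
    s->items[(s->head + s->count) % SKETCH_QUEUE_MAX] = item;
    s->count++;
    s->defer_enabled = 1;
}

/* Loop side: the deferred callback disables itself, then pops and "writes"
 * every queued item in FIFO order. */
static void sketch_defer_callback(sketch_stream *s) {
    assert(s);
    s->defer_enabled = 0;
    while (s->count > 0) {
        assert(s->n_sent < SKETCH_QUEUE_MAX);
        s->sent[s->n_sent++] = s->items[s->head];
        s->head = (s->head + 1) % SKETCH_QUEUE_MAX;
        s->count--;
    }
}

/* One loop iteration: run the callback only if it was enabled.
 * Returns 1 if the callback ran, 0 otherwise. */
static int sketch_mainloop_iterate(sketch_stream *s) {
    assert(s);
    if (!s->defer_enabled)
        return 0;
    sketch_defer_callback(s);
    return 1;
}
```

The point of the model is the ordering: sketch_send() returns before anything reaches the wire, and the items only move when the loop gets around to running the deferred callback.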
Finally! Writing!
As stated above, in the end it all boils down to do_write(). This function just calculates d and l (a pointer to the data and its length), then invokes the actual write:
static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    if (!p->write.current)
        prepare_next_write_item(p);
    if (!p->write.current) {
        /* The out queue is empty, so switching channels is safe */
        check_srbpending(p);
        return 0;
    }
    /* ... Calculate `d' and `l' ... */
    pa_assert(l > 0);
    if (p->srb)
        r = pa_srbchannel_write(p->srb, d, l);
    else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;
    ...
}
The iochannel then abstracts the classic Unix socket read/write mechanisms through the PulseAudio event loop.
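For readers who want to see what "classic Unix socket read/write" means underneath such an abstraction, here is a minimal sketch using plain POSIX calls on a socketpair. It is not the iochannel code: the sketch_* names and the write cursor structure are hypothetical, and the example assumes a short message that fits in the socket buffer.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical write cursor: which bytes of the current item are still unsent. */
typedef struct {
    const char *data;
    size_t length;
    size_t index;
} sketch_write_state;

/* Try one write() of the unsent remainder.
 * Returns 1 when the item is fully written, 0 when more remains
 * (partial write, interruption, or the socket would block), -1 on a real error. */
static int sketch_do_write(int fd, sketch_write_state *w) {
    ssize_t r;
    assert(w);
    assert(w->index < w->length);

    r = write(fd, w->data + w->index, w->length - w->index);
    if (r < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            return 0;   /* try again on the next writable event */
        return -1;
    }
    w->index += (size_t) r;
    return w->index == w->length;
}

/* Helper for the example: send `msg` over a Unix socketpair and read it back.
 * Returns the number of bytes received, or -1 on failure. */
static ssize_t sketch_roundtrip(const char *msg, char *out, size_t out_size) {
    int fds[2];
    int rc;
    ssize_t n;
    sketch_write_state w;

    w.data = msg;
    w.length = strlen(msg);
    w.index = 0;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
        return -1;

    /* Keep calling the write step until the whole item is out. */
    while ((rc = sketch_do_write(fds[0], &w)) == 0)
        ;

    n = rc < 0 ? -1 : read(fds[1], out, out_size);
    close(fds[0]);
    close(fds[1]);
    return n;
}
```

The cursor is what lets a write step be resumed: if write() accepts only part of the data, the next call continues from index instead of starting over.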
References
- PulseAudio buffers and protocols, David Henningsson, Canonical blog, 2014