From Pstreams to Srbchannel

Srbchannel stands for a "Shared RingBuffer Channel". This work was added by David Henningsson from Canonical to minimize PA latency for VoIP and gaming clients. Unlike music or movie plaing, such clients typically issue a lot of requests with small amount of data within each request. Thus, the communication cost between PA clients and the daemon, even for commands and not for the data itself, was quite high.

As we've seen from the previous article, What are Pstreams?, Pstreams are used from within the PA code base for sending commands from client to server, or from server to client. The underlying mechanism which pstreams use for writing the actual data are either iochannel or srbchannel mechanisms.

The iochannel mechanism is the basic event loop mechanism using read(), write(), and poll() system calls over classical Unix sockets. This is the cost which srbchannel tries to mitigate, and which we will discuss in detail below.

From where are srbchannels created?

As discussed in our Protocol Implementation page, when a PA client connects to the daemon, it does so by initiating a connection with one of the sockets provided by the daemon. In the regular case, this is the unix socket residing at /run/user/$UID/pulse/native.

Since all of PA code is asynchronous and non-blocking, the connection is not done directly. Rather, it's done using PulseAudio asynchronous event loop mechanisms. Upon a successful connection, the on_connection callback, residing at src/pulse/context.c is called:

static void on_connection(pa_socket_client *client, pa_iochannel*io, void *userdata) {
    pa_context *c = userdata;
    ...
    setup_context(c, io);

finish:
    pa_context_unref(c);
}

static void setup_context(pa_context *c, pa_iochannel *io) {
    uint8_t cookie[PA_NATIVE_COOKIE_LENGTH];
    pa_tagstruct *t;
    uint32_t tag;

    c->pstream = pa_pstream_new(c->mainloop, io, c->mempool);
    ...
    c->pdispatch = pa_pdispatch_new(c->mainloop, c->use_rtclock, command_table, PA_COMMAND_MAX);
    ...

    c->do_shm =
        pa_mempool_is_shared(c->mempool) &&
        c->is_local;

    pa_log_debug("SHM possible: %s", pa_yes_no(c->do_shm));

    t = pa_tagstruct_command(c, PA_COMMAND_AUTH, &tag);
    pa_tagstruct_putu32(t, PA_PROTOCOL_VERSION | (c->do_shm ? 0x80000000U : 0));
    pa_tagstruct_put_arbitrary(t, cookie, sizeof(cookie));
    pa_pstream_send_tagstruct(c->pstream, t);

As we can see from above, upon a successful connection, the client builds the connection context pstream and commands dispatch table. Afterwards, and most importantly, it sends PA_COMMAND_AUTH command.

As discussed in the What are Pstreams? article, commands are sent by building a tagstruct of values and sending this tagstruct using the pstream. In this case, the tagstruct represents the AUTH command, which also includes the protocol version and a cookie. Afterwards, such command is sent over our connection context pstream.

Creating an srb: Server response to the AUTH command

The way both clients and servers responds to commands are building a dispatch table containing all the expected known commands and their handlers.

This is done on the server side at src/pulsecore/protocol-native.c, where all the handlers for the commands a PulseAudio daemon expects are registered:

static const pa_pdispatch_cb_t command_table[PA_COMMAND_MAX] = {
    [PA_COMMAND_ERROR] = NULL,
    [PA_COMMAND_TIMEOUT] = NULL,
    [PA_COMMAND_REPLY] = NULL,
    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = command_create_playback_stream,
    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = command_delete_stream,
    ...
    [PA_COMMAND_AUTH] = command_auth,
    ...

Related to the topic of this article is the server's response to the AUTH command. A response which creates the srb channel in the process!

static void command_auth(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
    pa_native_connection *c = PA_NATIVE_CONNECTION(userdata);
    const void*cookie;
    pa_tagstruct *reply;
    bool shm_on_remote = false, do_shm;

    pa_native_connection_assert_ref(c);
    pa_assert(t);

    if (pa_tagstruct_getu32(t, &c->version) < 0 ||
        pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
        !pa_tagstruct_eof(t)) {
        protocol_error(c);
        return;
    }

    ...

    do_shm =
        pa_mempool_is_shared(c->protocol->core->mempool) &&
        c->is_local;

    pa_log_debug("SHM possible: %s", pa_yes_no(do_shm));

    ...

    setup_srbchannel(c);

As we can see from above, the server expects from an AUTH command to have the protocol version and a cookie attached. Then it also makes its own checks (like the client) to make sure that this connection can safely use POSIX shared memory mechanisms. In the end we get our prize, setting up the srbchannel ;-)

Srbchannel setup

Setting up an srb channel is done by the server, and resides at src/pulsecore/protocol-native.c. This is where all of the necessary srbchannel stuff gets started:

static void setup_srbchannel(pa_native_connection *c) {
    pa_srbchannel_template srbt;
    pa_srbchannel *srb;
    pa_memchunk mc;
    pa_tagstruct *t;
    int fdlist[2];

    if (!c->options->srbchannel) {
        pa_log_debug("Disabling srbchannel, reason: Must be enabled by module parameter");
        return;
    }

    if (c->version < 30) {
        pa_log_debug("Disabling srbchannel, reason: Protocol too old");
        return;
    }

    if (!pa_pstream_get_shm(c->pstream)) {
        pa_log_debug("Disabling srbchannel, reason: No SHM support");
        return;
    }

    if (!c->protocol->core->rw_mempool) {
        pa_log_debug("Disabling srbchannel, reason: No rw memory pool");
        return;
    }

    srb = pa_srbchannel_new(c->protocol->core->mainloop, c->protocol->core->rw_mempool);
    if (!srb) {
        pa_log_debug("Failed to create srbchannel");
        return;
    }
    pa_log_debug("Enabling srbchannel...");
    pa_srbchannel_export(srb, &srbt);

    /* Send enable command to client */
    t = pa_tagstruct_new();
    pa_tagstruct_putu32(t, PA_COMMAND_ENABLE_SRBCHANNEL);
    pa_tagstruct_putu32(t, (size_t) srb); /* tag */
    fdlist[0] = srbt.readfd;
    fdlist[1] = srbt.writefd;
    pa_pstream_send_tagstruct_with_fds(c->pstream, t, 2, fdlist);

    /* Send ringbuffer memblock to client */
    mc.memblock = srbt.memblock;
    mc.index = 0;
    mc.length = pa_memblock_get_length(srbt.memblock);
    pa_pstream_send_memblock(c->pstream, 0, 0, 0, &mc);

    c->srbpending = srb;
}

As you can see from above, the server first checks if this connection has the necessary facilities for an srb: a new protocol version, SHM support, and a read/write memory pool. If all of these features exist in this connection, an srb channel is created.

If the srb channel was created successfully, the server sends to the client an ENABLE_SRBCHANNEL command. Then there are very important fields attached with this command, and are as follows:

  • srbchannel read and write file descriptors: These are used as an optimized event mechanism using Linux eventfds
  • memory block. The SHM ringbuffer area used for reading and writing commands

Actual srbchannel writing

Now that the mechanism has been detailed, it would be quite instructive to see how data is written into an srbchannel. The protocol works as follows:

We always listen to sem_read, and always signal on sem_write.

As we've seen in What are Pstreams? article, at the end of operations started by pa_pstream_send_tagstruct, pa_srbchannel_write() is used. It takes the srb channel as its first parameter, and the data and its length as the second and third one. Let's see how this works:

size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
    size_t written = 0;

    while (l > 0) {
        int towrite;
        void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);

        if ((size_t) towrite > l)
            towrite = l;

        if (towrite == 0) {
            pa_log("srbchannel output buffer full");
            break;
        }

        memcpy(ptr, data, towrite);
        pa_ringbuffer_end_write(&sr->rb_write, towrite);
        written += towrite;
        data = (uint8_t*) data + towrite;
        l -= towrite;
    }
    pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);

    pa_fdsem_post(sr->sem_write);
    return written;
}

The first section of the function actually just fills the ringbuffer with data until finishing all the data that needs to be written. Afterwards, the write event mechanism pa_fdsem_post is invoked.

srbchannel write event mechanism

As we've seen from the pervious section, after writing data to the srbchannel ringbuffer, the other ends get notified about such operation using the following function:

void pa_fdsem_post(pa_fdsem *f) {
    pa_assert(f);

    if (pa_atomic_cmpxchg(&f->data->signalled, 0, 1)) {

        if (pa_atomic_load(&f->data->waiting)) {
            ssize_t r;
            char x = 'x';

            pa_atomic_inc(&f->data->in_pipe);

            for (;;) {

#ifdef HAVE_SYS_EVENTFD_H
                if (f->efd >= 0) {
                    uint64_t u = 1;

                    if ((r = pa_write(f->efd, &u, sizeof(u), &f->write_type)) != sizeof(u)) {
                        if (r >= 0 || errno != EINTR) {
                            pa_log_error("Invalid write to eventfd: %s", r < 0 ? pa_cstrerror(errno) : "EOF");
                            pa_assert_not_reached();
                        }

                        continue;
                    }
                } else
#endif

                if ((r = pa_write(f->fds[1], &x, 1, &f->write_type)) != 1) {
                    if (r >= 0 || errno != EINTR) {
                        pa_log_error("Invalid write to pipe: %s", r < 0 ? pa_cstrerror(errno) : "EOF");
                        pa_assert_not_reached();
                    }

                    continue;
                }

                break;
            }
        }
    }
}

Note that eventfds are used by default on Linux systems. From the eventfd(2) manpage: "the kernel overhead of an eventfd file descriptor is much lower than that of a pipe, and only one file descriptor is required -- versus the two required for a pipe". If we're on an older Unix system, the typical pipes signalling mechanism is used.

References

results matching ""

    No results matching ""