// Clone server, Model Five in C

// Clone server - Model Five

// Lets us build this source without creating a library
#include "kvmsg.c"

// zloop reactor handlers, declared up front because main() registers
// them before their definitions appear further down in this file:
//   s_snapshots - poller handler registered for the snapshot socket
//   s_collector - poller handler registered for the collector socket
//   s_flush_ttl - timer handler that scans the store for expired values
static int s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args);
static int s_flush_ttl (zloop_t *loop, int timer_id, void *args);

// Our server is defined by these properties. main() allocates a single
// instance and hands it to every reactor handler as the args pointer.
typedef struct {
zctx_t *ctx; // Context wrapper; the three sockets below are created from it
zhash_t *kvmap; // Key-value store; the handlers treat each value as a kvmsg_t
zloop_t *loop; // zloop reactor that dispatches to the s_* handlers
int port; // Main port we're working on; publisher uses port + 1, collector port + 2
int64_t sequence; // How many updates we're at; incremented for every update or delete published
void *snapshot; // ROUTER socket: handle snapshot requests
void *publisher; // PUB socket: publish updates to clients
void *collector; // PULL socket: collect updates from clients
} clonesrv_t;

int main (void)
{
clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
self->port = 5556;
self->ctx = zctx_new ();
self->kvmap = zhash_new ();
self->loop = zloop_new ();
zloop_set_verbose (self->loop, false);

// Set up our clone server sockets
self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER);
zsocket_bind (self->snapshot, "tcp://*:%d", self->port);
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
self->collector = zsocket_new (self->ctx, ZMQ_PULL);
zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);

// Register our handlers with reactor
zmq_pollitem_t poller = { 0, 0, ZMQ_POLLIN };
poller.socket = self->snapshot;
zloop_poller (self->loop, &poller, s_snapshots, self);
poller.socket = self->collector;
zloop_poller (self->loop, &poller, s_collector, self);
zloop_timer (self->loop, 1000, 0, s_flush_ttl, self);

// Run reactor until process interrupted
zloop_start (self->loop);

zloop_destroy (&self->loop);
zhash_destroy (&self->kvmap);
zctx_destroy (&self->ctx);
free (self);
return 0;
}

// We handle ICANHAZ? requests by sending snapshot data to the
// client that requested it:

// Routing information for a key-value snapshot; passed as the args
// pointer to s_send_single for every entry in the store
typedef struct {
void *socket; // ROUTER socket to send to
zframe_t *identity; // Identity of peer who requested state; sent ahead of each entry
char *subtree; // Client subtree specification; only keys starting with it are sent
} kvroute_t;

// We call this function for each key-value pair in our hash table
static int
s_send_single (const char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
&& memcmp (kvroute->subtree,
kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
zframe_send (&kvroute->identity, // Choose recipient
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send (kvmsg, kvroute->socket);
}
return 0;
}

// This is the reactor handler for the snapshot socket; it accepts
// just the ICANHAZ? request and replies with a state snapshot ending
// with a KTHXBAI message:

// Reactor handler for the snapshot socket. Reads the peer identity
// frame and the request string; for an "ICANHAZ?" request it reads the
// subtree frame, sends every matching stored entry to that peer, and
// finishes with a KTHXBAI message carrying the current sequence number.
// Any other request is logged and ignored. Always returns 0.
static int
s_snapshots (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;

    zframe_t *identity = zframe_recv (poller->socket);
    if (!identity)
        return 0;

    // Request is in second frame of message. The receive can yield
    // NULL, so it is checked before being compared.
    char *request = zstr_recv (poller->socket);
    char *subtree = NULL;
    if (request && streq (request, "ICANHAZ?"))
        subtree = zstr_recv (poller->socket);
    else
        printf ("E: bad request, aborting\n");
    // Release the request on every path; free (NULL) is a no-op
    free (request);

    if (subtree) {
        // Send state snapshot to client
        kvroute_t routing = { poller->socket, identity, subtree };
        zhash_foreach (self->kvmap, s_send_single, &routing);

        // Now send END message with sequence number
        zclock_log ("I: sending shapshot=%d", (int) self->sequence);
        zframe_send (&identity, poller->socket, ZFRAME_MORE);
        kvmsg_t *kvmsg = kvmsg_new (self->sequence);
        kvmsg_set_key (kvmsg, "KTHXBAI");
        // Note the size argument is 0 here
        kvmsg_set_body (kvmsg, (byte *) subtree, 0);
        kvmsg_send (kvmsg, poller->socket);
        kvmsg_destroy (&kvmsg);
        free (subtree);
    }
    // Covers the bad-request path, where identity was never sent
    zframe_destroy (&identity);
    return 0;
}

// We store each update with a new sequence number, and if necessary, a
// time-to-live. We publish updates immediately on our publisher socket:

// Reactor handler for the collector socket. Each received update gets
// the next sequence number and is published; if it carries a non-zero
// "ttl" property, that property is rewritten as an absolute expiry time
// (current zclock_time plus ttl * 1000) before the update is stored.
// Always returns 0.
static int
s_collector (zloop_t *loop, zmq_pollitem_t *poller, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;

    kvmsg_t *kvmsg = kvmsg_recv (poller->socket);
    if (kvmsg) {
        kvmsg_set_sequence (kvmsg, ++self->sequence);
        kvmsg_send (kvmsg, self->publisher);
        int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
        if (ttl) {
            // Widen before scaling: ttl comes from the client, and
            // ttl * 1000 evaluated in int overflows for large values
            int64_t expires_at = zclock_time () + (int64_t) ttl * 1000;
            kvmsg_set_prop (kvmsg, "ttl", "%" PRId64, expires_at);
        }
        kvmsg_store (&kvmsg, self->kvmap);
        zclock_log ("I: publishing update=%d", (int) self->sequence);
    }
    return 0;
}

// At regular intervals, we flush ephemeral values that have expired. This
// could be slow on very large data sets:

// If key-value pair has expired, delete it and publish the
// fact to listening clients.

// Hash-table visitor used by the TTL flush: reads the entry's "ttl"
// property as an absolute expiry time and, if that time has passed,
// gives the entry a new sequence number and an empty body, publishes it
// and stores it back. Entries with no usable ttl are left untouched.
// Always returns 0.
static int
s_flush_single (const char *key, void *data, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;

    kvmsg_t *kvmsg = (kvmsg_t *) data;
    // sscanf leaves its target unwritten when nothing matches (for
    // example an empty property), so start from 0 and treat any failed
    // parse as "no expiry" instead of reading an indeterminate value
    int64_t ttl = 0;
    if (sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl) != 1)
        ttl = 0;
    if (ttl && zclock_time () >= ttl) {
        kvmsg_set_sequence (kvmsg, ++self->sequence);
        kvmsg_set_body (kvmsg, (byte *) "", 0);
        kvmsg_send (kvmsg, self->publisher);
        kvmsg_store (&kvmsg, self->kvmap);
        zclock_log ("I: publishing delete=%d", (int) self->sequence);
    }
    return 0;
}

// Timer handler: walks the whole key-value store and lets
// s_flush_single deal with each entry. Does nothing when there is no
// store. Always returns 0.
static int
s_flush_ttl (zloop_t *loop, int timer_id, void *args)
{
    clonesrv_t *server = (clonesrv_t *) args;

    if (server->kvmap == NULL)
        return 0;

    zhash_foreach (server->kvmap, s_flush_single, args);
    return 0;
}