File transfer test, model 2 in C

// File Transfer model #2
//
// In which the client requests each chunk individually, thus
// eliminating server queue overflows, but at a cost in speed.

#include <czmq.h>
#define CHUNK_SIZE 250000

static void
client_thread (void *args, zctx_t *ctx, void *pipe)
{
void *dealer = zsocket_new (ctx, ZMQ_DEALER);
zsocket_set_hwm (dealer, 1);
zsocket_connect (dealer, "tcp://127.0.0.1:6000");

size_t total = 0; // Total bytes received
size_t chunks = 0; // Total chunks received

while (true) {
// Ask for next chunk
zstr_sendm (dealer, "fetch");
zstr_sendfm (dealer, "%ld", total);
zstr_sendf (dealer, "%d", CHUNK_SIZE);

zframe_t *chunk = zframe_recv (dealer);
if (!chunk)
break; // Shutting down, quit
chunks++;
size_t size = zframe_size (chunk);
zframe_destroy (&chunk);
total += size;
if (size < CHUNK_SIZE)
break; // Last chunk received; exit
}
printf ("%zd chunks received, %zd bytes\n", chunks, total);
zstr_send (pipe, "OK");
}

// The server thread waits for a chunk request from a client,
// reads that chunk, and sends it back to the client:

static void
server_thread (void *args, zctx_t *ctx, void *pipe)
{
FILE *file = fopen ("testdata", "r");
assert (file);

void *router = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_hwm (router, 1);
zsocket_bind (router, "tcp://*:6000");
while (true) {
// First frame in each message is the sender identity
zframe_t *identity = zframe_recv (router);
if (!identity)
break; // Shutting down, quit

// Second frame is "fetch" command
char *command = zstr_recv (router);
assert (streq (command, "fetch"));
free (command);

// Third frame is chunk offset in file
char *offset_str = zstr_recv (router);
size_t offset = atoi (offset_str);
free (offset_str);

// Fourth frame is maximum chunk size
char *chunksz_str = zstr_recv (router);
size_t chunksz = atoi (chunksz_str);
free (chunksz_str);

// Read chunk of data from file
fseek (file, offset, SEEK_SET);
byte *data = malloc (chunksz);
assert (data);

// Send resulting chunk to client
size_t size = fread (data, 1, chunksz, file);
zframe_t *chunk = zframe_new (data, size);
zframe_send (&identity, router, ZFRAME_MORE);
zframe_send (&chunk, router, 0);
}
fclose (file);
}

// The main task is just the same as in the first model.

int main (void)
{
// Start child threads
zctx_t *ctx = zctx_new ();
zthread_fork (ctx, server_thread, NULL);
void *client =
zthread_fork (ctx, client_thread, NULL);
// Loop until client tells us it's done
char *string = zstr_recv (client);
free (string);
// Kill server thread
zctx_destroy (&ctx);
return 0;
}