// Round-trip demonstrator in Java

package guide;

import org.zeromq.*;
import org.zeromq.ZMQ.Socket;

/**
 * Round-trip demonstrator. Broker, Worker and Client are mocked as separate
 * threads.
 */
public class tripping
{
static class Broker implements Runnable
{
@Override
public void run()
{
try (ZContext ctx = new ZContext()) {
Socket frontend = ctx.createSocket(SocketType.ROUTER);
Socket backend = ctx.createSocket(SocketType.ROUTER);
frontend.setHWM(0);
backend.setHWM(0);
frontend.bind("tcp://*:5555");
backend.bind("tcp://*:5556");

while (!Thread.currentThread().isInterrupted()) {
ZMQ.Poller items = ctx.createPoller(2);
items.register(frontend, ZMQ.Poller.POLLIN);
items.register(backend, ZMQ.Poller.POLLIN);

if (items.poll() == -1)
break; // Interrupted

if (items.pollin(0)) {
ZMsg msg = ZMsg.recvMsg(frontend);
if (msg == null)
break; // Interrupted
ZFrame address = msg.pop();
address.destroy();
msg.addFirst(new ZFrame("W"));
msg.send(backend);
}

if (items.pollin(1)) {
ZMsg msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
ZFrame address = msg.pop();
address.destroy();
msg.addFirst(new ZFrame("C"));
msg.send(frontend);
}

items.close();
}
}
}
}

static class Worker implements Runnable
{
@Override
public void run()
{
try (ZContext ctx = new ZContext()) {
Socket worker = ctx.createSocket(SocketType.DEALER);
worker.setHWM(0);
worker.setIdentity("W".getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5556");
while (!Thread.currentThread().isInterrupted()) {
ZMsg msg = ZMsg.recvMsg(worker);
msg.send(worker);
}
}
}
}

static class Client implements Runnable
{
private static int SAMPLE_SIZE = 10000;

@Override
public void run()
{
try (ZContext ctx = new ZContext()) {
Socket client = ctx.createSocket(SocketType.DEALER);
client.setHWM(0);
client.setIdentity("C".getBytes(ZMQ.CHARSET));
client.connect("tcp://localhost:5555");
System.out.println("Setting up test");
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

int requests;
long start;

System.out.println("Synchronous round-trip test");
start = System.currentTimeMillis();

for (requests = 0; requests < SAMPLE_SIZE; requests++) {
ZMsg req = new ZMsg();
req.addString("hello");
req.send(client);
ZMsg.recvMsg(client).destroy();
}

long now = System.currentTimeMillis();
System.out.printf(
" %d calls/second\n", (1000 * SAMPLE_SIZE) / (now - start)
);

System.out.println("Asynchronous round-trip test");
start = System.currentTimeMillis();

for (requests = 0; requests < SAMPLE_SIZE; requests++) {
ZMsg req = new ZMsg();
req.addString("hello");
req.send(client);
}

for (requests = 0;
requests < SAMPLE_SIZE && !Thread.currentThread()
.isInterrupted();
requests++) {
ZMsg.recvMsg(client).destroy();
}

long now2 = System.currentTimeMillis();
System.out.printf(
" %d calls/second\n", (1000 * SAMPLE_SIZE) / (now2 - start)
);
}
}
}

public static void main(String[] args)
{
if (args.length == 1)
Client.SAMPLE_SIZE = Integer.parseInt(args[0]);

Thread brokerThread = new Thread(new Broker());
Thread workerThread = new Thread(new Worker());
Thread clientThread = new Thread(new Client());

brokerThread.setDaemon(true);
workerThread.setDaemon(true);

brokerThread.start();
workerThread.start();
clientThread.start();

try {
clientThread.join();
workerThread.interrupt();
brokerThread.interrupt();
Thread.sleep(200);// give them some time
}
catch (InterruptedException e) {
}
}
}