// Asynchronous client/server in Go

//
// Asynchronous client-server
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task has its own
// context and conceptually acts as a separate process.
//
// Port of asyncsrv.c
// Written by: Aaron Clawson

package main

import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
//"strings"
"strconv"
"time"
)

// finished is an unbuffered channel of int. Nothing in the visible code
// sends or receives on it.
// NOTE(review): looks like a leftover completion signal - confirm before
// removing or wiring it up.
var finished = make(chan int)

// randomString returns a 20-byte string whose characters are picked at
// random (via math/rand) from the lowercase letters, digits and uppercase
// letters of the ASCII alphabet.
func randomString() string {
	const source = "abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ"
	target := make([]byte, 20)
	for i := range target {
		// Fill slot i; assigning the byte to target itself does not compile.
		target[i] = source[rand.Intn(len(source))]
	}
	return string(target)
}

// client_task is our client task.
// It connects a DEALER socket to the server frontend, then repeatedly polls
// for replies 100 times with a 10 ms timeout each (about one second when no
// replies arrive) before sending the next numbered request. Replies are
// printed as they arrive. We run several client tasks in parallel, each with
// a different random identity.
//
// The task returns, after printing the error, if the context or socket
// cannot be set up or if polling, receiving or sending fails.
func client_task() {
	context, err := zmq.NewContext()
	if err != nil {
		fmt.Println("Client could not create context:", err)
		return
	}
	defer context.Close()

	// Set random identity to make tracing easier
	identity := "Client-" + randomString()

	client, err := context.NewSocket(zmq.DEALER)
	if err != nil {
		fmt.Println(identity, "could not create socket:", err)
		return
	}
	defer client.Close()

	if err := client.SetIdentity(identity); err != nil {
		fmt.Println(identity, "could not set identity:", err)
		return
	}
	if err := client.Connect("ipc://frontend.ipc"); err != nil {
		fmt.Println(identity, "could not connect:", err)
		return
	}

	items := zmq.PollItems{
		zmq.PollItem{Socket: client, Events: zmq.POLLIN},
	}

	for reqs := 1; ; reqs++ {
		// Read for a response 100 times for every message we send out
		for i := 0; i < 100; i++ {
			if _, err := zmq.Poll(items, 10*time.Millisecond); err != nil {
				// Interrupted: leave the task instead of only the inner
				// loop, otherwise the outer loop would spin and keep sending.
				fmt.Println(identity, "exited with error:", err)
				return
			}

			if items[0].REvents&zmq.POLLIN != 0 {
				reply, err := client.Recv(0)
				if err != nil {
					fmt.Println(identity, "exited with error:", err)
					return
				}
				fmt.Println(identity, "received", string(reply))
			}
		}

		req_str := "Request #" + strconv.Itoa(reqs)
		if err := client.Send([]byte(req_str), 0); err != nil {
			fmt.Println(identity, "exited with error:", err)
			return
		}
	}
}

// server_task is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
//
// It binds a ROUTER frontend and a DEALER backend, starts five workers and
// then shuttles multipart messages between the two sockets until an error
// occurs, at which point it prints the error and returns.
func server_task() {
	context, err := zmq.NewContext()
	if err != nil {
		fmt.Println("Server exited with error:", err)
		return
	}
	defer context.Close()

	// Frontend socket talks to clients over IPC
	frontend, err := context.NewSocket(zmq.ROUTER)
	if err != nil {
		fmt.Println("Server exited with error:", err)
		return
	}
	defer frontend.Close()
	if err := frontend.Bind("ipc://frontend.ipc"); err != nil {
		fmt.Println("Server exited with error:", err)
		return
	}

	// Backend socket talks to workers over IPC
	backend, err := context.NewSocket(zmq.DEALER)
	if err != nil {
		fmt.Println("Server exited with error:", err)
		return
	}
	defer backend.Close()
	if err := backend.Bind("ipc://backend.ipc"); err != nil {
		fmt.Println("Server exited with error:", err)
		return
	}

	// Launch pool of worker goroutines, precise number is not critical
	for i := 0; i < 5; i++ {
		go server_worker()
	}

	// Connect backend to frontend via a hand-rolled proxy loop
	items := zmq.PollItems{
		zmq.PollItem{Socket: frontend, Events: zmq.POLLIN},
		zmq.PollItem{Socket: backend, Events: zmq.POLLIN},
	}

	for {
		// A negative timeout is passed so the poll waits for activity
		if _, err := zmq.Poll(items, -1); err != nil {
			fmt.Println("Server exited with error:", err)
			return
		}

		// Client request: pass it on to the worker pool
		if items[0].REvents&zmq.POLLIN != 0 {
			parts, err := frontend.RecvMultipart(0)
			if err != nil {
				fmt.Println("Server exited with error:", err)
				return
			}
			if err := backend.SendMultipart(parts, 0); err != nil {
				fmt.Println("Server exited with error:", err)
				return
			}
		}

		// Worker reply: route it back to the client
		if items[1].REvents&zmq.POLLIN != 0 {
			parts, err := backend.RecvMultipart(0)
			if err != nil {
				fmt.Println("Server exited with error:", err)
				return
			}
			if err := frontend.SendMultipart(parts, 0); err != nil {
				fmt.Println("Server exited with error:", err)
				return
			}
		}
	}
}

// server_worker works on one request at a time and sends a random number
// of replies (0 to 4) back, each a copy of the received multipart message,
// with a random delay of up to 99 ms before each reply.
//
// The worker returns, after printing the error, if its context or socket
// cannot be set up or if receiving or sending fails.
func server_worker() {
	context, err := zmq.NewContext()
	if err != nil {
		fmt.Println("Worker exited with error:", err)
		return
	}
	defer context.Close()

	// The DEALER socket gives us the reply envelope and message
	worker, err := context.NewSocket(zmq.DEALER)
	if err != nil {
		fmt.Println("Worker exited with error:", err)
		return
	}
	defer worker.Close()
	if err := worker.Connect("ipc://backend.ipc"); err != nil {
		fmt.Println("Worker exited with error:", err)
		return
	}

	for {
		parts, err := worker.RecvMultipart(0)
		if err != nil {
			// Returning here avoids spinning on a socket that keeps failing
			fmt.Println("Worker exited with error:", err)
			return
		}

		// Reply with 0..4 responses
		replies := rand.Intn(5)
		for i := 0; i < replies; i++ {
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			if err := worker.SendMultipart(parts, 0); err != nil {
				fmt.Println("Worker exited with error:", err)
				return
			}
		}
	}
}

// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

func main() {
rand.Seed(time.Now().UTC().UnixNano())

go client_task()
go client_task()
go client_task()

go server_task()

time.Sleep(time.Second * 5) [[span style="color:#408080"]]// Run for 5 seconds then quit
}