--
-- Least-recently used (LRU) queue device
-- Clients and workers are shown here in-process
--
-- While this example runs in a single process, that is just to make
-- it easier to start and stop the example. Each thread has its own
-- context and conceptually acts as a separate process.
--
-- Author: Robert G. Jakabosky <moc.mlaerderahs|ybbob#moc.mlaerderahs|ybbob>
--
require"zmq"
require"zmq.threads"
require"zmq.poller"
require"zhelpers"
local tremove = table.remove
local NBR_CLIENTS = 10
local NBR_WORKERS = 3
local pre_code = [[
local identity, seed = …
local zmq = require"zmq"
require"zhelpers"
math.randomseed(seed)
]]
-- Basic request-reply client using REQ socket
-- Since s_send and s_recv can't handle 0MQ binary identities we
-- set a printable text identity to allow routing.
--
local client_task = pre_code .. [[
local context = zmq.init(1)
local client = context:socket(zmq.REQ)
client:setopt(zmq.IDENTITY, identity) -- Set a printable identity
client:connect("ipc://frontend.ipc")
-- Send request, get reply
client:send("HELLO")
local reply = client:recv()
printf ("Client: %s\n", reply)
client:close()
context:term()
]]
-- Worker using REQ socket to do LRU routing
-- Since s_send and s_recv can't handle 0MQ binary identities we
-- set a printable text identity to allow routing.
--
local worker_task = pre_code .. [[
local context = zmq.init(1)
local worker = context:socket(zmq.REQ)
worker:setopt(zmq.IDENTITY, identity) -- Set a printable identity
worker:connect("ipc://backend.ipc")
-- Tell broker we're ready for work
worker:send("READY")
while true do
-- Read and save all frames until we get an empty frame
-- In this example there is only 1 but it could be more
local address = worker:recv()
local empty = worker:recv()
assert (#empty == 0)
-- Get request, send reply
local request = worker:recv()
printf ("Worker: %s\n", request)
worker:send(address, zmq.SNDMORE)
worker:send("", zmq.SNDMORE)
worker:send("OK")
end
worker:close()
context:term()
]]
s_version_assert (2, 1)
-- Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.ROUTER)
local backend = context:socket(zmq.ROUTER)
frontend:bind("ipc://frontend.ipc")
backend:bind("ipc://backend.ipc")
local clients = {}
for n=1,NBR_CLIENTS do
local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
local seed = os.time() + math.random()
clients[n] = zmq.threads.runstring(context, client_task, identity, seed)
clients[n]:start()
end
local workers = {}
for n=1,NBR_WORKERS do
local identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
local seed = os.time() + math.random()
workers[n] = zmq.threads.runstring(context, worker_task, identity, seed)
workers[n]:start(true)
end
-- Logic of LRU loop
-- - Poll backend always, frontend only if 1+ worker ready
-- - If worker replies, queue worker as ready and forward reply
-- to client if necessary
-- - If client requests, pop next worker and send request to it
-- Queue of available workers
local worker_queue = {}
local is_accepting = false
local max_requests = #clients
local poller = zmq.poller(2)
local function frontend_cb()
-- Now get next client request, route to LRU worker
-- Client request is [address][empty][request]
local client_addr = frontend:recv()
local empty = frontend:recv()
assert (#empty == 0)
local request = frontend:recv()
-- Dequeue a worker from the queue.
local worker = tremove(worker_queue, 1)
backend:send(worker, zmq.SNDMORE)
backend:send("", zmq.SNDMORE)
backend:send(client_addr, zmq.SNDMORE)
backend:send("", zmq.SNDMORE)
backend:send(request)
if (#worker_queue == 0) then
-- stop accepting work from clients, when no workers are available.
poller:remove(frontend)
is_accepting = false
end
end
poller:add(backend, zmq.POLLIN, function()
-- Queue worker address for LRU routing
local worker_addr = backend:recv()
worker_queue[#worker_queue + 1] = worker_addr
-- start accepting client requests, if we are not already doing so.
if not is_accepting then
is_accepting = true
poller:add(frontend, zmq.POLLIN, frontend_cb)
end
-- Second frame is empty
local empty = backend:recv()
assert (#empty == 0)
-- Third frame is READY or else a client reply address
local client_addr = backend:recv()
-- If client reply, send rest back to frontend
if (client_addr ~= "READY") then
empty = backend:recv()
assert (#empty == 0)
local reply = backend:recv()
frontend:send(client_addr, zmq.SNDMORE)
frontend:send("", zmq.SNDMORE)
frontend:send(reply)
max_requests = max_requests - 1
if (max_requests == 0) then
poller:stop() -- Exit after N messages
end
end
end)
-- start poller's event loop
poller:start()
frontend:close()
backend:close()
context:term()
for n=1,NBR_CLIENTS do
assert(clients[n]:join())
end
-- workers are detached, we don't need to join with them.