mirror of
https://github.com/dragonheim/gagent.git
synced 2025-04-26 23:18:58 -07:00
fix: (issues/9) Worker does not support port assignment for routers (#11)
Quite a refactor and some code cleanup. Co-authored-by: James Wells <jwells@dragonheim.net> Reviewed-on: #11 Co-authored-by: James Wells <jwells@noreply.localhost> Co-committed-by: James Wells <jwells@noreply.localhost>
This commit is contained in:
parent
881a11316d
commit
7d6fbfef24
21 changed files with 451 additions and 271 deletions
|
@ -3,31 +3,28 @@ package client
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
|
||||
|
||||
zmq "github.com/pebbe/zmq4"
|
||||
)
|
||||
|
||||
// Main is the initiation function for a Client
|
||||
func Main(config gs.GagentConfig, routerID int, agent string) {
|
||||
var mu sync.Mutex
|
||||
var rport = int(config.ListenPort)
|
||||
if config.Routers[routerID].RouterPort != "" {
|
||||
rport, _ = strconv.Atoi(config.Routers[routerID].RouterPort)
|
||||
func Main(config gs.GagentConfig, rid int, agent string) {
|
||||
log.Printf("[INFO] Starting client\n")
|
||||
|
||||
// Generate connect string for this router.
|
||||
var rport = int64(config.ClientPort)
|
||||
if config.Routers[rid].ClientPort != 0 {
|
||||
rport = config.Routers[rid].ClientPort
|
||||
}
|
||||
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport)
|
||||
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
|
||||
|
||||
log.Printf("--|%#v|--\n", agent)
|
||||
var mu sync.Mutex
|
||||
|
||||
connectString := fmt.Sprintf("tcp://%s:%d",
|
||||
config.Routers[routerID].RouterAddr,
|
||||
rport)
|
||||
log.Printf("Attempting to connect to %s\n", connectString)
|
||||
|
||||
sock, _ := zmq.NewSocket(zmq.DEALER)
|
||||
sock, _ := zmq.NewSocket(zmq.REQ)
|
||||
defer sock.Close()
|
||||
|
||||
sock.SetIdentity(config.UUID)
|
||||
|
@ -35,17 +32,22 @@ func Main(config gs.GagentConfig, routerID int, agent string) {
|
|||
|
||||
go func() {
|
||||
mu.Lock()
|
||||
log.Printf("[DEBUG] Start sending agent...\n")
|
||||
sock.SendMessage(agent)
|
||||
log.Printf("[DEBUG] End sending agent...\n")
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
mu.Lock()
|
||||
msg, err := sock.RecvMessage(zmq.DONTWAIT)
|
||||
if err == nil {
|
||||
log.Println(msg[0], config.UUID)
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
// time.Sleep(10 * time.Millisecond)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// for {
|
||||
// time.Sleep(10 * time.Millisecond)
|
||||
// mu.Lock()
|
||||
// msg, err := sock.RecvMessage(zmq.DONTWAIT)
|
||||
// if err == nil {
|
||||
// log.Println(msg[0], config.UUID)
|
||||
// }
|
||||
// mu.Unlock()
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -6,7 +6,9 @@ type GagentConfig struct {
|
|||
Mode string `hcl:"mode,attr"`
|
||||
UUID string `hcl:"uuid,optional"`
|
||||
ListenAddr string `hcl:"listenaddr,optional"`
|
||||
ListenPort int `hcl:"listenport,optional"`
|
||||
ClientPort int64 `hcl:"clientport,optional"`
|
||||
RouterPort int64 `hcl:"routerport,optional"`
|
||||
WorkerPort int64 `hcl:"workerport,optional"`
|
||||
Clients []*ClientDetails `hcl:"client,block"`
|
||||
Routers []*RouterDetails `hcl:"router,block"`
|
||||
Workers []*WorkerDetails `hcl:"worker,block"`
|
||||
|
@ -26,7 +28,7 @@ type ClientDetails struct {
|
|||
* the agent's results to. This attempts to keep the
|
||||
* clients unique globally.
|
||||
*/
|
||||
ClientID string `hcl:"clientid,attr"`
|
||||
ClientID string `hcl:"clientid,optional"`
|
||||
}
|
||||
|
||||
// RouterDetails is details about known routers
|
||||
|
@ -43,7 +45,7 @@ type RouterDetails struct {
|
|||
* which MQ router to send the agent's requests to.
|
||||
* This attempts to keep the routers unique globally.
|
||||
*/
|
||||
RouterID string `hcl:"uuid,attr"`
|
||||
RouterID string `hcl:"routerid,attr"`
|
||||
|
||||
/*
|
||||
* This is the IP address or hostname the router
|
||||
|
@ -54,11 +56,24 @@ type RouterDetails struct {
|
|||
|
||||
/*
|
||||
* This is the is the port that the router listens
|
||||
* on. If not defined, it will default to 35570
|
||||
* The router will start up a 0MQ service that
|
||||
* clients and workers will connect to.
|
||||
* on for clients. If not defined, it will default
|
||||
* to 35571.
|
||||
*/
|
||||
RouterPort string `hcl:"port,optional"`
|
||||
ClientPort int64 `hcl:"clientport,optional"`
|
||||
|
||||
/*
|
||||
* This is the is the port that the router listens
|
||||
* on for routers. If not defined, it will default
|
||||
* to 35570.
|
||||
*/
|
||||
RouterPort int64 `hcl:"workerport,optional"`
|
||||
|
||||
/*
|
||||
* This is the is the port that the router listens
|
||||
* on for clients. If not defined, it will default
|
||||
* to 35572.
|
||||
*/
|
||||
WorkerPort int64 `hcl:"workerport,optional"`
|
||||
|
||||
/*
|
||||
* These tags will be passed to the router upon
|
||||
|
@ -83,7 +98,7 @@ type WorkerDetails struct {
|
|||
* send agents to. This attempts to keep the
|
||||
* workers unique globally.
|
||||
*/
|
||||
WorkerID string `hcl:"uuid,attr"`
|
||||
WorkerID string `hcl:"workerid,attr"`
|
||||
|
||||
/*
|
||||
* These tags will be passed to the router upon
|
||||
|
|
20
src/picol/LICENSE
Normal file
20
src/picol/LICENSE
Normal file
|
@ -0,0 +1,20 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2014 Lain dono <lain.dono@gmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27
src/picol/README.md
Normal file
27
src/picol/README.md
Normal file
|
@ -0,0 +1,27 @@
|
|||
# picol.go
|
||||
|
||||
Original http://oldblog.antirez.com/post/picol.html
|
||||
|
||||
Sample use:
|
||||
```golang
|
||||
func CommandPuts(i *picol.Interp, argv []string, pd interface{}) (string, error) {
|
||||
if len(argv) != 2 {
|
||||
return "", fmt.Errorf("Wrong number of args for %s %s", argv[0], argv)
|
||||
}
|
||||
fmt.Println(argv[1])
|
||||
return "", nil
|
||||
}
|
||||
...
|
||||
interp := picol.InitInterp()
|
||||
// add core functions
|
||||
interp.RegisterCoreCommands()
|
||||
// add user function
|
||||
interp.RegisterCommand("puts", CommandPuts, nil)
|
||||
// eval
|
||||
result, err := interp.Eval(string(buf))
|
||||
if err != nil {
|
||||
fmt.Println("ERROR", err, result)
|
||||
} else {
|
||||
fmt.Println(result)
|
||||
}
|
||||
```
|
9
src/picol/examples/fib.tcl
Normal file
9
src/picol/examples/fib.tcl
Normal file
|
@ -0,0 +1,9 @@
|
|||
proc fib {x} {
|
||||
if {<= $x 1} {
|
||||
return 1
|
||||
} else {
|
||||
+ [fib [- $x 1]] [fib [- $x 2]]
|
||||
}
|
||||
}
|
||||
|
||||
puts [fib 20]
|
15
src/picol/examples/t2.tcl
Normal file
15
src/picol/examples/t2.tcl
Normal file
|
@ -0,0 +1,15 @@
|
|||
proc square {x} {
|
||||
* $x $x
|
||||
}
|
||||
|
||||
set a 1
|
||||
while {<= $a 10} {
|
||||
if {== $a 5} {
|
||||
puts {Missing five!}
|
||||
set a [+ $a 1]
|
||||
continue
|
||||
}
|
||||
puts "I can compute that $a*$a = [square $a]"
|
||||
set a [+ $a 1]
|
||||
}
|
||||
|
5
src/picol/examples/tst.tcl
Normal file
5
src/picol/examples/tst.tcl
Normal file
|
@ -0,0 +1,5 @@
|
|||
proc square {x} {
|
||||
* $x $x
|
||||
}
|
||||
|
||||
puts [square 5]
|
46
src/picol/picol/main.go
Normal file
46
src/picol/picol/main.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
picol "git.dragonheim.net/dragonheim/gagent/src/picol"
|
||||
)
|
||||
|
||||
var fname = flag.String("f", "", "file name")
|
||||
|
||||
func CommandPuts(i *picol.Interp, argv []string, pd interface{}) (string, error) {
|
||||
if len(argv) != 2 {
|
||||
return "", fmt.Errorf("Wrong number of args for %s %s", argv[0], argv)
|
||||
}
|
||||
fmt.Println(argv[1])
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
interp := picol.InitInterp()
|
||||
interp.RegisterCoreCommands()
|
||||
interp.RegisterCommand("puts", CommandPuts, nil)
|
||||
|
||||
buf, err := ioutil.ReadFile(*fname)
|
||||
if err == nil {
|
||||
result, err := interp.Eval(string(buf))
|
||||
if err != nil {
|
||||
fmt.Println("ERRROR", result, err)
|
||||
}
|
||||
} else {
|
||||
for {
|
||||
fmt.Print("picol> ")
|
||||
scanner := bufio.NewReader(os.Stdin)
|
||||
clibuf, _ := scanner.ReadString('\n')
|
||||
result, err := interp.Eval(clibuf[:len(clibuf)-1])
|
||||
if len(result) != 0 {
|
||||
fmt.Println("ERRROR", result, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,84 +3,88 @@ package router
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
|
||||
picol "git.dragonheim.net/dragonheim/gagent/src/picol"
|
||||
|
||||
zmq "github.com/pebbe/zmq4"
|
||||
)
|
||||
|
||||
func pop(msg []string) (head, tail []string) {
|
||||
if msg[1] == "" {
|
||||
head = msg[:2]
|
||||
tail = msg[2:]
|
||||
} else {
|
||||
head = msg[:1]
|
||||
tail = msg[1:]
|
||||
}
|
||||
return
|
||||
}
|
||||
const (
|
||||
WORKER_READY = "\001" // Signals worker is ready
|
||||
)
|
||||
|
||||
// Each router task works on one request at a time and sends a random number
|
||||
// of replies back, with random delays between replies:
|
||||
func agentRouter(workerNum int) {
|
||||
interp := picol.InitInterp()
|
||||
interp.RegisterCoreCommands()
|
||||
// Main is the initiation function for a Router
|
||||
func Main(config gs.GagentConfig) {
|
||||
log.Printf("[INFO] Starting router\n")
|
||||
|
||||
worker, _ := zmq.NewSocket(zmq.DEALER)
|
||||
defer worker.Close()
|
||||
worker.Connect("inproc://backend")
|
||||
client_sock, _ := zmq.NewSocket(zmq.ROUTER)
|
||||
defer client_sock.Close()
|
||||
|
||||
worker_sock, _ := zmq.NewSocket(zmq.DEALER)
|
||||
defer worker_sock.Close()
|
||||
|
||||
client_sock.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ClientPort))
|
||||
worker_sock.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.WorkerPort))
|
||||
|
||||
workers := make([]string, 0)
|
||||
|
||||
poller1 := zmq.NewPoller()
|
||||
poller1.Add(worker_sock, zmq.POLLIN)
|
||||
|
||||
poller2 := zmq.NewPoller()
|
||||
poller2.Add(worker_sock, zmq.POLLIN)
|
||||
poller2.Add(client_sock, zmq.POLLIN)
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
// The DEALER socket gives us the reply envelope and message
|
||||
msg, _ := worker.RecvMessage(0)
|
||||
identity, content := pop(msg)
|
||||
log.Printf("Recieved message: %s", content)
|
||||
// Poll frontend only if we have available workers
|
||||
var sockets []zmq.Polled
|
||||
var err error
|
||||
if len(workers) > 0 {
|
||||
sockets, err = poller2.Poll(-1)
|
||||
} else {
|
||||
sockets, err = poller1.Poll(-1)
|
||||
}
|
||||
if err != nil {
|
||||
break // Interrupted
|
||||
}
|
||||
for _, socket := range sockets {
|
||||
switch s := socket.Socket; s {
|
||||
case worker_sock: // Handle worker activity on backend
|
||||
// Use worker identity for load-balancing
|
||||
msg, err := s.RecvMessage(0)
|
||||
if err != nil {
|
||||
break LOOP // Interrupted
|
||||
}
|
||||
var identity string
|
||||
identity, msg = unwrap(msg)
|
||||
log.Printf("[DEBUG] Worker message received: %s", msg)
|
||||
workers = append(workers, identity)
|
||||
|
||||
// Send 0..4 replies back
|
||||
replies := rand.Intn(5)
|
||||
for reply := 0; reply < replies; reply++ {
|
||||
// Sleep for some fraction of a second
|
||||
time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond)
|
||||
// Forward message to client if it's not a READY
|
||||
if msg[0] != WORKER_READY {
|
||||
client_sock.SendMessage(msg)
|
||||
}
|
||||
|
||||
log.Printf("Worker %d: %s\n", workerNum, identity)
|
||||
log.Printf("Worker %d: %s\n", workerNum, content)
|
||||
worker.SendMessage(identity, content)
|
||||
case client_sock:
|
||||
// Get client request, route to first available worker
|
||||
msg, err := s.RecvMessage(0)
|
||||
log.Printf("[DEBUG] Client message received: %s", msg)
|
||||
if err == nil {
|
||||
worker_sock.SendMessage(workers[0], "", msg)
|
||||
workers = workers[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main is the initiation function for a Router
|
||||
func Main(config gs.GagentConfig) {
|
||||
/*
|
||||
* This is our router task.
|
||||
*
|
||||
* It uses the multi-threaded server model to deal requests out to a
|
||||
* pool of workers and route replies back to clients. One worker can
|
||||
* handle one request at a time but one client can talk to multiple
|
||||
* workers at once.
|
||||
*
|
||||
* Frontend socket talks to clients over TCP
|
||||
*/
|
||||
frontend, _ := zmq.NewSocket(zmq.ROUTER)
|
||||
log.Printf("Starting router\n")
|
||||
defer frontend.Close()
|
||||
|
||||
frontend.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ListenPort))
|
||||
|
||||
// Backend socket talks to workers over inproc
|
||||
backend, _ := zmq.NewSocket(zmq.DEALER)
|
||||
defer backend.Close()
|
||||
backend.Bind("inproc://backend")
|
||||
|
||||
// Launch pool of worker threads, precise number is not critical
|
||||
for i := 0; i < 5; i++ {
|
||||
go agentRouter(i)
|
||||
func unwrap(msg []string) (head string, tail []string) {
|
||||
head = msg[0]
|
||||
if len(msg) > 1 && msg[1] == "" {
|
||||
tail = msg[2:]
|
||||
} else {
|
||||
tail = msg[1:]
|
||||
}
|
||||
|
||||
// Connect backend to frontend via a proxy
|
||||
err := zmq.Proxy(frontend, backend, nil)
|
||||
log.Fatalln("Proxy interrupted:", err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -3,76 +3,32 @@ package worker
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
|
||||
picol "git.dragonheim.net/dragonheim/gagent/src/picol"
|
||||
|
||||
// picol "git.dragonheim.net/dragonheim/gagent/src/picol"
|
||||
zmq "github.com/pebbe/zmq4"
|
||||
)
|
||||
|
||||
func pop(msg []string) (head, tail []string) {
|
||||
if msg[1] == "" {
|
||||
head = msg[:2]
|
||||
tail = msg[2:]
|
||||
} else {
|
||||
head = msg[:1]
|
||||
tail = msg[1:]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Each worker task works on one request at a time and sends a random number
|
||||
// of replies back, with random delays between replies:
|
||||
func agentHandler(workerNum int) {
|
||||
interp := picol.InitInterp()
|
||||
interp.RegisterCoreCommands()
|
||||
|
||||
worker, _ := zmq.NewSocket(zmq.DEALER)
|
||||
defer worker.Close()
|
||||
worker.Connect("inproc://backend")
|
||||
|
||||
for {
|
||||
// The DEALER socket gives us the reply envelope and message
|
||||
msg, _ := worker.RecvMessage(0)
|
||||
identity, content := pop(msg)
|
||||
|
||||
// Send 0..4 replies back
|
||||
replies := rand.Intn(5)
|
||||
for reply := 0; reply < replies; reply++ {
|
||||
// Sleep for some fraction of a second
|
||||
time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond)
|
||||
|
||||
log.Printf(fmt.Sprintf("Worker %d: %s\n", workerNum, identity))
|
||||
worker.SendMessage(identity, content)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main is the initiation function for a Worker
|
||||
func Main(config gs.GagentConfig) {
|
||||
// Frontend socket talks to clients over TCP
|
||||
frontend, _ := zmq.NewSocket(zmq.ROUTER)
|
||||
log.Printf("Starting worker\n")
|
||||
func Main(config gs.GagentConfig, rid int) {
|
||||
log.Printf("[INFO] Starting worker\n")
|
||||
|
||||
defer frontend.Close()
|
||||
log.Printf("Attempting to connect to: %s(%s)\n", config.Routers[0].RouterName, config.Routers[0].RouterAddr)
|
||||
connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr)
|
||||
frontend.Bind(connectString)
|
||||
|
||||
// Backend socket talks to workers over inproc
|
||||
backend, _ := zmq.NewSocket(zmq.DEALER)
|
||||
defer backend.Close()
|
||||
backend.Bind("inproc://backend")
|
||||
|
||||
// Launch pool of agent handlers
|
||||
for i := 0; i < 5; i++ {
|
||||
go agentHandler(i)
|
||||
// Generate connect string for this router.
|
||||
var rport = int64(config.WorkerPort)
|
||||
if config.Routers[rid].WorkerPort != 0 {
|
||||
rport = config.Routers[rid].WorkerPort
|
||||
}
|
||||
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport)
|
||||
|
||||
// Connect backend to frontend via a proxy
|
||||
// err := zmq.Proxy(frontend, backend, nil)
|
||||
// log.Fatalln("Proxy interrupted:", err)
|
||||
subscriber, _ := zmq.NewSocket(zmq.REP)
|
||||
defer subscriber.Close()
|
||||
|
||||
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
|
||||
subscriber.Connect(connectString)
|
||||
|
||||
msg, err := subscriber.Recv(0)
|
||||
if err != nil {
|
||||
log.Printf("[DEBUG] Recieved error: %v", err)
|
||||
}
|
||||
log.Printf("[DEBUG] Recieved message: %v", msg[0])
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue