fix: (issues/3), Daemonize the router.

This commit is contained in:
James Wells 2021-02-26 08:33:42 -08:00
commit 856cebb0dc
Signed by: jwells
GPG key ID: 73196D10B8E65666
4 changed files with 68 additions and 60 deletions

View file

@ -1,17 +1,18 @@
package main package main
import ( import (
"fmt" // "fmt"
"io/ioutil" "io/ioutil"
"log"
"os" "os"
// "math/rand" // "math/rand"
// "time" "time"
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs" gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
// client "git.dragonheim.net/dragonheim/gagent/src/client" // client "git.dragonheim.net/dragonheim/gagent/src/client"
// router "git.dragonheim.net/dragonheim/gagent/src/router" gr "git.dragonheim.net/dragonheim/gagent/src/router"
// worker "git.dragonheim.net/dragonheim/gagent/src/worker" // worker "git.dragonheim.net/dragonheim/gagent/src/worker"
docopt "github.com/aviddiviner/docopt-go" docopt "github.com/aviddiviner/docopt-go"
@ -30,6 +31,7 @@ var exitCodes = struct {
"AGENT_MISSING_TAGS": 5, "AGENT_MISSING_TAGS": 5,
"NO_ROUTERS_DEFINED": 6, "NO_ROUTERS_DEFINED": 6,
"NO_WORKERS_DEFINED": 6, "NO_WORKERS_DEFINED": 6,
"NO_WORKERS_DEFINED": 7,
}} }}
func main() { func main() {
@ -93,7 +95,6 @@ func main() {
* dictionary of the command line arguments. * dictionary of the command line arguments.
*/ */
arguments, _ := docopt.ParseDoc(usage) arguments, _ := docopt.ParseDoc(usage)
fmt.Printf("Arguments are %v\n", arguments)
if arguments["--config"] != nil { if arguments["--config"] != nil {
configFile = arguments["--config"].(string) configFile = arguments["--config"].(string)
@ -107,14 +108,12 @@ func main() {
} else { } else {
err := hclsimple.DecodeFile(configFile, nil, &config) err := hclsimple.DecodeFile(configFile, nil, &config)
if err != nil { if err != nil {
fmt.Printf("Failed to load configuration file: %s.\n", configFile) log.Printf("Failed to load configuration file: %s.\n", configFile)
fmt.Println(err) log.Printf("%s\n",err)
os.Exit(exitCodes.m["CONFIG_FILE_MISSING"]) os.Exit(exitCodes.m["CONFIG_FILE_MISSING"])
} }
} }
fmt.Printf("Configuration is %v\n", config)
switch config.Mode { switch config.Mode {
case "client": case "client":
/* /*
@ -124,16 +123,18 @@ func main() {
* will contact the router and attempt to retrieve the results * will contact the router and attempt to retrieve the results
* of it's most recent request. * of it's most recent request.
*/ */
fmt.Printf("Running in client mode\n") log.Printf("Arguments are %v\n", arguments)
log.Printf("Configuration is %v\n", config)
log.Printf("Running in client mode\n")
agent, err := ioutil.ReadFile(arguments["--agent"].(string)) agent, err := ioutil.ReadFile(arguments["--agent"].(string))
if err == nil { if err == nil {
fmt.Printf("Agent containts %v\n", string(agent)) log.Printf("Agent containts %v\n", string(agent))
// fmt.Printf("Forking...\n") log.Printf("Forking...\n")
// go client.Main(config.Client, string(agent)) // go client.Main(config, string(agent))
// fmt.Printf("Forked thread has completed\n") log.Printf("Forked thread has completed\n")
// time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} else { } else {
fmt.Printf("Failed to load Agent file: %s.\n", arguments["--agent"].(string)) log.Printf("Failed to load Agent file: %s.\n", arguments["--agent"].(string))
os.Exit(exitCodes.m["AGENT_LOAD_FAILED"]) os.Exit(exitCodes.m["AGENT_LOAD_FAILED"])
} }
@ -145,9 +146,11 @@ func main() {
* or client node. Tags are used by the agent to give hints as to where * or client node. Tags are used by the agent to give hints as to where
* it should be routed. * it should be routed.
*/ */
fmt.Printf("Running in router mode\n") log.Printf("Arguments are %v\n", arguments)
// go router.Main(config.Router) log.Printf("Configuration is %v\n", config)
// select {} log.Printf("Running in router mode\n")
go gr.Main(config)
select {}
case "worker": case "worker":
/* /*
@ -156,16 +159,17 @@ func main() {
* router(s) they are connected. The worker will execute the agent code and * router(s) they are connected. The worker will execute the agent code and
* pass the agent and it's results to a router. * pass the agent and it's results to a router.
*/ */
fmt.Printf("Running in worker mode\n") log.Printf("Arguments are %v\n", arguments)
// go worker.Main(config.Worker) log.Printf("Configuration is %v\n", config)
// go worker.Main(config)
// select {} // select {}
case "setup": case "setup":
fmt.Printf("Running in setup mode\n") log.Printf("Running in setup mode\n")
os.Exit(exitCodes.m["SETUP_FAILED"]) os.Exit(exitCodes.m["SETUP_FAILED"])
default: default:
fmt.Printf("Unknown operating mode, exiting.\n") log.Printf("Unknown operating mode, exiting.\n")
os.Exit(exitCodes.m["INVALID_MODE"]) os.Exit(exitCodes.m["INVALID_MODE"])
} }

View file

@ -5,8 +5,8 @@ type GagentConfig struct {
Name string `hcl:"name,optional"` Name string `hcl:"name,optional"`
Mode string `hcl:"mode,attr"` Mode string `hcl:"mode,attr"`
UUID string `hcl:"uuid,optional"` UUID string `hcl:"uuid,optional"`
ListenAddr string `hc:"listenaddr,optional"` ListenAddr string `hcl:"address,optional"`
ListenPort int `hc:"listenport,optional"` ListenPort int `hcl:"port,optional"`
Clients []*ClientDetails `hcl:"client,block"` Clients []*ClientDetails `hcl:"client,block"`
Routers []*RouterDetails `hcl:"router,block"` Routers []*RouterDetails `hcl:"router,block"`
Workers []*WorkerDetails `hcl:"worker,block"` Workers []*WorkerDetails `hcl:"worker,block"`

View file

@ -23,41 +23,8 @@ func pop(msg []string) (head, tail []string) {
return return
} }
// Main is the initiation function for a Router // Each router task works on one request at a time and sends a random number
func Main(config gs.GagentConfig) {
/*
* This is our router task.
*
* It uses the multi-threaded server model to deal requests out to a
* pool of workers and route replies back to clients. One worker can
* handle one request at a time but one client can talk to multiple
* workers at once.
*
* Frontend socket talks to clients over TCP
*/
frontend, _ := zmq.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ListenPort))
// Backend socket talks to workers over inproc
backend, _ := zmq.NewSocket(zmq.DEALER)
defer backend.Close()
backend.Bind("inproc://backend")
// Launch pool of worker threads, precise number is not critical
for i := 0; i < 5; i++ {
go agentRouter(i)
}
// Connect backend to frontend via a proxy
err := zmq.Proxy(frontend, backend, nil)
log.Fatalln("Proxy interrupted:", err)
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies: // of replies back, with random delays between replies:
func agentRouter(workerNum int) { func agentRouter(workerNum int) {
interp := picol.InitInterp() interp := picol.InitInterp()
interp.RegisterCoreCommands() interp.RegisterCoreCommands()
@ -77,9 +44,44 @@ func agentRouter(workerNum int) {
// Sleep for some fraction of a second // Sleep for some fraction of a second
time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond)
fmt.Println(fmt.Sprintf("Worker %d: %s", workerNum, identity)) log.Printf("Worker %d: %s\n", workerNum, identity)
fmt.Println(fmt.Sprintf("Worker %d: %s", workerNum, content)) log.Printf("Worker %d: %s\n", workerNum, content)
worker.SendMessage(identity, content) worker.SendMessage(identity, content)
} }
} }
} }
// Main is the initiation function for a Router
func Main(config gs.GagentConfig) {
/*
* This is our router task.
*
* It uses the multi-threaded server model to deal requests out to a
* pool of workers and route replies back to clients. One worker can
* handle one request at a time but one client can talk to multiple
* workers at once.
*
* Frontend socket talks to clients over TCP
*/
frontend, _ := zmq.NewSocket(zmq.ROUTER)
log.Printf("Starting router\n")
defer frontend.Close()
frontend.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ListenPort))
// Backend socket talks to workers over inproc
backend, _ := zmq.NewSocket(zmq.DEALER)
defer backend.Close()
backend.Bind("inproc://backend")
// Launch pool of worker threads, precise number is not critical
for i := 0; i < 5; i++ {
go agentRouter(i)
}
// Connect backend to frontend via a proxy
err := zmq.Proxy(frontend, backend, nil)
log.Fatalln("Proxy interrupted:", err)
}

View file

@ -27,6 +27,8 @@ func pop(msg []string) (head, tail []string) {
func Main(config gs.GagentConfig) { func Main(config gs.GagentConfig) {
// Frontend socket talks to clients over TCP // Frontend socket talks to clients over TCP
frontend, _ := zmq.NewSocket(zmq.ROUTER) frontend, _ := zmq.NewSocket(zmq.ROUTER)
fmt.Printf("Running in worker mode\n")
defer frontend.Close() defer frontend.Close()
connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr) connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr)
frontend.Bind(connectString) frontend.Bind(connectString)