fix: (issues/7): Client is sending agent to router(s) and collecting response(s).

This commit is contained in:
James Wells 2021-04-03 16:39:33 -07:00
parent f67d6da01d
commit 4a93101d85
Signed by: jwells
GPG key ID: 73196D10B8E65666
6 changed files with 33 additions and 19 deletions

View file

@ -30,6 +30,7 @@ Line 4 indicates the start of the hints.
Lines 5 - 7 are a list of hints that the router will use to determine which router(s) may have domain specific information. Lines 5 - 7 are a list of hints that the router will use to determine which router(s) may have domain specific information.
Line 8 indicates the end of the hints. Line 8 indicates the end of the hints.
Lines 9 - 11 are a tcl procedure that will be executed on the worker before sending the results back to the client. Lines 9 - 11 are a tcl procedure that will be executed on the worker before sending the results back to the client.
Line 12 executes the procedure defined above. Line 12 executes the procedure defined above.

View file

@ -44,12 +44,12 @@ uuid = "7e9d13fe-5151-5876-66c0-20ca03e8fca4"
/. /.
* This is the port to listen on, it defaults to * This is the port to listen on, it defaults to
* 33570. It is strongly recommended that you not * 35570. It is strongly recommended that you not
* use ports 0 - 1024 * use ports 0 - 1024
* *
* Optional. * Optional.
* *
* listenport = 33570 * listenport = 35570
*/ */
/* /*

View file

@ -1,13 +1,12 @@
package main package main
import ( import (
// "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
"time"
// "math/rand" // "math/rand"
"time"
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs" gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
@ -131,16 +130,15 @@ func main() {
os.Exit(exitCodes.m["AGENT_NOT_DEFINED"]) os.Exit(exitCodes.m["AGENT_NOT_DEFINED"])
} }
agent, err := ioutil.ReadFile(arguments["--agent"].(string)) agent, err := ioutil.ReadFile(arguments["--agent"].(string))
if err == nil { if err != nil {
// log.Printf("Agent containts %v\n", string(agent))
log.Printf("Forking...\n")
go gc.Main(config, string(agent))
log.Printf("Forked thread has completed\n")
time.Sleep(10 * time.Second)
} else {
log.Printf("Failed to load Agent file: %s", arguments["--agent"]) log.Printf("Failed to load Agent file: %s", arguments["--agent"])
os.Exit(exitCodes.m["AGENT_LOAD_FAILED"]) os.Exit(exitCodes.m["AGENT_LOAD_FAILED"])
} }
for key := range config.Routers {
log.Printf("Calling for router %d", key)
go gc.Main(config, key, string(agent))
time.Sleep(10 * time.Second)
}
case "router": case "router":
/* /*

View file

@ -2,6 +2,8 @@ package client
import ( import (
"fmt" "fmt"
"log"
"strconv"
"sync" "sync"
"time" "time"
@ -11,13 +13,19 @@ import (
) )
// Main is the initiation function for a Client // Main is the initiation function for a Client
func Main(config gs.GagentConfig, agent string) { func Main(config gs.GagentConfig, routerID int, agent string) {
var mu sync.Mutex var mu sync.Mutex
var rport = int(config.ListenPort)
if config.Routers[routerID].RouterPort != "" {
rport, _ = strconv.Atoi(config.Routers[routerID].RouterPort)
}
fmt.Printf("Did we make it this far?\n") log.Printf("--|%#v|--\n", agent)
fmt.Printf("--|%#v|--\n", agent)
connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr) connectString := fmt.Sprintf("tcp://%s:%d",
config.Routers[routerID].RouterAddr,
rport)
log.Printf("Attempting to connect to %s\n", connectString)
sock, _ := zmq.NewSocket(zmq.DEALER) sock, _ := zmq.NewSocket(zmq.DEALER)
defer sock.Close() defer sock.Close()
@ -36,7 +44,7 @@ func Main(config gs.GagentConfig, agent string) {
mu.Lock() mu.Lock()
msg, err := sock.RecvMessage(zmq.DONTWAIT) msg, err := sock.RecvMessage(zmq.DONTWAIT)
if err == nil { if err == nil {
fmt.Println(msg[0], config.UUID) log.Println(msg[0], config.UUID)
} }
mu.Unlock() mu.Unlock()
} }

View file

@ -46,12 +46,20 @@ type RouterDetails struct {
RouterID string `hcl:"uuid,attr"` RouterID string `hcl:"uuid,attr"`
/* /*
* This is the IP Address and port that the router * This is the IP address or hostname the router
* will listen on. The router will start up a 0MQ * will listen on. The router will start up a 0MQ
* service that clients and workers will connect to. * service that clients and workers will connect to.
*/ */
RouterAddr string `hcl:"address,attr"` RouterAddr string `hcl:"address,attr"`
/*
* This is the is the port that the router listens
* on. If not defined, it will default to 35570
* The router will start up a 0MQ service that
* clients and workers will connect to.
*/
RouterPort string `hcl:"port,optional"`
/* /*
* These tags will be passed to the router upon * These tags will be passed to the router upon
* connection. The router will then use these * connection. The router will then use these

View file

@ -37,6 +37,7 @@ func agentRouter(workerNum int) {
// The DEALER socket gives us the reply envelope and message // The DEALER socket gives us the reply envelope and message
msg, _ := worker.RecvMessage(0) msg, _ := worker.RecvMessage(0)
identity, content := pop(msg) identity, content := pop(msg)
log.Printf("Recieved message: %s", content)
// Send 0..4 replies back // Send 0..4 replies back
replies := rand.Intn(5) replies := rand.Intn(5)
@ -51,7 +52,6 @@ func agentRouter(workerNum int) {
} }
} }
// Main is the initiation function for a Router // Main is the initiation function for a Router
func Main(config gs.GagentConfig) { func Main(config gs.GagentConfig) {
/* /*
@ -84,4 +84,3 @@ func Main(config gs.GagentConfig) {
err := zmq.Proxy(frontend, backend, nil) err := zmq.Proxy(frontend, backend, nil)
log.Fatalln("Proxy interrupted:", err) log.Fatalln("Proxy interrupted:", err)
} }