mirror of
https://github.com/dragonheim/gagent.git
synced 2025-04-25 22:48:59 -07:00
[CI SKIP] Refactored project layout per Mark Wolfe Blog : https://www.wolfe.id.au/2020/03/10/how-do-i-structure-my-go-project/
This commit is contained in:
parent
0557021282
commit
a7747040ce
31 changed files with 15 additions and 45 deletions
54
internal/client/client.go
Normal file
54
internal/client/client.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
|
||||
|
||||
zmq "github.com/pebbe/zmq4"
|
||||
)
|
||||
|
||||
// Main is the initiation function for a Client
|
||||
func Main(config gs.GagentConfig, rid int, agent string) {
|
||||
log.Printf("[INFO] Starting client\n")
|
||||
|
||||
// Generate connect string for this router.
|
||||
var rport = int64(config.ClientPort)
|
||||
if config.Routers[rid].ClientPort != 0 {
|
||||
rport = config.Routers[rid].ClientPort
|
||||
}
|
||||
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport)
|
||||
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
|
||||
|
||||
var mu sync.Mutex
|
||||
|
||||
sock, _ := zmq.NewSocket(zmq.REQ)
|
||||
defer sock.Close()
|
||||
|
||||
sock.SetIdentity(config.UUID)
|
||||
sock.Connect(connectString)
|
||||
|
||||
go func() {
|
||||
mu.Lock()
|
||||
log.Printf("[DEBUG] Start sending agent...\n")
|
||||
sock.SendMessage(agent)
|
||||
log.Printf("[DEBUG] End sending agent...\n")
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
||||
// time.Sleep(10 * time.Millisecond)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// for {
|
||||
// time.Sleep(10 * time.Millisecond)
|
||||
// mu.Lock()
|
||||
// msg, err := sock.RecvMessage(zmq.DONTWAIT)
|
||||
// if err == nil {
|
||||
// log.Println(msg[0], config.UUID)
|
||||
// }
|
||||
// mu.Unlock()
|
||||
// }
|
||||
}
|
110
internal/gstructs/gstructs.go
Normal file
110
internal/gstructs/gstructs.go
Normal file
|
@ -0,0 +1,110 @@
|
|||
package gstructs
|
||||
|
||||
// GagentConfig is the primary construct used by all modes
|
||||
type GagentConfig struct {
|
||||
Name string `hcl:"name,optional"`
|
||||
Mode string `hcl:"mode,attr"`
|
||||
UUID string `hcl:"uuid,optional"`
|
||||
ListenAddr string `hcl:"listenaddr,optional"`
|
||||
ClientPort int64 `hcl:"clientport,optional"`
|
||||
RouterPort int64 `hcl:"routerport,optional"`
|
||||
WorkerPort int64 `hcl:"workerport,optional"`
|
||||
Clients []*ClientDetails `hcl:"client,block"`
|
||||
Routers []*RouterDetails `hcl:"router,block"`
|
||||
Workers []*WorkerDetails `hcl:"worker,block"`
|
||||
}
|
||||
|
||||
// ClientDetails is details about known clients
|
||||
type ClientDetails struct {
|
||||
/*
|
||||
* Client name for display purposes in logs and
|
||||
* diagnostics.
|
||||
*/
|
||||
ClientName string `hcl:",label"`
|
||||
|
||||
/*
|
||||
* UUID String for the client node. This is used by
|
||||
* the router to determine which MQ client to send
|
||||
* the agent's results to. This attempts to keep the
|
||||
* clients unique globally.
|
||||
*/
|
||||
ClientID string `hcl:"clientid,optional"`
|
||||
}
|
||||
|
||||
// RouterDetails is details about known routers
|
||||
type RouterDetails struct {
|
||||
/*
|
||||
* Router name for display purposes in logs and
|
||||
* diagnostics
|
||||
*/
|
||||
RouterName string `hcl:",label"`
|
||||
|
||||
/*
|
||||
* UUID String for the router node. This is used by
|
||||
* the clients, routers, and workers to determine
|
||||
* which MQ router to send the agent's requests to.
|
||||
* This attempts to keep the routers unique globally.
|
||||
*/
|
||||
RouterID string `hcl:"routerid,attr"`
|
||||
|
||||
/*
|
||||
* This is the IP address or hostname the router
|
||||
* will listen on. The router will start up a 0MQ
|
||||
* service that clients and workers will connect to.
|
||||
*/
|
||||
RouterAddr string `hcl:"address,attr"`
|
||||
|
||||
/*
|
||||
* This is the is the port that the router listens
|
||||
* on for clients. If not defined, it will default
|
||||
* to 35571.
|
||||
*/
|
||||
ClientPort int64 `hcl:"clientport,optional"`
|
||||
|
||||
/*
|
||||
* This is the is the port that the router listens
|
||||
* on for routers. If not defined, it will default
|
||||
* to 35570.
|
||||
*/
|
||||
RouterPort int64 `hcl:"workerport,optional"`
|
||||
|
||||
/*
|
||||
* This is the is the port that the router listens
|
||||
* on for clients. If not defined, it will default
|
||||
* to 35572.
|
||||
*/
|
||||
WorkerPort int64 `hcl:"workerport,optional"`
|
||||
|
||||
/*
|
||||
* These tags will be passed to the router upon
|
||||
* connection. The router will then use these
|
||||
* tags to help determine which worker / client
|
||||
* to send the client's requests and results to.
|
||||
*/
|
||||
RouterTags []string `hcl:"tags,optional"`
|
||||
}
|
||||
|
||||
// WorkerDetails is details about known workers
|
||||
type WorkerDetails struct {
|
||||
/*
|
||||
* Router name for display purposes in logs and
|
||||
* diagnostics
|
||||
*/
|
||||
WorkerName string `hcl:",label"`
|
||||
|
||||
/*
|
||||
* UUID String for the worker node. This is used
|
||||
* by the router to determine which MQ client to
|
||||
* send agents to. This attempts to keep the
|
||||
* workers unique globally.
|
||||
*/
|
||||
WorkerID string `hcl:"workerid,attr"`
|
||||
|
||||
/*
|
||||
* These tags will be passed to the router upon
|
||||
* connection. The router will then use these
|
||||
* tags to help determine which worker / client
|
||||
* to send the agent and it's results to.
|
||||
*/
|
||||
WorkerTags []string `hcl:"tags,optional"`
|
||||
}
|
91
internal/router/router.go
Normal file
91
internal/router/router.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
|
||||
|
||||
zmq "github.com/pebbe/zmq4"
|
||||
)
|
||||
|
||||
// @TODO -- This was documented in the example, and I am unclear what it does
|
||||
const (
|
||||
WORKER_READY = "\001" // Signals worker is ready
|
||||
)
|
||||
|
||||
// Main is the initiation function for a Router
|
||||
func Main(config gs.GagentConfig) {
|
||||
log.Printf("[INFO] Starting router\n")
|
||||
|
||||
clientSock, _ := zmq.NewSocket(zmq.ROUTER)
|
||||
defer clientSock.Close()
|
||||
|
||||
workerSock, _ := zmq.NewSocket(zmq.DEALER)
|
||||
defer workerSock.Close()
|
||||
|
||||
clientSock.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ClientPort))
|
||||
workerSock.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.WorkerPort))
|
||||
|
||||
workers := make([]string, 0)
|
||||
|
||||
poller1 := zmq.NewPoller()
|
||||
poller1.Add(workerSock, zmq.POLLIN)
|
||||
|
||||
poller2 := zmq.NewPoller()
|
||||
poller2.Add(workerSock, zmq.POLLIN)
|
||||
poller2.Add(clientSock, zmq.POLLIN)
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
// Poll frontend only if we have available workers
|
||||
var sockets []zmq.Polled
|
||||
var err error
|
||||
if len(workers) > 0 {
|
||||
sockets, err = poller2.Poll(-1)
|
||||
} else {
|
||||
sockets, err = poller1.Poll(-1)
|
||||
}
|
||||
if err != nil {
|
||||
break // Interrupted
|
||||
}
|
||||
for _, socket := range sockets {
|
||||
switch s := socket.Socket; s {
|
||||
case workerSock: // Handle worker activity on backend
|
||||
// Use worker identity for load-balancing
|
||||
msg, err := s.RecvMessage(0)
|
||||
if err != nil {
|
||||
break LOOP // Interrupted
|
||||
}
|
||||
var identity string
|
||||
identity, msg = unwrap(msg)
|
||||
log.Printf("[DEBUG] Worker message received: %s", msg)
|
||||
workers = append(workers, identity)
|
||||
|
||||
// Forward message to client if it's not a READY
|
||||
if msg[0] != WORKER_READY {
|
||||
clientSock.SendMessage(msg)
|
||||
}
|
||||
|
||||
case clientSock:
|
||||
// Get client request, route to first available worker
|
||||
msg, err := s.RecvMessage(0)
|
||||
log.Printf("[DEBUG] Client message received: %s", msg)
|
||||
if err == nil {
|
||||
workerSock.SendMessage(workers[0], "", msg)
|
||||
workers = workers[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func unwrap(msg []string) (head string, tail []string) {
|
||||
head = msg[0]
|
||||
if len(msg) > 1 && msg[1] == "" {
|
||||
tail = msg[2:]
|
||||
} else {
|
||||
tail = msg[1:]
|
||||
}
|
||||
return
|
||||
}
|
35
internal/worker/worker.go
Normal file
35
internal/worker/worker.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
|
||||
|
||||
// picol "git.dragonheim.net/dragonheim/gagent/src/picol"
|
||||
zmq "github.com/pebbe/zmq4"
|
||||
)
|
||||
|
||||
// Main is the initiation function for a Worker
|
||||
func Main(config gs.GagentConfig, rid int) {
|
||||
log.Printf("[INFO] Starting worker\n")
|
||||
|
||||
// Generate connect string for this router.
|
||||
var rport = int64(config.WorkerPort)
|
||||
if config.Routers[rid].WorkerPort != 0 {
|
||||
rport = config.Routers[rid].WorkerPort
|
||||
}
|
||||
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport)
|
||||
|
||||
subscriber, _ := zmq.NewSocket(zmq.REP)
|
||||
defer subscriber.Close()
|
||||
|
||||
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
|
||||
subscriber.Connect(connectString)
|
||||
|
||||
msg, err := subscriber.Recv(0)
|
||||
if err != nil {
|
||||
log.Printf("[DEBUG] Received error: %v", err)
|
||||
}
|
||||
log.Printf("[DEBUG] Received message: %v", msg[0])
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue