mirror of
https://github.com/dragonheim/gagent.git
synced 2025-01-18 09:36:28 -08:00
185 lines
3.9 KiB
Go
185 lines
3.9 KiB
Go
package router
|
|
|
|
import (
|
|
fmt "fmt"
|
|
log "log"
|
|
http "net/http"
|
|
strconv "strconv"
|
|
sync "sync"
|
|
|
|
gcdb "github.com/dragonheim/gagent/internal/chaindb"
|
|
gstructs "github.com/dragonheim/gagent/internal/gstructs"
|
|
|
|
zmq "github.com/pebbe/zmq4"
|
|
)
|
|
|
|
var (
|
|
db gcdb.GagentDb
|
|
)
|
|
|
|
/*
|
|
* The 'router' processes routing requests from the agent. The router does
|
|
* not handle any of the agent activities beyond processing the agent's
|
|
* list of tags and passing the agent and it's storage to either a member
|
|
* or client node. Tags are used by the agent to give hints as to where
|
|
* it should be routed.
|
|
* Main is the entrypoint for the router
|
|
*/
|
|
func Main(wg *sync.WaitGroup, config gstructs.GagentConfig) {
|
|
log.Printf("[INFO] Starting router\n")
|
|
defer wg.Done()
|
|
|
|
http.HandleFunc("/hello", AnswerClient)
|
|
clientSock, _ := zmq.NewSocket(zmq.ROUTER)
|
|
defer clientSock.Close()
|
|
|
|
workerSock, _ := zmq.NewSocket(zmq.DEALER)
|
|
defer workerSock.Close()
|
|
|
|
chain := gcdb.NewGagentDb()
|
|
log.Println("[DEBUG] Loading chaindb ")
|
|
err := chain.LoadHCL(config.ChainDBPath)
|
|
if err != nil {
|
|
log.Printf("[ERROR] Error loading chaindb: %s", err)
|
|
}
|
|
|
|
workerListener := "tcp://" + config.ListenAddr + ":" + strconv.Itoa(config.WorkerPort)
|
|
_ = workerSock.Bind(workerListener)
|
|
|
|
workers := make([]string, 0)
|
|
|
|
poller1 := zmq.NewPoller()
|
|
poller1.Add(workerSock, zmq.POLLIN)
|
|
|
|
poller2 := zmq.NewPoller()
|
|
poller2.Add(workerSock, zmq.POLLIN)
|
|
|
|
wg.Add(1)
|
|
go createClientListener(wg, config)
|
|
|
|
LOOP:
|
|
for {
|
|
/*
|
|
* Poll frontend only if we have available workers
|
|
*/
|
|
var sockets []zmq.Polled
|
|
var err error
|
|
if len(workers) > 0 {
|
|
sockets, err = poller2.Poll(-1)
|
|
} else {
|
|
sockets, err = poller1.Poll(-1)
|
|
}
|
|
if err != nil {
|
|
/*
|
|
* Interrupt
|
|
*/
|
|
break
|
|
}
|
|
for _, socket := range sockets {
|
|
switch s := socket.Socket; s {
|
|
case workerSock:
|
|
/*
|
|
* Handle worker activity on backend
|
|
* Use worker identity for load-balancing
|
|
*/
|
|
msg, err := s.RecvMessage(0)
|
|
if err != nil {
|
|
/*
|
|
* Interrupt
|
|
*/
|
|
break LOOP
|
|
}
|
|
var identity string
|
|
identity, msg = unwrap(msg)
|
|
log.Printf("[DEBUG] Worker message received: %s", msg)
|
|
workers = append(workers, identity)
|
|
|
|
case clientSock:
|
|
wg.Add(1)
|
|
go createClientListener(wg, config)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Create listener for client requests
|
|
*/
|
|
func createClientListener(wg *sync.WaitGroup, config gstructs.GagentConfig) error {
|
|
defer wg.Done()
|
|
|
|
clientSock, err := zmq.NewSocket(zmq.ROUTER)
|
|
if err != nil {
|
|
log.Printf("[ERROR] Error creating client socket: %s", err)
|
|
return err
|
|
}
|
|
defer clientSock.Close()
|
|
|
|
clientListener := "tcp://" + config.ListenAddr + ":" + strconv.Itoa(config.ClientPort)
|
|
log.Printf("[DEBUG] Binding to: %s", clientListener)
|
|
err = clientSock.Bind(clientListener)
|
|
if err != nil {
|
|
log.Printf("[ERROR] Error binding client socket: %s", err)
|
|
return err
|
|
}
|
|
|
|
for {
|
|
msg, err := clientSock.RecvMessage(0)
|
|
if err != nil {
|
|
break
|
|
}
|
|
log.Printf("[DEBUG] Client message received: %s", msg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
 * unwrap splits a multipart message into its first frame and the rest.
 *
 * head is msg[0]. If the frame right after it is an empty delimiter (""),
 * that delimiter is dropped and tail starts at the third frame; otherwise
 * tail starts at the second frame. An empty or nil msg yields ("", nil)
 * instead of panicking on the msg[0] access.
 */
func unwrap(msg []string) (head string, tail []string) {
	if len(msg) == 0 {
		return "", nil
	}

	head = msg[0]
	rest := msg[1:]
	if len(rest) > 0 && rest[0] == "" {
		/*
		 * Skip the empty delimiter frame.
		 */
		rest = rest[1:]
	}
	return head, rest
}
|
|
|
|
/*
 * AnswerClient is the HTTP handler for the router's "/hello" endpoint.
 *
 * Requests for any path other than "/hello" or "/" get a 404. The handler
 * is registered on "/hello", so checking for "/" alone would reject every
 * request routed to it; "/" is still accepted for callers that mount the
 * handler there.
 *
 * GET and POST write the request, formatted with %v, back to the client.
 * OPTIONS answers 204 with an Allow header. Any other method gets a 405
 * with the same Allow header.
 */
func AnswerClient(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/hello" && r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}

	/*
	 * Common code for all requests can go here...
	 */
	switch r.Method {
	/*
	 * Handle GET requests
	 */
	case http.MethodGet:
		log.Println("[DEBUG] GET method received")
		fmt.Fprintf(w, "%v\n", r)

	/*
	 * Handle POST requests
	 */
	case http.MethodPost:
		log.Println("[DEBUG] POST method received")
		fmt.Fprintf(w, "%v\n", r)

	/*
	 * Handle OPTIONS requests
	 */
	case http.MethodOptions:
		log.Println("[DEBUG] OPTIONS method received")
		w.Header().Set("Allow", "GET, POST, OPTIONS")
		w.WriteHeader(http.StatusNoContent)

	/*
	 * Handle everything else
	 */
	default:
		w.Header().Set("Allow", "GET, POST, OPTIONS")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}
|