gagent/internal/router/router.go

185 lines
3.9 KiB
Go

package router
import (
fmt "fmt"
log "log"
http "net/http"
strconv "strconv"
sync "sync"
gcdb "github.com/dragonheim/gagent/internal/chaindb"
gstructs "github.com/dragonheim/gagent/internal/gstructs"
zmq "github.com/pebbe/zmq4"
)
var (
/*
 * db is a package-level chaindb handle of type gcdb.GagentDb.
 * NOTE(review): no function visible in this file reads or writes it;
 * Main builds its own local instance via gcdb.NewGagentDb(). Confirm
 * whether this variable is still needed.
 */
db gcdb.GagentDb
)
/*
 * Main is the entrypoint for the router.
 *
 * The 'router' processes routing requests from the agent. The router does
 * not handle any of the agent activities beyond processing the agent's
 * list of tags and passing the agent and its storage to either a member
 * or client node. Tags are used by the agent to give hints as to where
 * it should be routed.
 *
 * Main registers the "/hello" HTTP handler, loads the chaindb from
 * config.ChainDBPath, binds the worker socket to
 * tcp://<ListenAddr>:<WorkerPort>, starts the client listener goroutine
 * and then polls the worker socket until polling or receiving fails.
 * wg.Done is called when Main returns. Socket creation and bind failures
 * are logged and make Main return early.
 */
func Main(wg *sync.WaitGroup, config gstructs.GagentConfig) {
	log.Printf("[INFO] Starting router\n")
	defer wg.Done()

	http.HandleFunc("/hello", AnswerClient)

	/*
	 * A socket that failed to be created must not be used or closed, so
	 * give up immediately instead of discarding the error.
	 */
	clientSock, err := zmq.NewSocket(zmq.ROUTER)
	if err != nil {
		log.Printf("[ERROR] Error creating client socket: %s", err)
		return
	}
	defer clientSock.Close()

	workerSock, err := zmq.NewSocket(zmq.DEALER)
	if err != nil {
		log.Printf("[ERROR] Error creating worker socket: %s", err)
		return
	}
	defer workerSock.Close()

	chain := gcdb.NewGagentDb()

	log.Println("[DEBUG] Loading chaindb ")
	if err := chain.LoadHCL(config.ChainDBPath); err != nil {
		/*
		 * A chaindb load failure is logged but does not stop the router.
		 */
		log.Printf("[ERROR] Error loading chaindb: %s", err)
	}

	workerListener := "tcp://" + config.ListenAddr + ":" + strconv.Itoa(config.WorkerPort)
	if err := workerSock.Bind(workerListener); err != nil {
		/*
		 * Without a bound worker socket the poll loop below could never
		 * receive anything, so report the failure and stop.
		 */
		log.Printf("[ERROR] Error binding worker socket: %s", err)
		return
	}

	/*
	 * Identities of workers that have reported in. The list is only ever
	 * appended to in this function.
	 */
	var workers []string

	/*
	 * NOTE(review): both pollers watch workerSock only, so they behave
	 * identically. clientSock is never bound nor added to a poller, which
	 * makes the clientSock case in the loop below unreachable. Confirm
	 * the intended design before changing this.
	 */
	poller1 := zmq.NewPoller()
	poller1.Add(workerSock, zmq.POLLIN)
	poller2 := zmq.NewPoller()
	poller2.Add(workerSock, zmq.POLLIN)

	wg.Add(1)
	go createClientListener(wg, config)

LOOP:
	for {
		/*
		 * Poll frontend only if we have available workers
		 */
		var sockets []zmq.Polled
		if len(workers) > 0 {
			sockets, err = poller2.Poll(-1)
		} else {
			sockets, err = poller1.Poll(-1)
		}
		if err != nil {
			/*
			 * Interrupt
			 */
			break
		}

		for _, socket := range sockets {
			switch s := socket.Socket; s {
			case workerSock:
				/*
				 * Handle worker activity on backend
				 * Use worker identity for load-balancing
				 */
				msg, err := s.RecvMessage(0)
				if err != nil {
					/*
					 * Interrupt
					 */
					break LOOP
				}
				var identity string
				identity, msg = unwrap(msg)
				log.Printf("[DEBUG] Worker message received: %s", msg)
				workers = append(workers, identity)

			case clientSock:
				wg.Add(1)
				go createClientListener(wg, config)
			}
		}
	}
}
/*
 * createClientListener creates a ROUTER socket, binds it to
 * tcp://<ListenAddr>:<ClientPort> and logs every client message it
 * receives until receiving fails.
 *
 * wg.Done is called when the function returns, so the caller must have
 * called wg.Add(1) beforehand. The returned error is the socket creation,
 * bind or receive error that ended the listener; each is also logged here
 * because callers that start this function with a go statement discard
 * the return value.
 */
func createClientListener(wg *sync.WaitGroup, config gstructs.GagentConfig) error {
	defer wg.Done()

	clientSock, err := zmq.NewSocket(zmq.ROUTER)
	if err != nil {
		log.Printf("[ERROR] Error creating client socket: %s", err)
		return err
	}
	defer clientSock.Close()

	clientListener := "tcp://" + config.ListenAddr + ":" + strconv.Itoa(config.ClientPort)
	log.Printf("[DEBUG] Binding to: %s", clientListener)
	if err := clientSock.Bind(clientListener); err != nil {
		log.Printf("[ERROR] Error binding client socket: %s", err)
		return err
	}

	for {
		msg, err := clientSock.RecvMessage(0)
		if err != nil {
			/*
			 * Report why the listener stopped instead of silently
			 * returning nil.
			 */
			log.Printf("[ERROR] Error receiving client message: %s", err)
			return err
		}
		log.Printf("[DEBUG] Client message received: %s", msg)
	}
}
/*
 * unwrap splits a multipart message into its first frame and the rest.
 *
 * head is msg[0]. If the second frame is an empty string it is treated
 * as a delimiter and dropped, so tail starts at the third frame;
 * otherwise tail starts at the second frame. An empty or nil msg yields
 * an empty head and a nil tail instead of panicking.
 */
func unwrap(msg []string) (head string, tail []string) {
	if len(msg) == 0 {
		return "", nil
	}
	if len(msg) > 1 && msg[1] == "" {
		return msg[0], msg[2:]
	}
	return msg[0], msg[1:]
}
/*
 * AnswerClient is the HTTP handler for client requests.
 *
 * Requests for any path other than "/hello" or "/" get a 404. GET and
 * POST echo the request back with the default %v formatting, OPTIONS
 * answers 204 with an Allow header, and every other method answers 405
 * with the same Allow header.
 */
func AnswerClient(w http.ResponseWriter, r *http.Request) {
	/*
	 * Main registers this handler on "/hello", so that path has to be
	 * accepted or every routed request would end in a 404. "/" is still
	 * accepted for callers that mount the handler on the root pattern.
	 */
	if r.URL.Path != "/" && r.URL.Path != "/hello" {
		http.NotFound(w, r)
		return
	}

	/*
	 * Common code for all requests can go here...
	 */

	switch r.Method {
	/*
	 * Handle GET requests
	 */
	case http.MethodGet:
		log.Println("[DEBUG] GET method received")
		fmt.Fprintf(w, "%v\n", r)

	/*
	 * Handle POST requests
	 */
	case http.MethodPost:
		log.Println("[DEBUG] POST method received")
		fmt.Fprintf(w, "%v\n", r)

	/*
	 * Handle OPTIONS requests
	 */
	case http.MethodOptions:
		log.Println("[DEBUG] OPTIONS method received")
		w.Header().Set("Allow", "GET, POST, OPTIONS")
		w.WriteHeader(http.StatusNoContent)

	/*
	 * Handle everything else
	 */
	default:
		w.Header().Set("Allow", "GET, POST, OPTIONS")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}