gagent/internal/router/router.go

181 lines
3.7 KiB
Go
Raw Normal View History

package router
import (
fmt "fmt"
log "log"
http "net/http"
strconv "strconv"
sync "sync"
2023-01-11 18:49:48 -08:00
gcdb "github.com/dragonheim/gagent/internal/chaindb"
gstructs "github.com/dragonheim/gagent/internal/gstructs"
2021-10-18 05:46:52 -07:00
prometheus "github.com/prometheus/client_golang/prometheus"
promauto "github.com/prometheus/client_golang/prometheus/promauto"
zmq "github.com/pebbe/zmq4"
)
2021-10-18 05:46:52 -07:00
var (
opsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "client_requests_received",
2021-10-18 05:46:52 -07:00
})
db gcdb.GagentDb
2021-10-18 05:46:52 -07:00
)
/*
* The 'router' processes routing requests from the agent. The router does
* not handle any of the agent activities beyond processing the agent's
* list of tags and passing the agent and it's storage to either a member
* or client node. Tags are used by the agent to give hints as to where
* it should be routed.
* Main is the entrypoint for the router
*/
func Main(wg *sync.WaitGroup, config gstructs.GagentConfig) {
log.Printf("[INFO] Starting router\n")
defer wg.Done()
2021-02-26 08:14:28 -08:00
http.HandleFunc("/hello", answerClient)
clientSock, _ := zmq.NewSocket(zmq.ROUTER)
defer clientSock.Close()
2021-02-26 08:14:28 -08:00
workerSock, _ := zmq.NewSocket(zmq.DEALER)
defer workerSock.Close()
db.Init()
workerListener := "tcp://" + config.ListenAddr + ":" + strconv.Itoa(config.WorkerPort)
_ = workerSock.Bind(workerListener)
2021-02-26 08:14:28 -08:00
workers := make([]string, 0)
2021-02-26 08:14:28 -08:00
poller1 := zmq.NewPoller()
poller1.Add(workerSock, zmq.POLLIN)
poller2 := zmq.NewPoller()
poller2.Add(workerSock, zmq.POLLIN)
wg.Add(1)
go createClientListener(wg, config)
LOOP:
for {
/*
* Poll frontend only if we have available workers
*/
var sockets []zmq.Polled
var err error
if len(workers) > 0 {
sockets, err = poller2.Poll(-1)
} else {
sockets, err = poller1.Poll(-1)
}
if err != nil {
/*
* Interrupt
*/
break
}
for _, socket := range sockets {
switch s := socket.Socket; s {
case workerSock:
/*
* Handle worker activity on backend
* Use worker identity for load-balancing
*/
msg, err := s.RecvMessage(0)
if err != nil {
/*
* Interrupt
*/
break LOOP
}
var identity string
identity, msg = unwrap(msg)
log.Printf("[DEBUG] Worker message received: %s", msg)
workers = append(workers, identity)
case clientSock:
wg.Add(1)
go createClientListener(wg, config)
}
}
}
}
/*
* Create listener for client requests
*/
func createClientListener(wg *sync.WaitGroup, config gstructs.GagentConfig) {
defer wg.Done()
clientSock, _ := zmq.NewSocket(zmq.ROUTER)
defer clientSock.Close()
clientListener := "tcp://" + config.ListenAddr + ":" + strconv.Itoa(config.ClientPort)
log.Printf("[DEBUG] Binding to: %s", clientListener)
_ = clientSock.Bind(clientListener)
for {
msg, err := clientSock.RecvMessage(0)
if err != nil {
break
}
log.Printf("[DEBUG] Client message received: %s", msg)
}
}
func unwrap(msg []string) (head string, tail []string) {
head = msg[0]
if len(msg) > 1 && msg[1] == "" {
tail = msg[2:]
} else {
tail = msg[1:]
}
return
}
func answerClient(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
2021-10-18 12:55:28 -07:00
opsProcessed.Inc()
/*
* fmt.Fprintf(w, "%v\n", r)
*/
2021-10-18 05:46:52 -07:00
http.NotFound(w, r)
return
}
/*
* Common code for all requests can go here...
*/
switch r.Method {
/*
* Handle GET requests
*/
case http.MethodGet:
fmt.Fprintf(w, "%v\n", r)
/*
* Handle POST requests
*/
case http.MethodPost:
fmt.Fprintf(w, "%v\n", r)
/*
* Handle PUT requests
*/
case http.MethodOptions:
w.Header().Set("Allow", "GET, POST, OPTIONS")
w.WriteHeader(http.StatusNoContent)
/*
* Handle everything else
*/
default:
w.Header().Set("Allow", "GET, POST, OPTIONS")
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}