Adding support for a blockchain DB for agent history.

This commit is contained in:
James Wells 2021-10-26 13:21:44 -07:00
parent 0d0695d195
commit 5e69931ed7
Signed by: jwells
GPG key ID: 73196D10B8E65666
7 changed files with 199 additions and 147 deletions

View file

@ -1,6 +1,7 @@
package main
import (
sha "crypto/sha256"
fmt "fmt"
ioutil "io/ioutil"
log "log"
@ -8,6 +9,8 @@ import (
os "os"
sync "sync"
fqdn "github.com/Showmax/go-fqdn"
autorestart "github.com/slayer/autorestart"
gs "git.dragonheim.net/dragonheim/gagent/internal/gstructs"
@ -53,11 +56,9 @@ var exitCodes = struct {
}}
var config gs.GagentConfig
var agent gs.AgentDetails
func main() {
autorestart.StartWatcher()
http.Handle("/metrics", promhttp.Handler())
filter := &logutils.LevelFilter{
Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"},
MinLevel: logutils.LogLevel("DEBUG"),
@ -65,46 +66,153 @@ func main() {
}
log.SetOutput(filter)
var agent gs.Agent
// config.File = "/etc/gagent/gagent.hcl"
log.Printf("[DEBUG] Configuration is %v\n", config)
// config.Name, _ = os.Hostname()
// config.Mode = "setup"
switch config.Mode {
case "client":
log.Printf("[INFO] Running in client mode\n")
if len(config.Routers) == 0 {
log.Printf("[ERROR] No routers defined.\n")
os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"])
}
var err error
if config.CMode {
agent.ScriptCode, err = ioutil.ReadFile(config.File)
if err != nil {
log.Printf("[ERROR] No such file or directory: %s", config.File)
os.Exit(exitCodes.m["AGENT_LOAD_FAILED"])
}
agent.Shasum = fmt.Sprintf("%x", sha.Sum256(agent.ScriptCode))
agent.Status = "loaded"
log.Printf("[DEBUG] SHA256 of Agent file: %s", agent.Shasum)
}
wg.Add(1)
go gc.Main(&wg, config, string(agent.ScriptCode))
case "router":
log.Printf("[INFO] Running in router mode\n")
if len(config.Workers) == 0 {
log.Printf("[ERROR] No workers defined.\n")
os.Exit(exitCodes.m["NO_WORKERS_DEFINED"])
}
wg.Add(1)
go gr.Main(&wg, config)
case "worker":
log.Printf("[INFO] Running in worker mode\n")
if len(config.Routers) == 0 {
log.Printf("[ERROR] No routers defined.\n")
os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"])
}
wg.Add(1)
go gw.Main(&wg, config)
case "setup":
log.Printf("[INFO] Running in setup mode\n")
f := hclwrite.NewEmptyFile()
rootBody := f.Body()
rootBody.SetAttributeValue("name", cty.StringVal(config.Name))
rootBody.SetAttributeValue("mode", cty.StringVal(config.Mode))
rootBody.SetAttributeValue("uuid", cty.StringVal(config.UUID))
rootBody.AppendNewline()
routerBlock1 := rootBody.AppendNewBlock("router", []string{config.Name})
routerBody1 := routerBlock1.Body()
routerBody1.SetAttributeValue("routerid", cty.StringVal(config.UUID))
routerBody1.SetAttributeValue("address", cty.StringVal("127.0.0.1"))
routerBody1.SetAttributeValue("clientport", cty.NumberIntVal(config.ClientPort))
rootBody.AppendNewline()
log.Printf("\n%s", f.Bytes())
os.Exit(exitCodes.m["SUCCESS"])
default:
log.Printf("[ERROR] Unknown operating mode, exiting.\n")
os.Exit(exitCodes.m["INVALID_MODE"])
}
wg.Wait()
os.Exit(exitCodes.m["SUCCESS"])
}
func init() {
var err error
autorestart.StartWatcher()
http.Handle("/metrics", promhttp.Handler())
/*
* Start Prometheus metrics exporter
*/
go http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort), nil)
/*
* Initialize the exit codes
*/
exitCodes.m["SUCCESS"] = 0
exitCodes.m["INVALID_MODE"] = 1
exitCodes.m["CONFIG_FILE_MISSING"] = 2
exitCodes.m["NO_ROUTERS_DEFINED"] = 3
exitCodes.m["NO_WORKERS_DEFINED"] = 4
exitCodes.m["AGENT_NOT_DEFINED"] = 5
exitCodes.m["AGENT_LOAD_FAILED"] = 6
/*
* Initialize the configuration
*/
config.Version = semVER
config.File = "/etc/gagent/gagent.hcl"
config.Mode = "setup"
config.Name, _ = fqdn.FqdnHostname()
/*
* Set a default UUID for this node.
* This is used throughout the G'Agent system to uniquely identify this node.
* It can be overridden in the configuration file by setting uuid
*/
// identity := uuid.NewV4UUID()
// config.UUID = identity.String()
config.UUID = uuid.NewV4UUID().String()
/*
* By default, we want to listen on all IP addresses. It can be overridden
* in the configuration file by setting listenaddr
*/
// config.ListenAddr = "0.0.0.0"
config.ListenAddr = "0.0.0.0"
/*
* By default, G'Agent client will use port 35571 to communicate with the
* routers, but you can override it by setting the clientport in the
* configuration file
*/
// config.ClientPort = 35571
config.ClientPort = 35571
/*
* By default, G'Agent router will use port 35572 to communicate with
* other routers, but you can override it by setting the routerport in
* the configuration file
*/
// config.RouterPort = 35570
config.RouterPort = 35570
/*
* By default, G'Agent worker will use port 35570 to communicate with the
* routers, but you can override it by setting the workerport in the
* configuration file
*/
// config.WorkerPort = 35572
config.WorkerPort = 35572
config.Clients = make([]*gs.ClientDetails, 0)
config.Routers = make([]*gs.RouterDetails, 0)
config.Workers = make([]*gs.WorkerDetails, 0)
/*
* Create a usage variable and then use that to declare the arguments and
@ -148,15 +256,15 @@ func main() {
opts, _ := docopt.ParseArgs(usage, nil, semVER)
log.Printf("[DEBUG] Arguments are %v\n", opts)
log.Printf("[DEBUG] Config is %v\n", config)
if opts["--config"] != nil {
config.File = opts["--config"].(string)
}
/*
* Start Prometheus metrics exporter
*/
go http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort), nil)
err = hclsimple.DecodeFile(config.File, nil, &config)
if err != nil && opts["setup"] == false {
log.Printf("[ERROR] Failed to load configuration file: %s.\n", config.File)
os.Exit(exitCodes.m["CONFIG_FILE_MISSING"])
}
/*
* Let the command line mode override the configuration.
@ -164,20 +272,24 @@ func main() {
if opts["setup"] == true {
config.Mode = "setup"
} else {
err := hclsimple.DecodeFile(config.File, nil, &config)
if err != nil {
log.Printf("[ERROR] Failed to load configuration file: %s.\n%s\n", config.File, err)
os.Exit(exitCodes.m["CONFIG_FILE_MISSING"])
}
if opts["client"] == true {
config.Mode = "client"
if opts["--agent"] == nil {
log.Printf("[ERROR] Agent file not specified")
os.Exit(exitCodes.m["AGENT_NOT_DEFINED"])
} else {
config.File = opts["--agent"].(string)
}
if opts["pull"] == true {
config.CMode = false
}
if opts["push"] == true {
config.CMode = true
}
}
if opts["router"] == true {
config.Mode = "router"
}
@ -185,108 +297,6 @@ func main() {
config.Mode = "worker"
}
}
config.Version = semVER
log.Printf("[DEBUG] Configuration is %v\n", config)
switch config.Mode {
case "client":
log.Printf("[INFO] Running in client mode\n")
if len(config.Routers) == 0 {
log.Printf("[ERROR] No routers defined.\n")
os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"])
}
if opts["--agent"] == nil {
log.Printf("[ERROR] Agent file not specified")
os.Exit(exitCodes.m["AGENT_NOT_DEFINED"])
}
// var agent []byte
var err error
if config.CMode {
agent.ScriptCode, err = ioutil.ReadFile(opts["--agent"].(string))
if err != nil {
log.Printf("[ERROR] Failed to load Agent file: %s", opts["--agent"])
os.Exit(exitCodes.m["AGENT_LOAD_FAILED"])
}
}
wg.Add(1)
go gc.Main(&wg, config, string(agent.ScriptCode))
case "router":
log.Printf("[INFO] Running in router mode\n")
if len(config.Workers) == 0 {
log.Printf("[ERROR] No workers defined.\n")
os.Exit(exitCodes.m["NO_WORKERS_DEFINED"])
}
wg.Add(1)
go gr.Main(&wg, config)
case "worker":
log.Printf("[INFO] Running in worker mode\n")
if len(config.Routers) == 0 {
log.Printf("[ERROR] No routers defined.\n")
os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"])
}
wg.Add(1)
go gw.Main(&wg, config)
case "setup":
log.Printf("[INFO] Running in setup mode\n")
f := hclwrite.NewEmptyFile()
rootBody := f.Body()
rootBody.SetAttributeValue("name", cty.StringVal(config.Name))
rootBody.SetAttributeValue("mode", cty.StringVal(config.Mode))
rootBody.SetAttributeValue("uuid", cty.StringVal(config.UUID))
rootBody.AppendNewline()
routerBlock1 := rootBody.AppendNewBlock("router", []string{config.Name})
routerBody1 := routerBlock1.Body()
routerBody1.SetAttributeValue("routerid", cty.StringVal(config.UUID))
routerBody1.SetAttributeValue("address", cty.StringVal("127.0.0.1"))
rootBody.AppendNewline()
log.Printf("\n%s", f.Bytes())
os.Exit(exitCodes.m["SUCCESS"])
default:
log.Printf("[ERROR] Unknown operating mode, exiting.\n")
os.Exit(exitCodes.m["INVALID_MODE"])
}
wg.Wait()
os.Exit(exitCodes.m["SUCCESS"])
}
func init() {
// Initialize the configuration
config.Mode = "setup"
config.Name, _ = os.Hostname()
config.UUID = uuid.NewV4UUID().String()
config.ListenAddr = "0.0.0.0"
config.ClientPort = 35571
config.RouterPort = 35570
config.WorkerPort = 35572
config.Clients = make([]*gs.ClientDetails, 0)
config.Routers = make([]*gs.RouterDetails, 0)
config.Workers = make([]*gs.WorkerDetails, 0)
config.File = "/etc/gagent/gagent.hcl"
config.Version = semVER
// Initialize the exit codes
// exitCodes.m = make(map[string]int)
exitCodes.m["SUCCESS"] = 0
exitCodes.m["INVALID_MODE"] = 1
exitCodes.m["CONFIG_FILE_MISSING"] = 2
exitCodes.m["NO_ROUTERS_DEFINED"] = 3
exitCodes.m["NO_WORKERS_DEFINED"] = 4
exitCodes.m["AGENT_NOT_DEFINED"] = 5
exitCodes.m["AGENT_LOAD_FAILED"] = 6
log.Printf("[DEBUG] Config is %v\n", config)
}

1
go.mod
View file

@ -3,6 +3,7 @@ module git.dragonheim.net/dragonheim/gagent
go 1.16
require (
github.com/Showmax/go-fqdn v1.0.0
github.com/agext/levenshtein v1.2.3 // indirect
github.com/aviddiviner/docopt-go v0.0.0-20170807220726-d8a1d67efc6a
github.com/hashicorp/hcl/v2 v2.10.1

2
go.sum
View file

@ -1,4 +1,6 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/Showmax/go-fqdn v1.0.0 h1:0rG5IbmVliNT5O19Mfuvna9LL7zlHyRfsSvBPZmF9tM=
github.com/Showmax/go-fqdn v1.0.0/go.mod h1:SfrFBzmDCtCGrnHhoDjuvFnKsWjEQX/Q9ARZvOrJAko=
github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo=
github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=

View file

@ -23,7 +23,9 @@ func Main(wg *sync.WaitGroup, config gs.GagentConfig, agent string) {
log.Printf("[INFO] Starting client\n")
for key := range config.Routers {
// Generate connect string for this router.
/*
* Generate connect string for this router.
*/
rport := config.ClientPort
if config.Routers[key].ClientPort != 0 {
rport = config.Routers[key].ClientPort

View file

@ -17,13 +17,9 @@ type GagentConfig struct {
CMode bool
}
type Agent struct {
Client string
ScriptCode []byte
Hints []*string
}
// ClientDetails is details about known clients
/*
* ClientDetails are details about known clients
*/
type ClientDetails struct {
/*
* Client name for display purposes in logs and
@ -40,7 +36,9 @@ type ClientDetails struct {
ClientID string `hcl:"clientid,optional"`
}
// RouterDetails is details about known routers
/*
* RouterDetails is details about known routers
*/
type RouterDetails struct {
/*
* Router name for display purposes in logs and
@ -93,7 +91,9 @@ type RouterDetails struct {
RouterTags []string `hcl:"tags,optional"`
}
// WorkerDetails is details about known workers
/*
* WorkerDetails is details about known workers
*/
type WorkerDetails struct {
/*
* Router name for display purposes in logs and
@ -117,3 +117,15 @@ type WorkerDetails struct {
*/
WorkerTags []string `hcl:"tags,optional"`
}
type BlockChainDB struct {
DBName string `hcl:"chain_id,optional"`
Agents []*AgentDetails `hcl:"agent,block"`
}
type AgentDetails struct {
ScriptCode []byte
Hints []*string
Client string `hcl:"client"`
Shasum string `hcl:"shasum"`
Status string `hcl:"status"`
}

View file

@ -52,7 +52,9 @@ func Main(wg *sync.WaitGroup, config gs.GagentConfig) {
LOOP:
for {
// Poll frontend only if we have available workers
/*
* Poll frontend only if we have available workers
*/
var sockets []zmq.Polled
var err error
if len(workers) > 0 {
@ -61,16 +63,23 @@ LOOP:
sockets, err = poller1.Poll(-1)
}
if err != nil {
break // Interrupted
/*
* Interrupt
*/
break
}
for _, socket := range sockets {
switch s := socket.Socket; s {
case workerSock:
// Handle worker activity on backend
// Use worker identity for load-balancing
/*
* Handle worker activity on backend
* Use worker identity for load-balancing
*/
msg, err := s.RecvMessage(0)
if err != nil {
// Interrupted
/*
* Interrupt
*/
break LOOP
}
var identity string
@ -79,7 +88,9 @@ LOOP:
workers = append(workers, identity)
case clientSock:
// Get client request, route to first available worker
/*
* Get client request, route to first available worker
*/
msg, err := s.RecvMessage(0)
log.Printf("[DEBUG] Client message received: %s", msg)
if err == nil {
@ -109,20 +120,32 @@ func answerClient(w http.ResponseWriter, r *http.Request) {
return
}
// Common code for all requests can go here...
/*
* Common code for all requests can go here...
*/
switch r.Method {
/*
* Handle GET requests
*/
case http.MethodGet:
fmt.Fprintf(w, "%v\n", r)
// Handle the GET request...
/*
* Handle POST requests
*/
case http.MethodPost:
// Handle the POST request...
fmt.Fprintf(w, "%v\n", r)
/*
* Handle PUT requests
*/
case http.MethodOptions:
w.Header().Set("Allow", "GET, POST, OPTIONS")
w.WriteHeader(http.StatusNoContent)
/*
* Handle everything else
*/
default:
w.Header().Set("Allow", "GET, POST, OPTIONS")
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)

View file

@ -37,7 +37,9 @@ func Main(wg *sync.WaitGroup, config gs.GagentConfig) {
rport = config.Routers[key].WorkerPort
}
// Generate connect string for this router.
/*
* Generate connect string for this router.
*/
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[key].RouterAddr, rport)
wg.Add(1)