diff --git a/cmd/gagent/main.go b/cmd/gagent/main.go index e2972bc..9a6fb05 100644 --- a/cmd/gagent/main.go +++ b/cmd/gagent/main.go @@ -1,6 +1,7 @@ package main import ( + sha "crypto/sha256" fmt "fmt" ioutil "io/ioutil" log "log" @@ -8,6 +9,8 @@ import ( os "os" sync "sync" + fqdn "github.com/Showmax/go-fqdn" + autorestart "github.com/slayer/autorestart" gs "git.dragonheim.net/dragonheim/gagent/internal/gstructs" @@ -53,11 +56,9 @@ var exitCodes = struct { }} var config gs.GagentConfig +var agent gs.AgentDetails func main() { - autorestart.StartWatcher() - http.Handle("/metrics", promhttp.Handler()) - filter := &logutils.LevelFilter{ Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"}, MinLevel: logutils.LogLevel("DEBUG"), @@ -65,46 +66,153 @@ func main() { } log.SetOutput(filter) - var agent gs.Agent - // config.File = "/etc/gagent/gagent.hcl" + log.Printf("[DEBUG] Configuration is %v\n", config) - // config.Name, _ = os.Hostname() - // config.Mode = "setup" + switch config.Mode { + case "client": + log.Printf("[INFO] Running in client mode\n") + + if len(config.Routers) == 0 { + log.Printf("[ERROR] No routers defined.\n") + os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"]) + } + + var err error + if config.CMode { + agent.ScriptCode, err = ioutil.ReadFile(config.File) + if err != nil { + log.Printf("[ERROR] No such file or directory: %s", config.File) + os.Exit(exitCodes.m["AGENT_LOAD_FAILED"]) + } + agent.Shasum = fmt.Sprintf("%x", sha.Sum256(agent.ScriptCode)) + agent.Status = "loaded" + log.Printf("[DEBUG] SHA256 of Agent file: %s", agent.Shasum) + } + + wg.Add(1) + go gc.Main(&wg, config, string(agent.ScriptCode)) + + case "router": + log.Printf("[INFO] Running in router mode\n") + + if len(config.Workers) == 0 { + log.Printf("[ERROR] No workers defined.\n") + os.Exit(exitCodes.m["NO_WORKERS_DEFINED"]) + } + + wg.Add(1) + go gr.Main(&wg, config) + + case "worker": + log.Printf("[INFO] Running in worker mode\n") + + if len(config.Routers) == 0 { + log.Printf("[ERROR] No routers defined.\n") + os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"]) + } + + wg.Add(1) + go gw.Main(&wg, config) + + case "setup": + log.Printf("[INFO] Running in setup mode\n") + f := hclwrite.NewEmptyFile() + rootBody := f.Body() + rootBody.SetAttributeValue("name", cty.StringVal(config.Name)) + rootBody.SetAttributeValue("mode", cty.StringVal(config.Mode)) + rootBody.SetAttributeValue("uuid", cty.StringVal(config.UUID)) + rootBody.AppendNewline() + + routerBlock1 := rootBody.AppendNewBlock("router", []string{config.Name}) + routerBody1 := routerBlock1.Body() + routerBody1.SetAttributeValue("routerid", cty.StringVal(config.UUID)) + routerBody1.SetAttributeValue("address", cty.StringVal("127.0.0.1")) + routerBody1.SetAttributeValue("clientport", cty.NumberIntVal(config.ClientPort)) + rootBody.AppendNewline() + + log.Printf("\n%s", f.Bytes()) + os.Exit(exitCodes.m["SUCCESS"]) + + default: + log.Printf("[ERROR] Unknown operating mode, exiting.\n") + os.Exit(exitCodes.m["INVALID_MODE"]) + } + + wg.Wait() + os.Exit(exitCodes.m["SUCCESS"]) +} + +func init() { + var err error + + autorestart.StartWatcher() + + http.Handle("/metrics", promhttp.Handler()) + + /* + * Start Prometheus metrics exporter + */ + go http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort), nil) + + /* + * Initialize the exit codes + */ + exitCodes.m["SUCCESS"] = 0 + exitCodes.m["INVALID_MODE"] = 1 + exitCodes.m["CONFIG_FILE_MISSING"] = 2 + exitCodes.m["NO_ROUTERS_DEFINED"] = 3 + exitCodes.m["NO_WORKERS_DEFINED"] = 4 + exitCodes.m["AGENT_NOT_DEFINED"] = 5 + exitCodes.m["AGENT_LOAD_FAILED"] = 6 + + /* + * Initialize the configuration + */ + config.Version = semVER + + config.File = "/etc/gagent/gagent.hcl" + + config.Mode = "setup" + + config.Name, _ = fqdn.FqdnHostname() /* * Set a default UUID for this node. * This is used throughout the G'Agent system to uniquely identify this node. * It can be overridden in the configuration file by setting uuid */ - // identity := uuid.NewV4UUID() - // config.UUID = identity.String() + config.UUID = uuid.NewV4UUID().String() /* * By default, we want to listen on all IP addresses. It can be overridden * in the configuration file by setting listenaddr */ - // config.ListenAddr = "0.0.0.0" + config.ListenAddr = "0.0.0.0" /* * By default, G'Agent client will use port 35571 to communicate with the * routers, but you can override it by setting the clientport in the * configuration file */ - // config.ClientPort = 35571 + config.ClientPort = 35571 /* * By default, G'Agent router will use port 35572 to communicate with * other routers, but you can override it by setting the routerport in * the configuration file */ - // config.RouterPort = 35570 + config.RouterPort = 35570 /* * By default, G'Agent worker will use port 35570 to communicate with the * routers, but you can override it by setting the workerport in the * configuration file */ - // config.WorkerPort = 35572 + config.WorkerPort = 35572 + + config.Clients = make([]*gs.ClientDetails, 0) + config.Routers = make([]*gs.RouterDetails, 0) + config.Workers = make([]*gs.WorkerDetails, 0) /* * Create a usage variable and then use that to declare the arguments and @@ -148,15 +256,15 @@ func main() { opts, _ := docopt.ParseArgs(usage, nil, semVER) log.Printf("[DEBUG] Arguments are %v\n", opts) - log.Printf("[DEBUG] Config is %v\n", config) if opts["--config"] != nil { config.File = opts["--config"].(string) } - /* - * Start Prometheus metrics exporter - */ - go http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort), nil) + err = hclsimple.DecodeFile(config.File, nil, &config) + if err != nil && opts["setup"] == false { + log.Printf("[ERROR] Failed to load configuration file: %s.\n", config.File) + os.Exit(exitCodes.m["CONFIG_FILE_MISSING"]) + } /* * Let the command line mode override the configuration. @@ -164,20 +272,24 @@ func main() { if opts["setup"] == true { config.Mode = "setup" } else { - err := hclsimple.DecodeFile(config.File, nil, &config) - if err != nil { - log.Printf("[ERROR] Failed to load configuration file: %s.\n%s\n", config.File, err) - os.Exit(exitCodes.m["CONFIG_FILE_MISSING"]) - } if opts["client"] == true { config.Mode = "client" + if opts["--agent"] == nil { + log.Printf("[ERROR] Agent file not specified") + os.Exit(exitCodes.m["AGENT_NOT_DEFINED"]) + } else { + config.File = opts["--agent"].(string) + } + if opts["pull"] == true { config.CMode = false } + if opts["push"] == true { config.CMode = true } } + if opts["router"] == true { config.Mode = "router" } @@ -185,108 +297,6 @@ func main() { config.Mode = "worker" } } - config.Version = semVER - log.Printf("[DEBUG] Configuration is %v\n", config) - - switch config.Mode { - case "client": - log.Printf("[INFO] Running in client mode\n") - - if len(config.Routers) == 0 { - log.Printf("[ERROR] No routers defined.\n") - os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"]) - } - - if opts["--agent"] == nil { - log.Printf("[ERROR] Agent file not specified") - os.Exit(exitCodes.m["AGENT_NOT_DEFINED"]) - } - - // var agent []byte - var err error - if config.CMode { - agent.ScriptCode, err = ioutil.ReadFile(opts["--agent"].(string)) - if err != nil { - log.Printf("[ERROR] Failed to load Agent file: %s", opts["--agent"]) - os.Exit(exitCodes.m["AGENT_LOAD_FAILED"]) - } - } - - wg.Add(1) - go gc.Main(&wg, config, string(agent.ScriptCode)) - - case "router": - log.Printf("[INFO] Running in router mode\n") - - if len(config.Workers) == 0 { - log.Printf("[ERROR] No workers defined.\n") - os.Exit(exitCodes.m["NO_WORKERS_DEFINED"]) - } - - wg.Add(1) - go gr.Main(&wg, config) - - case "worker": - log.Printf("[INFO] Running in worker mode\n") - - if len(config.Routers) == 0 { - log.Printf("[ERROR] No routers defined.\n") - os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"]) - } - - wg.Add(1) - go gw.Main(&wg, config) - - case "setup": - log.Printf("[INFO] Running in setup mode\n") - f := hclwrite.NewEmptyFile() - rootBody := f.Body() - rootBody.SetAttributeValue("name", cty.StringVal(config.Name)) - rootBody.SetAttributeValue("mode", cty.StringVal(config.Mode)) - rootBody.SetAttributeValue("uuid", cty.StringVal(config.UUID)) - rootBody.AppendNewline() - - routerBlock1 := rootBody.AppendNewBlock("router", []string{config.Name}) - routerBody1 := routerBlock1.Body() - routerBody1.SetAttributeValue("routerid", cty.StringVal(config.UUID)) - routerBody1.SetAttributeValue("address", cty.StringVal("127.0.0.1")) - rootBody.AppendNewline() - - log.Printf("\n%s", f.Bytes()) - os.Exit(exitCodes.m["SUCCESS"]) - - default: - log.Printf("[ERROR] Unknown operating mode, exiting.\n") - os.Exit(exitCodes.m["INVALID_MODE"]) - } - - wg.Wait() - os.Exit(exitCodes.m["SUCCESS"]) -} - -func init() { - // Initialize the configuration - config.Mode = "setup" - config.Name, _ = os.Hostname() - config.UUID = uuid.NewV4UUID().String() - config.ListenAddr = "0.0.0.0" - config.ClientPort = 35571 - config.RouterPort = 35570 - config.WorkerPort = 35572 - config.Clients = make([]*gs.ClientDetails, 0) - config.Routers = make([]*gs.RouterDetails, 0) - config.Workers = make([]*gs.WorkerDetails, 0) - config.File = "/etc/gagent/gagent.hcl" - config.Version = semVER - - // Initialize the exit codes - // exitCodes.m = make(map[string]int) - exitCodes.m["SUCCESS"] = 0 - exitCodes.m["INVALID_MODE"] = 1 - exitCodes.m["CONFIG_FILE_MISSING"] = 2 - exitCodes.m["NO_ROUTERS_DEFINED"] = 3 - exitCodes.m["NO_WORKERS_DEFINED"] = 4 - exitCodes.m["AGENT_NOT_DEFINED"] = 5 - exitCodes.m["AGENT_LOAD_FAILED"] = 6 + log.Printf("[DEBUG] Config is %v\n", config) } diff --git a/go.mod b/go.mod index 2c2b5c6..0deade1 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.dragonheim.net/dragonheim/gagent go 1.16 require ( + github.com/Showmax/go-fqdn v1.0.0 github.com/agext/levenshtein v1.2.3 // indirect github.com/aviddiviner/docopt-go v0.0.0-20170807220726-d8a1d67efc6a github.com/hashicorp/hcl/v2 v2.10.1 diff --git a/go.sum b/go.sum index 08e819e..77f0996 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/Showmax/go-fqdn v1.0.0 h1:0rG5IbmVliNT5O19Mfuvna9LL7zlHyRfsSvBPZmF9tM= +github.com/Showmax/go-fqdn v1.0.0/go.mod h1:SfrFBzmDCtCGrnHhoDjuvFnKsWjEQX/Q9ARZvOrJAko= github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= diff --git a/internal/client/client.go b/internal/client/client.go index 2cfd256..34c416b 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -23,7 +23,9 @@ func Main(wg *sync.WaitGroup, config gs.GagentConfig, agent string) { log.Printf("[INFO] Starting client\n") for key := range config.Routers { - // Generate connect string for this router. + /* + * Generate connect string for this router. + */ rport := config.ClientPort if config.Routers[key].ClientPort != 0 { rport = config.Routers[key].ClientPort diff --git a/internal/gstructs/gstructs.go b/internal/gstructs/gstructs.go index 9887a5b..50c2427 100644 --- a/internal/gstructs/gstructs.go +++ b/internal/gstructs/gstructs.go @@ -17,13 +17,9 @@ type GagentConfig struct { CMode bool } -type Agent struct { - Client string - ScriptCode []byte - Hints []*string -} - -// ClientDetails is details about known clients +/* + * ClientDetails are details about known clients + */ type ClientDetails struct { /* * Client name for display purposes in logs and @@ -40,7 +36,9 @@ type ClientDetails struct { ClientID string `hcl:"clientid,optional"` } -// RouterDetails is details about known routers +/* + * RouterDetails is details about known routers + */ type RouterDetails struct { /* * Router name for display purposes in logs and @@ -93,7 +91,9 @@ type RouterDetails struct { RouterTags []string `hcl:"tags,optional"` } -// WorkerDetails is details about known workers +/* + * WorkerDetails is details about known workers + */ type WorkerDetails struct { /* * Router name for display purposes in logs and @@ -117,3 +117,15 @@ type WorkerDetails struct { */ WorkerTags []string `hcl:"tags,optional"` } + +type BlockChainDB struct { + DBName string `hcl:"chain_id,optional"` + Agents []*AgentDetails `hcl:"agent,block"` +} +type AgentDetails struct { + ScriptCode []byte + Hints []*string + Client string `hcl:"client"` + Shasum string `hcl:"shasum"` + Status string `hcl:"status"` +} diff --git a/internal/router/router.go b/internal/router/router.go index 6ac0f7e..ae325e2 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -52,7 +52,9 @@ func Main(wg *sync.WaitGroup, config gs.GagentConfig) { LOOP: for { - // Poll frontend only if we have available workers + /* + * Poll frontend only if we have available workers + */ var sockets []zmq.Polled var err error if len(workers) > 0 { @@ -61,16 +63,23 @@ LOOP: sockets, err = poller1.Poll(-1) } if err != nil { - break // Interrupted + /* + * Interrupt + */ + break } for _, socket := range sockets { switch s := socket.Socket; s { case workerSock: - // Handle worker activity on backend - // Use worker identity for load-balancing + /* + * Handle worker activity on backend + * Use worker identity for load-balancing + */ msg, err := s.RecvMessage(0) if err != nil { - // Interrupted + /* + * Interrupt + */ break LOOP } var identity string @@ -79,7 +88,9 @@ LOOP: workers = append(workers, identity) case clientSock: - // Get client request, route to first available worker + /* + * Get client request, route to first available worker + */ msg, err := s.RecvMessage(0) log.Printf("[DEBUG] Client message received: %s", msg) if err == nil { @@ -109,20 +120,32 @@ func answerClient(w http.ResponseWriter, r *http.Request) { return } - // Common code for all requests can go here... - + /* + * Common code for all requests can go here... + */ switch r.Method { + /* + * Handle GET requests + */ case http.MethodGet: fmt.Fprintf(w, "%v\n", r) - // Handle the GET request... + /* + * Handle POST requests + */ case http.MethodPost: - // Handle the POST request... + fmt.Fprintf(w, "%v\n", r) + /* + * Handle PUT requests + */ case http.MethodOptions: w.Header().Set("Allow", "GET, POST, OPTIONS") w.WriteHeader(http.StatusNoContent) + /* + * Handle everything else + */ default: w.Header().Set("Allow", "GET, POST, OPTIONS") http.Error(w, "method not allowed", http.StatusMethodNotAllowed) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 793fafd..9bc50d2 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -37,7 +37,9 @@ func Main(wg *sync.WaitGroup, config gs.GagentConfig) { rport = config.Routers[key].WorkerPort } - // Generate connect string for this router. + /* + * Generate connect string for this router. + */ connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[key].RouterAddr, rport) wg.Add(1)