Added auto-restart and started playing with init() function

This commit is contained in:
James Wells 2021-10-25 12:43:10 -07:00
parent f47b6846db
commit 0d0695d195
Signed by: jwells
GPG key ID: 73196D10B8E65666
6 changed files with 134 additions and 86 deletions

View file

@ -1,13 +1,14 @@
package main
import (
"fmt"
fmt "fmt"
ioutil "io/ioutil"
log "log"
http "net/http"
os "os"
sync "sync"
time "time"
autorestart "github.com/slayer/autorestart"
gs "git.dragonheim.net/dragonheim/gagent/internal/gstructs"
@ -51,7 +52,12 @@ var exitCodes = struct {
"AGENT_NOT_DEFINED": 8,
}}
var config gs.GagentConfig
func main() {
autorestart.StartWatcher()
http.Handle("/metrics", promhttp.Handler())
filter := &logutils.LevelFilter{
Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"},
MinLevel: logutils.LogLevel("DEBUG"),
@ -59,48 +65,46 @@ func main() {
}
log.SetOutput(filter)
http.Handle("/metrics", promhttp.Handler())
var agent gs.Agent
// config.File = "/etc/gagent/gagent.hcl"
var config gs.GagentConfig
config.File = "/etc/gagent/gagent.hcl"
config.Name, _ = os.Hostname()
config.Mode = "setup"
// config.Name, _ = os.Hostname()
// config.Mode = "setup"
/*
* Set a default UUID for this node.
* This is used throughout the G'Agent system to uniquely identify this node.
* It can be overridden in the configuration file by setting uuid
*/
identity := uuid.NewV4UUID()
config.UUID = identity.String()
// identity := uuid.NewV4UUID()
// config.UUID = identity.String()
/*
* By default, we want to listen on all IP addresses. It can be overridden
* in the configuration file by setting listenaddr
*/
config.ListenAddr = "0.0.0.0"
// config.ListenAddr = "0.0.0.0"
/*
* By default, G'Agent client will use port 35571 to communicate with the
* routers, but you can override it by setting the clientport in the
* configuration file
*/
config.ClientPort = 35571
// config.ClientPort = 35571
/*
* By default, G'Agent router will use port 35572 to communicate with
* other routers, but you can override it by setting the routerport in
* the configuration file
*/
config.RouterPort = 35570
// config.RouterPort = 35570
/*
* By default, G'Agent worker will use port 35570 to communicate with the
* routers, but you can override it by setting the workerport in the
* configuration file
*/
config.WorkerPort = 35572
// config.WorkerPort = 35572
/*
* Create a usage variable and then use that to declare the arguments and
@ -115,7 +119,7 @@ func main() {
usage += "\n"
usage += "Usage: \n"
usage += " gagent client [--config=<config>] [--agent=<file>] \n"
usage += " gagent client (pull|push) [--config=<config>] [--agent=<file>] \n"
usage += " gagent router [--config=<config>] \n"
usage += " gagent worker [--config=<config>] \n"
usage += " gagent setup [--config=<config>] \n"
@ -123,7 +127,8 @@ func main() {
usage += "\n"
usage += "Arguments: \n"
usage += " client -- Start as a G'Agent client \n"
usage += " client pull -- Start as a G'Agent client to pull agent results \n"
usage += " client push -- Start as a G'Agent client to push agent \n"
usage += " router -- Start as a G'Agent router \n"
usage += " worker -- Start as a G'Agent worker \n"
usage += " setup -- Write initial configuration file \n"
@ -133,7 +138,7 @@ func main() {
usage += " -h --help -- Show this help screen and exit \n"
usage += " --version -- Show version and exit \n"
usage += " --config=<config> -- [default: /etc/gagent/gagent.hcl] \n"
usage += " --agent=<file> -- filename of the agent to be uploaded to the G'Agent network \n"
usage += " --agent=<file> -- filename of the agent to be uploaded to the G'Agent network. Required in push mode \n"
usage += "\n"
/*
@ -143,6 +148,7 @@ func main() {
opts, _ := docopt.ParseArgs(usage, nil, semVER)
log.Printf("[DEBUG] Arguments are %v\n", opts)
log.Printf("[DEBUG] Config is %v\n", config)
if opts["--config"] != nil {
config.File = opts["--config"].(string)
}
@ -150,9 +156,7 @@ func main() {
/*
* Start Prometheus metrics exporter
*/
go func() {
http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort), nil)
}()
go http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort), nil)
/*
* Let the command line mode override the configuration.
@ -162,12 +166,17 @@ func main() {
} else {
err := hclsimple.DecodeFile(config.File, nil, &config)
if err != nil {
log.Printf("[ERROR] Failed to load configuration file: %s.\n", config.File)
log.Printf("[ERROR] %s\n", err)
log.Printf("[ERROR] Failed to load configuration file: %s.\n%s\n", config.File, err)
os.Exit(exitCodes.m["CONFIG_FILE_MISSING"])
}
if opts["client"] == true {
config.Mode = "client"
if opts["pull"] == true {
config.CMode = false
}
if opts["push"] == true {
config.CMode = true
}
}
if opts["router"] == true {
config.Mode = "router"
@ -192,17 +201,19 @@ func main() {
log.Printf("[ERROR] Agent file not specified")
os.Exit(exitCodes.m["AGENT_NOT_DEFINED"])
}
agent, err := ioutil.ReadFile(opts["--agent"].(string))
if err != nil {
log.Printf("[ERROR] Failed to load Agent file: %s", opts["--agent"])
os.Exit(exitCodes.m["AGENT_LOAD_FAILED"])
// var agent []byte
var err error
if config.CMode {
agent.ScriptCode, err = ioutil.ReadFile(opts["--agent"].(string))
if err != nil {
log.Printf("[ERROR] Failed to load Agent file: %s", opts["--agent"])
os.Exit(exitCodes.m["AGENT_LOAD_FAILED"])
}
}
for key := range config.Routers {
wg.Add(1)
go gc.Main(&wg, config, key, string(agent))
time.Sleep(10 * time.Second)
}
wg.Add(1)
go gc.Main(&wg, config, string(agent.ScriptCode))
case "router":
log.Printf("[INFO] Running in router mode\n")
@ -223,17 +234,15 @@ func main() {
os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"])
}
for key := range config.Routers {
wg.Add(1)
go gw.Main(&wg, config, key)
}
wg.Add(1)
go gw.Main(&wg, config)
case "setup":
log.Printf("[INFO] Running in setup mode\n")
f := hclwrite.NewEmptyFile()
rootBody := f.Body()
rootBody.SetAttributeValue("name", cty.StringVal(config.Name))
rootBody.SetAttributeValue("mode", cty.StringVal("client"))
rootBody.SetAttributeValue("mode", cty.StringVal(config.Mode))
rootBody.SetAttributeValue("uuid", cty.StringVal(config.UUID))
rootBody.AppendNewline()
@ -254,3 +263,30 @@ func main() {
wg.Wait()
os.Exit(exitCodes.m["SUCCESS"])
}
func init() {
// Initialize the configuration
config.Mode = "setup"
config.Name, _ = os.Hostname()
config.UUID = uuid.NewV4UUID().String()
config.ListenAddr = "0.0.0.0"
config.ClientPort = 35571
config.RouterPort = 35570
config.WorkerPort = 35572
config.Clients = make([]*gs.ClientDetails, 0)
config.Routers = make([]*gs.RouterDetails, 0)
config.Workers = make([]*gs.WorkerDetails, 0)
config.File = "/etc/gagent/gagent.hcl"
config.Version = semVER
// Initialize the exit codes
// exitCodes.m = make(map[string]int)
exitCodes.m["SUCCESS"] = 0
exitCodes.m["INVALID_MODE"] = 1
exitCodes.m["CONFIG_FILE_MISSING"] = 2
exitCodes.m["NO_ROUTERS_DEFINED"] = 3
exitCodes.m["NO_WORKERS_DEFINED"] = 4
exitCodes.m["AGENT_NOT_DEFINED"] = 5
exitCodes.m["AGENT_LOAD_FAILED"] = 6
}

1
go.mod
View file

@ -11,6 +11,7 @@ require (
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/pebbe/zmq4 v1.2.7
github.com/prometheus/client_golang v1.11.0
github.com/slayer/autorestart v0.0.0-20170706172704-7bc8d250279b
github.com/zclconf/go-cty v1.8.3
golang.org/x/text v0.3.6 // indirect
)

5
go.sum
View file

@ -91,6 +91,7 @@ github.com/pebbe/zmq4 v1.2.7/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
@ -115,11 +116,14 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/slayer/autorestart v0.0.0-20170706172704-7bc8d250279b h1:3EujQY7LEbzy5paxa0S2OrsL6+vTwYiUU/R272YlwiQ=
github.com/slayer/autorestart v0.0.0-20170706172704-7bc8d250279b/go.mod h1:p+QQKBy7tS+myk+y3sgnAKx4gUtD/Q9Z6KEd77cLzWY=
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/vmihailenco/msgpack/v4 v4.3.12/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4=
@ -188,4 +192,5 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -3,7 +3,6 @@ package client
import (
fmt "fmt"
log "log"
http "net/http"
sync "sync"
time "time"
@ -19,48 +18,38 @@ import (
will contact the router and attempt to retrieve the results
of it's most recent request.
*/
func Main(wg *sync.WaitGroup, config gs.GagentConfig, rid int, agent string) {
func Main(wg *sync.WaitGroup, config gs.GagentConfig, agent string) {
defer wg.Done()
log.Printf("[INFO] Starting client\n")
// Generate connect string for this router.
var rport = config.ClientPort
if config.Routers[rid].ClientPort != 0 {
rport = config.Routers[rid].ClientPort
}
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport)
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
for key := range config.Routers {
// Generate connect string for this router.
rport := config.ClientPort
if config.Routers[key].ClientPort != 0 {
rport = config.Routers[key].ClientPort
}
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[key].RouterAddr, rport)
wg.Add(1)
go sendAgent(wg, config.UUID, connectString, agent)
time.Sleep(10 * time.Millisecond)
}
}
func sendAgent(wg *sync.WaitGroup, uuid string, connectString string, agent string) {
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
defer wg.Done()
var mu sync.Mutex
mu.Lock()
sock, _ := zmq.NewSocket(zmq.REQ)
defer sock.Close()
sock.SetIdentity(config.UUID)
sock.SetIdentity(uuid)
sock.Connect(connectString)
go func() {
mu.Lock()
log.Printf("[DEBUG] Start sending agent...\n")
sock.SendMessage(agent)
log.Printf("[DEBUG] End sending agent...\n")
mu.Unlock()
}()
time.Sleep(10 * time.Millisecond)
// for {
// time.Sleep(10 * time.Millisecond)
// mu.Lock()
// msg, err := sock.RecvMessage(zmq.DONTWAIT)
// if err == nil {
// log.Println(msg[0], config.UUID)
// }
// mu.Unlock()
// }
}
func pushAgent(config gs.GagentConfig) {
http.Get(config.Routers[0].RouterAddr)
log.Printf("[DEBUG] Start sending agent...\n")
sock.SendMessage(agent)
log.Printf("[DEBUG] End sending agent...\n")
mu.Unlock()
}

View file

@ -14,6 +14,13 @@ type GagentConfig struct {
Workers []*WorkerDetails `hcl:"worker,block"`
Version string
File string
CMode bool
}
type Agent struct {
Client string
ScriptCode []byte
Hints []*string
}
// ClientDetails is details about known clients

View file

@ -3,47 +3,57 @@ package worker
import (
fmt "fmt"
log "log"
http "net/http"
sync "sync"
gs "git.dragonheim.net/dragonheim/gagent/internal/gstructs"
// picol "git.dragonheim.net/dragonheim/gagent/src/picol"
promhttp "github.com/prometheus/client_golang/prometheus/promhttp"
prometheus "github.com/prometheus/client_golang/prometheus"
promauto "github.com/prometheus/client_golang/prometheus/promauto"
zmq "github.com/pebbe/zmq4"
)
var (
opsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "agent_requests_collected",
})
)
/*
The "worker" processes the agent code. The worker nodes do not know
anything about the network structure. Instead they know only to which
router(s) they are connected. The worker will execute the agent code and
pass the agent and it's results to a router.
*/
func Main(wg *sync.WaitGroup, config gs.GagentConfig, rid int) {
func Main(wg *sync.WaitGroup, config gs.GagentConfig) {
defer wg.Done()
http.Handle("/metrics", promhttp.Handler())
log.Printf("[INFO] Starting worker\n")
// Generate connect string for this router.
var rport = config.WorkerPort
if config.Routers[rid].WorkerPort != 0 {
rport = config.Routers[rid].WorkerPort
for key := range config.Routers {
rport := config.WorkerPort
if config.Routers[key].WorkerPort != 0 {
rport = config.Routers[key].WorkerPort
}
// Generate connect string for this router.
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[key].RouterAddr, rport)
wg.Add(1)
go getAgent(wg, config.UUID, connectString)
}
connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport)
// workerListener := fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.WorkerPort)
clientListener := fmt.Sprintf("%s:%d", config.ListenAddr, config.ClientPort)
}
func getAgent(wg *sync.WaitGroup, uuid string, connectString string) {
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
defer wg.Done()
subscriber, _ := zmq.NewSocket(zmq.REP)
defer subscriber.Close()
go func() {
http.ListenAndServe(clientListener, nil)
}()
log.Printf("[DEBUG] Attempting to connect to %s\n", connectString)
subscriber.Connect(connectString)
msg, err := subscriber.Recv(0)