From 7d6fbfef2491ac027eff196c0b816b6937b2195c Mon Sep 17 00:00:00 2001 From: James Wells Date: Fri, 21 May 2021 23:03:50 +0000 Subject: [PATCH] fix: (issues/9) Worker does not support port assignment for routers (#11) Quite a refactor and some code cleanup. Co-authored-by: James Wells Reviewed-on: https://git.dragonheim.net/dragonheim/gagent/pulls/11 Co-authored-by: James Wells Co-committed-by: James Wells --- .drone.yml | 6 + .gitignore | 4 +- LANGUAGE.md | 21 ++- README.md | 14 +- docker/Dockerfile | 18 ++- examples/{example_gagent.hcl => gagent.hcl} | 91 ++++++------- examples/hello-earth.tcl | 4 +- gagent/main.go | 139 ++++++++++++++------ go.mod | 3 + go.sum | 5 +- simple_build.sh | 4 + src/client/client.go | 48 +++---- src/gstructs/gstructs.go | 31 +++-- src/picol/LICENSE | 20 +++ src/picol/README.md | 27 ++++ src/picol/examples/fib.tcl | 9 ++ src/picol/examples/t2.tcl | 15 +++ src/picol/examples/tst.tcl | 5 + src/picol/picol/main.go | 46 +++++++ src/router/router.go | 130 +++++++++--------- src/worker/worker.go | 82 +++--------- 21 files changed, 451 insertions(+), 271 deletions(-) rename examples/{example_gagent.hcl => gagent.hcl} (55%) create mode 100644 simple_build.sh create mode 100644 src/picol/LICENSE create mode 100644 src/picol/README.md create mode 100644 src/picol/examples/fib.tcl create mode 100644 src/picol/examples/t2.tcl create mode 100644 src/picol/examples/tst.tcl create mode 100644 src/picol/picol/main.go diff --git a/.drone.yml b/.drone.yml index 509fc62..55c9362 100644 --- a/.drone.yml +++ b/.drone.yml @@ -25,6 +25,9 @@ steps: volumes: - name: dockersock path: /var/run/docker.sock + environment: + GOOS: linux + GOARCH: amd64 settings: auto_tag: false daemon_off: true @@ -82,6 +85,9 @@ steps: volumes: - name: dockersock path: /var/run/docker.sock + environment: + GOOS: linux + GOARCH: arm settings: auto_tag: false daemon_off: true diff --git a/.gitignore b/.gitignore index 8b25ba0..28732ad 100644 --- a/.gitignore +++ b/.gitignore @@ -40,8 +40,6 @@ debug *.log *.pid -# Ignore G'Agent configuration file. -gagent.hcl - # Ignore various Drone CI support files .drone.secrets + diff --git a/LANGUAGE.md b/LANGUAGE.md index 5226dd8..704535a 100644 --- a/LANGUAGE.md +++ b/LANGUAGE.md @@ -1,8 +1,19 @@ # G'Agent -## AgentTCL Language Extension -### TODO -Everything. :( ## G'Agent Language Extension -### TODO -Everything. :( +### @TODO +Document the G'Agent TCL language extension. + +We are using Picol as a vendor package though we have modified it specific to support a subset of the [Agent Tcl / D'Agents](http://www.cs.dartmouth.edu/~dfk/agents/pub/agents/doc.5.1.ps.gz) language extensions. + + The language extensions can be found in the [LANGUAGE.md](https://git.dragonheim.net/dragonheim/gagent/src/branch/main/LANGUAGE.md) file. + + +## AgentTCL Language Extension +### @TODO +Document differences between AgentTCL and G'Agent TCL. + + +## TCL Basics +### Plase note that we do not support the full TCL language specification, only a subset. The full language documentation can be found at [ee.columbia.edu/sensornet/part1](https://www.ee.columbia.edu/~shane/projects/sensornet/part1.pdf) and +[ee.columbia.edu/sensornet/part2](https://www.ee.columbia.edu/~shane/projects/sensornet/part2.pdf) diff --git a/README.md b/README.md index a324fac..c634bd4 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # G'Agent [![Build Status](https://drone.dragonheim.net/api/badges/dragonheim/gagent/status.svg)](https://drone.dragonheim.net/dragonheim/gagent) -A mobile agent system, written in Go, loosely inspired by the [Agent Tcl / D'Agents](http://www.cs.dartmouth.edu/~dfk/agents/) system created by Robert S. Gray of Dartmouth college. +A Golang based mobile agent system loosely inspired by the [Agent Tcl / D'Agents](http://www.cs.dartmouth.edu/~dfk/agents/) system created by Robert S. Gray of Dartmouth college. ## Purpose -As we move close and closer to a true space age, we need to start thinking about solutions for various light-speed issues such as the bi-directional time delay between the surface of Mars and the surface of Earth. At present it takes between 6 minutes and 44 minutes for single round-trip, making most current data services unuseable. G'Agent is a potential solution for data services given the time delay. +As we move close and closer to a true space-age, we need to start thinking about solutions for various space-age issues such as the bi-directional time delay between the surface of Mars and the surface of Earth. At present it takes between 6 and 44 minutes for single round-trip, making most online data services unuseable. G'Agent is a potential solution for data services given the time delay. -Imagine, for a moment, that you are on Mars and need to perform a search for data in a specific domain space. You would have to explain it to someone on Earth, and hope they understand enough of the data / domain to know where to search and understand you well enough to perform the actual search and then send you the results. With G'Agent, instead you would write a basic script, hereafter called an agent, providing various hints as to the domain space as well as the actual search. Your client would then send it on to a server on Earth. The server, hereafter called a router, may or nothing about the domain space of your search, so the router will use the hints that you provide to attempt to route the request to other routers that may understand the specific domain space. Eventually your agent will reach a router whose servers, hereafter referred to as workers, can handle your search. The workers, will take the agent, run the script portion and collect the response(s). The worker will send the response back to the router(s) that it recieved the agent from, eventually routing it back to you. +Imagine, for a moment, that you are on Mars and need to perform a data search in a specific domain space. You would have to explain it to someone on Earth, and hope they understand enough of the domain space to know where to search and understand you well enough to perform the actual search and then send you the results. With G'Agent, instead you would write a basic script (TCL), hereafter called an agent, providing various hints about the domain space and the search as TCL code. Your client would then send it on to a server, hereafter called a router, on Earth. The router may or may not know anything about the domain space of your search, so the router will use the hints that you provide to attempt to route the agent to known workers or other routers closer to the desired domain space. Eventually your agent will reach a router whose workers can handle your search. The workers, will take the agent, run the script portion and collect the response(s), returning the reponse(s) to the router(s) for return to your client. ## Example Agent ```tcl @@ -14,12 +14,12 @@ Imagine, for a moment, that you are on Mars and need to perform a search for dat 2 : ### Hello Earth ### 3 : ################### 4 : # HINT START -5 : # - console +5 : # - thermal measurements 6 : # - gravity measurements 7 : # - gravity fluctuations 8 : # HINT END 9 : proc hello_earth {} { -10 : puts "Hello Earth, does tempurature variations alter specific gravity?" +10 : puts "Hello Earth, does localized tempurature variations alter specific gravity?" 11 : } 12 : hello_earth ``` @@ -36,6 +36,4 @@ Lines 9 - 11 are a tcl procedure that will be executed on the worker before send Line 12 executes the procedure defined above. ## History -More information can be found in the original [documentation](http://www.cs.dartmouth.edu/~dfk/agents/pub/agents/doc.5.1.ps.gz), and in the project's [wiki](https://git.dragonheim.net/dragonheim/gagent/wiki/_pages). - -We are adding Picol as a vendor package though we have modified it specific to support a subset of the [Agent Tcl / D'Agents](http://www.cs.dartmouth.edu/~dfk/agents/pub/agents/doc.5.1.ps.gz) language extensions. The language extensions can be found in the [LANGUAGE.md](https://git.dragonheim.net/dragonheim/gagent/src/branch/main/LANGUAGE.md) file. +More information about Agent TCL / D'Agents can be found in the original [documentation](http://www.cs.dartmouth.edu/~dfk/agents/pub/agents/doc.5.1.ps.gz), and in the project's [wiki](https://git.dragonheim.net/dragonheim/gagent/wiki/_pages). diff --git a/docker/Dockerfile b/docker/Dockerfile index 0d3697e..0c0b7ce 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,21 +2,27 @@ FROM golang:1.16-alpine3.12 as builder WORKDIR /gagent COPY . . -RUN apk add --no-cache zeromq-dev build-base git && \ - go build -o /gagent/bin/gagent gagent/main.go && \ - strip /gagent/bin/gagent + +ARG GOOS=${GOOS:-linux} +# ARG GOARCH=${GOARCH:-amd64} +ARG CGO_ENABLED=1 + +RUN apk add --no-cache zeromq-dev build-base git +RUN go build -o /gagent/bin/gagent gagent/main.go +RUN strip /gagent/bin/gagent FROM alpine:3.12 LABEL Name="G'Agent" LABEL Maintainer="jwells@dragonheim.net" LABEL License="MIT License" -RUN apk add --no-cache zeromq; mkdir -p -m 0700 /etc/gagent +RUN apk add --no-cache zeromq && mkdir -p -m 0700 /etc/gagent -COPY --from=builder /gagent/examples/example_gagent.hcl /etc/gagent/gagent.hcl +COPY --from=builder /gagent/examples/gagent.hcl /etc/gagent/gagent.hcl COPY --from=builder /gagent/bin/gagent /usr/bin/ -EXPOSE 35570/tcp +# Router Client Worker +EXPOSE 35570/tcp 35571/tcp 35572/tcp VOLUME /etc/gagent CMD ["/usr/bin/gagent"] diff --git a/examples/example_gagent.hcl b/examples/gagent.hcl similarity index 55% rename from examples/example_gagent.hcl rename to examples/gagent.hcl index b89663b..baff785 100644 --- a/examples/example_gagent.hcl +++ b/examples/gagent.hcl @@ -4,7 +4,7 @@ * * Optional. */ -name = "gagent-zulu.example.org" +// name = "gagent-zulu.example.org" /* * This is the mode that this node operates in. There @@ -20,47 +20,59 @@ name = "gagent-zulu.example.org" * send responses to routers for return * the requesting client. * + * If it is not defined, G'Agent will start in setup + * mode and attempt to write a new configuration file + * to the local directory. The file will be called + * gagent.hcl + * * Required. */ mode = "router" /* + * @TODO: Add authentication based on UUID * This is the UUID used throughout the G'Agent system * to uniquely identify this node. It is generated - * during setup. + * during setup if it doesn't exist. * * Required. */ -uuid = "7e9d13fe-5151-5876-66c0-20ca03e8fca4" +// uuid = "7e9d13fe-5151-5876-66c0-20ca03e8fca4" /* * This is the IP Address to bind to, it defaults to * 0.0.0.0 * * Optional. - * - * listenaddr = 0.0.0.0 */ +// listenaddr = 0.0.0.0 -/. - * This is the port to listen on, it defaults to - * 35570. It is strongly recommended that you not - * use ports 0 - 1024 +/* + * This is the port to the router will listen for on + * for clients. It defaults to 35570. * * Optional. - * - * listenport = 35570 */ +// clientport = 35571 /* - * This is the UUID used throughout the G'Agent system - * to uniquely identify this node. + * This is the port to the router will listen for on + * for other routers. It defaults to 35570. * - * Required. + * Optional. */ -// uuid = "04f97538-270d-4ca3-b782-e09ef35830e9" +// routerport = 35570 /* + * This is the port to the router will listen for on + * for workers. It defaults to 35571. + * + * Optional. + */ +// workerport = 35572 + +/* + * @TODO * This is the list of known G'Agent clients. Clients * are not registered dynamically, instead the only * clients that may connect are those listed here, @@ -70,11 +82,11 @@ uuid = "7e9d13fe-5151-5876-66c0-20ca03e8fca4" * Optional. */ // client "alpha" { -// uuid = "04f97538-270d-4ce3-b782-e09ef35830e9" +// clientid = "04f97538-270d-4ce3-b782-e09ef35830e9" // } -// + // client "beta" { -// uuid = "04f97538-270d-4cf3-b782-e09ef35830e9" +// clientid = "04f97538-270d-4cf3-b782-e09ef35830e9" // } /* @@ -83,23 +95,15 @@ uuid = "7e9d13fe-5151-5876-66c0-20ca03e8fca4" * there is more than one router, clients and workers * will connect to them in sequential order. */ -// router "alpha" { -// uuid = "04f97538-270d-4cb3-b782-e09ef35830e9" -// address = "gagent-alpha.example.org" -// tags = [ "a", "b", "c", "d" ] -// } -// -// router "beta" { -// uuid = "04f97538-270d-4cc3-b782-e09ef35830e9" -// address = "gagent-beta.example.org" -// tags = [ "a", "c", "e", "g" ] -// } -// -// router "charlie" { -// uuid = "04f97538-270d-4cd3-b782-e09ef35830e9" -// address = "gagent-charlie.example.org" -// tags = [ "b", "d", "f", "h" ] -// } +// router "alpha" { +// routerid = "04f97538-270d-4cb3-b782-e09ef35830e9" +// address = "gagent-alpha.example.org" +// } + +// router "beta" { +// routerid = "04f97538-270d-4cc3-b782-e09ef35830e9" +// address = "gagent-beta.example.org" +// } /* * This is a list of known G'Agent workers. This is only @@ -109,18 +113,9 @@ uuid = "7e9d13fe-5151-5876-66c0-20ca03e8fca4" * At least one worker is reuqired for routers. */ // worker "alpha" { -// uuid = "04f97538-270d-4ce3-b782-e09ef35830e9" -// address = "gagent-alpha.example.org" -// tags = [ "a", "b", "c", "d" ] -// } -// -// worker "beta" { -// uuid = "04f97538-270d-4cf3-b782-e09ef35830e9" -// tags = [ "a", "c", "e", "g" ] -// } -// -// worker "charlie" { -// uuid = "04f97538-270d-4c04-b782-e09ef35830e9" -// tags = [ "b", "d", "f", "h" ] +// workerid = "04f97538-270d-4ce3-b782-e09ef35830e9" // } +// worker "beta" { +// workerid = "04f97538-270d-4cf3-b782-e09ef35830e9" +// } diff --git a/examples/hello-earth.tcl b/examples/hello-earth.tcl index 2ce12bc..2ca895e 100644 --- a/examples/hello-earth.tcl +++ b/examples/hello-earth.tcl @@ -2,12 +2,12 @@ ### Hello Earth ### ################### # HINT START -# - console +# - thermal measurements # - gravity measurements # - gravity fluctuations # HINT END proc hello_earth {} { - puts "Hello Earth, does tempurature variations alter specific gravity?" + puts "Hello Earth, does localized tempurature variations alter specific gravity?" } hello_earth diff --git a/gagent/main.go b/gagent/main.go index 82c2294..f9dbf9e 100644 --- a/gagent/main.go +++ b/gagent/main.go @@ -6,8 +6,6 @@ import ( "os" "time" - // "math/rand" - gs "git.dragonheim.net/dragonheim/gagent/src/gstructs" gc "git.dragonheim.net/dragonheim/gagent/src/client" @@ -16,9 +14,14 @@ import ( docopt "github.com/aviddiviner/docopt-go" hclsimple "github.com/hashicorp/hcl/v2/hclsimple" + "github.com/hashicorp/hcl/v2/hclwrite" + "github.com/hashicorp/logutils" uuid "github.com/nu7hatch/gouuid" + "github.com/zclconf/go-cty/cty" ) +const VERSION = "0.0.1" + var exitCodes = struct { m map[string]int }{m: map[string]int{ @@ -34,17 +37,24 @@ var exitCodes = struct { }} func main() { + filter := &logutils.LevelFilter{ + Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"}, + MinLevel: logutils.LogLevel("WARN"), + Writer: os.Stderr, + } + log.SetOutput(filter) + var config gs.GagentConfig var configFile string = "/etc/gagent/gagent.hcl" config.Name, _ = os.Hostname() + config.Mode = "setup" /* * Set a default UUID for this node. * This is used throughout the G'Agent system to uniquely identify this node. * It can be overriden in the configuration file by setting uuid */ - // rand.Seed(time.Now().UnixNano()) identity, _ := uuid.NewV5(uuid.NamespaceURL, []byte("gagent"+config.Name)) config.UUID = identity.String() @@ -55,11 +65,25 @@ func main() { config.ListenAddr = "0.0.0.0" /* - * By default, G'Agent will use port 35570 to communicate with the routers, - * but you can override it by setting the listenport in the configuration - * file + * By default, G'Agent client will use port 35571 to communicate with the + * routers, but you can override it by setting the clientport in the + * configuration file */ - config.ListenPort = 35570 + config.ClientPort = 35571 + + /* + * By default, G'Agent router will use port 35572 to communicate with + * other routers, but you can override it by setting the routerport in + * the configuration file + */ + config.RouterPort = 35570 + + /* + * By default, G'Agent worker will use port 35570 to communicate with the + * routers, but you can override it by setting the workerport in the + * configuration file + */ + config.WorkerPort = 35572 /* * Create a usage variable and then use that to declare the arguments and @@ -74,44 +98,60 @@ func main() { usage += "\n" usage += "Usage: \n" - usage += " gagent [--config=] [--agent=] \n" + usage += " gagent client [--config=] [--agent=] \n" + usage += " gagent router [--config=] \n" + usage += " gagent worker [--config=] \n" usage += " gagent setup [--config=] \n" + usage += " gagent --version \n" usage += "\n" usage += "Arguments: \n" - usage += " client -- Start as a G'Agent client \n" - usage += " -- filename of the agent to be uploaded to the G'Agent network \n" - usage += "\n" - - usage += " setup -- Write inital configuration file \n" + usage += " client -- Start as a G'Agent client \n" + usage += " router -- Start as a G'Agent router \n" + usage += " worker -- Start as a G'Agent worker \n" + usage += " setup -- Write inital configuration file \n" usage += "\n" usage += "Options:\n" - usage += " config= [default: /etc/gagent/gagent.hcl] \n" + usage += " -h --help -- Show this help screen \n" + usage += " --version -- Show version \n" + usage += " --config= -- [default: /etc/gagent/gagent.hcl] \n" + usage += " --agent= -- filename of the agent to be uploaded to the G'Agent network \n" /* * Consume the usage variable and the command line arguments to create a - * dictionary of the command line arguments. + * dictionary / map. */ - arguments, _ := docopt.ParseDoc(usage) + opts, _ := docopt.ParseArgs(usage, nil, VERSION) + log.Printf("[DEBUG] Arguments are %v\n", opts) - if arguments["--config"] != nil { - configFile = arguments["--config"].(string) + if opts["--config"] != nil { + configFile = opts["--config"].(string) } /* * Let the command line mode override the configuration. */ - if arguments["setup"] == true { + if opts["setup"] == true { config.Mode = "setup" } else { err := hclsimple.DecodeFile(configFile, nil, &config) if err != nil { - log.Printf("Failed to load configuration file: %s.\n", configFile) - log.Printf("%s\n", err) + log.Printf("[ERROR] Failed to load configuration file: %s.\n", configFile) + log.Printf("[ERROR] %s\n", err) os.Exit(exitCodes.m["CONFIG_FILE_MISSING"]) } + if opts["client"] == true { + config.Mode = "client" + } + if opts["router"] == true { + config.Mode = "router" + } + if opts["worker"] == true { + config.Mode = "worker" + } } + log.Printf("[DEBUG] Configuration is %v\n", config) switch config.Mode { case "client": @@ -122,20 +162,23 @@ func main() { * will contact the router and attempt to retrieve the results * of it's most recent request. */ - log.Printf("Arguments are %v\n", arguments) - log.Printf("Configuration is %v\n", config) - log.Printf("Running in client mode\n") - if arguments["--agent"] == nil { - log.Printf("Agent file not specified") + log.Printf("[INFO] Running in client mode\n") + + if len(config.Routers) == 0 { + log.Printf("[ERROR] No routers defined.\n") + os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"]) + } + + if opts["--agent"] == nil { + log.Printf("[ERROR] Agent file not specified") os.Exit(exitCodes.m["AGENT_NOT_DEFINED"]) } - agent, err := ioutil.ReadFile(arguments["--agent"].(string)) + agent, err := ioutil.ReadFile(opts["--agent"].(string)) if err != nil { - log.Printf("Failed to load Agent file: %s", arguments["--agent"]) + log.Printf("[ERROR] Failed to load Agent file: %s", opts["--agent"]) os.Exit(exitCodes.m["AGENT_LOAD_FAILED"]) } for key := range config.Routers { - log.Printf("Calling for router %d", key) go gc.Main(config, key, string(agent)) time.Sleep(10 * time.Second) } @@ -148,12 +191,10 @@ func main() { * or client node. Tags are used by the agent to give hints as to where * it should be routed. */ - log.Printf("Arguments are %v\n", arguments) - log.Printf("Configuration is %v\n", config) - log.Printf("Running in router mode\n") + log.Printf("[INFO] Running in router mode\n") if len(config.Workers) == 0 { - log.Printf("No workers defined.\n") + log.Printf("[ERROR] No workers defined.\n") os.Exit(exitCodes.m["NO_WORKERS_DEFINED"]) } @@ -167,24 +208,40 @@ func main() { * router(s) they are connected. The worker will execute the agent code and * pass the agent and it's results to a router. */ - log.Printf("Arguments are %v\n", arguments) - log.Printf("Configuration is %v\n", config) - log.Printf("Running in worker mode\n") + log.Printf("[INFO] Running in worker mode\n") if len(config.Routers) == 0 { - log.Printf("No routers defined.\n") + log.Printf("[ERROR] No routers defined.\n") os.Exit(exitCodes.m["NO_ROUTERS_DEFINED"]) } - go gw.Main(config) + for key := range config.Routers { + go gw.Main(config, key) + time.Sleep(10 * time.Second) + } + select {} case "setup": - log.Printf("Running in setup mode\n") - os.Exit(exitCodes.m["SETUP_FAILED"]) + log.Printf("[INFO] Running in setup mode\n") + f := hclwrite.NewEmptyFile() + rootBody := f.Body() + rootBody.SetAttributeValue("name", cty.StringVal(config.Name)) + rootBody.SetAttributeValue("mode", cty.StringVal("client")) + rootBody.SetAttributeValue("uuid", cty.StringVal(config.UUID)) + rootBody.AppendNewline() + + routerBlock1 := rootBody.AppendNewBlock("router", []string{config.Name}) + routerBody1 := routerBlock1.Body() + routerBody1.SetAttributeValue("routerid", cty.StringVal(config.UUID)) + routerBody1.SetAttributeValue("address", cty.StringVal("127.0.0.1")) + rootBody.AppendNewline() + + log.Printf("\n%s", f.Bytes()) + os.Exit(exitCodes.m["SUCCESS"]) default: - log.Printf("Unknown operating mode, exiting.\n") + log.Printf("[ERROR] Unknown operating mode, exiting.\n") os.Exit(exitCodes.m["INVALID_MODE"]) } diff --git a/go.mod b/go.mod index 0b224ea..6a2f7d6 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,9 @@ go 1.16 require ( github.com/aviddiviner/docopt-go v0.0.0-20170807220726-d8a1d67efc6a github.com/hashicorp/hcl/v2 v2.8.2 + github.com/hashicorp/logutils v1.0.0 github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d github.com/pebbe/zmq4 v1.2.2 + github.com/zclconf/go-cty v1.2.0 + golang.org/x/text v0.3.6 // indirect ) diff --git a/go.sum b/go.sum index b221093..ceb8ccc 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/hashicorp/hcl/v2 v2.8.2 h1:wmFle3D1vu0okesm8BTLVDyJ6/OL9DCLUwn0b2OptiY= github.com/hashicorp/hcl/v2 v2.8.2/go.mod h1:bQTN5mpo+jewjJgh8jr0JUguIi7qPHUF6yIfAEN3jqY= +github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= +github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -46,8 +48,9 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502175342-a43fa875dd82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/simple_build.sh b/simple_build.sh new file mode 100644 index 0000000..f68b2a9 --- /dev/null +++ b/simple_build.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env sh +# +# +docker buildx build --platform linux/arm/v7,linux/amd64,linux/arm64 --progress plain --pull -t dragonheim/gagent --push -f docker/Dockerfile . diff --git a/src/client/client.go b/src/client/client.go index 7f38b8a..e0e0570 100644 --- a/src/client/client.go +++ b/src/client/client.go @@ -3,31 +3,28 @@ package client import ( "fmt" "log" - "strconv" "sync" "time" gs "git.dragonheim.net/dragonheim/gagent/src/gstructs" - zmq "github.com/pebbe/zmq4" ) // Main is the initiation function for a Client -func Main(config gs.GagentConfig, routerID int, agent string) { - var mu sync.Mutex - var rport = int(config.ListenPort) - if config.Routers[routerID].RouterPort != "" { - rport, _ = strconv.Atoi(config.Routers[routerID].RouterPort) +func Main(config gs.GagentConfig, rid int, agent string) { + log.Printf("[INFO] Starting client\n") + + // Generate connect string for this router. + var rport = int64(config.ClientPort) + if config.Routers[rid].ClientPort != 0 { + rport = config.Routers[rid].ClientPort } + connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport) + log.Printf("[DEBUG] Attempting to connect to %s\n", connectString) - log.Printf("--|%#v|--\n", agent) + var mu sync.Mutex - connectString := fmt.Sprintf("tcp://%s:%d", - config.Routers[routerID].RouterAddr, - rport) - log.Printf("Attempting to connect to %s\n", connectString) - - sock, _ := zmq.NewSocket(zmq.DEALER) + sock, _ := zmq.NewSocket(zmq.REQ) defer sock.Close() sock.SetIdentity(config.UUID) @@ -35,17 +32,22 @@ func Main(config gs.GagentConfig, routerID int, agent string) { go func() { mu.Lock() + log.Printf("[DEBUG] Start sending agent...\n") sock.SendMessage(agent) + log.Printf("[DEBUG] End sending agent...\n") mu.Unlock() }() - for { - time.Sleep(10 * time.Millisecond) - mu.Lock() - msg, err := sock.RecvMessage(zmq.DONTWAIT) - if err == nil { - log.Println(msg[0], config.UUID) - } - mu.Unlock() - } + // time.Sleep(10 * time.Millisecond) + time.Sleep(10 * time.Millisecond) + + // for { + // time.Sleep(10 * time.Millisecond) + // mu.Lock() + // msg, err := sock.RecvMessage(zmq.DONTWAIT) + // if err == nil { + // log.Println(msg[0], config.UUID) + // } + // mu.Unlock() + // } } diff --git a/src/gstructs/gstructs.go b/src/gstructs/gstructs.go index 5b7c8a3..232c534 100644 --- a/src/gstructs/gstructs.go +++ b/src/gstructs/gstructs.go @@ -6,7 +6,9 @@ type GagentConfig struct { Mode string `hcl:"mode,attr"` UUID string `hcl:"uuid,optional"` ListenAddr string `hcl:"listenaddr,optional"` - ListenPort int `hcl:"listenport,optional"` + ClientPort int64 `hcl:"clientport,optional"` + RouterPort int64 `hcl:"routerport,optional"` + WorkerPort int64 `hcl:"workerport,optional"` Clients []*ClientDetails `hcl:"client,block"` Routers []*RouterDetails `hcl:"router,block"` Workers []*WorkerDetails `hcl:"worker,block"` @@ -26,7 +28,7 @@ type ClientDetails struct { * the agent's results to. This attempts to keep the * clients unique globally. */ - ClientID string `hcl:"clientid,attr"` + ClientID string `hcl:"clientid,optional"` } // RouterDetails is details about known routers @@ -43,7 +45,7 @@ type RouterDetails struct { * which MQ router to send the agent's requests to. * This attempts to keep the routers unique globally. */ - RouterID string `hcl:"uuid,attr"` + RouterID string `hcl:"routerid,attr"` /* * This is the IP address or hostname the router @@ -54,11 +56,24 @@ type RouterDetails struct { /* * This is the is the port that the router listens - * on. If not defined, it will default to 35570 - * The router will start up a 0MQ service that - * clients and workers will connect to. + * on for clients. If not defined, it will default + * to 35571. */ - RouterPort string `hcl:"port,optional"` + ClientPort int64 `hcl:"clientport,optional"` + + /* + * This is the is the port that the router listens + * on for routers. If not defined, it will default + * to 35570. + */ + RouterPort int64 `hcl:"workerport,optional"` + + /* + * This is the is the port that the router listens + * on for clients. If not defined, it will default + * to 35572. + */ + WorkerPort int64 `hcl:"workerport,optional"` /* * These tags will be passed to the router upon @@ -83,7 +98,7 @@ type WorkerDetails struct { * send agents to. This attempts to keep the * workers unique globally. */ - WorkerID string `hcl:"uuid,attr"` + WorkerID string `hcl:"workerid,attr"` /* * These tags will be passed to the router upon diff --git a/src/picol/LICENSE b/src/picol/LICENSE new file mode 100644 index 0000000..79304fb --- /dev/null +++ b/src/picol/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2014 Lain dono + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/src/picol/README.md b/src/picol/README.md new file mode 100644 index 0000000..4a505ba --- /dev/null +++ b/src/picol/README.md @@ -0,0 +1,27 @@ +# picol.go + +Original http://oldblog.antirez.com/post/picol.html + +Sample use: +```golang +func CommandPuts(i *picol.Interp, argv []string, pd interface{}) (string, error) { + if len(argv) != 2 { + return "", fmt.Errorf("Wrong number of args for %s %s", argv[0], argv) + } + fmt.Println(argv[1]) + return "", nil +} +... + interp := picol.InitInterp() + // add core functions + interp.RegisterCoreCommands() + // add user function + interp.RegisterCommand("puts", CommandPuts, nil) + // eval + result, err := interp.Eval(string(buf)) + if err != nil { + fmt.Println("ERROR", err, result) + } else { + fmt.Println(result) + } +``` diff --git a/src/picol/examples/fib.tcl b/src/picol/examples/fib.tcl new file mode 100644 index 0000000..187ccf2 --- /dev/null +++ b/src/picol/examples/fib.tcl @@ -0,0 +1,9 @@ +proc fib {x} { + if {<= $x 1} { + return 1 + } else { + + [fib [- $x 1]] [fib [- $x 2]] + } +} + +puts [fib 20] diff --git a/src/picol/examples/t2.tcl b/src/picol/examples/t2.tcl new file mode 100644 index 0000000..8f74ae6 --- /dev/null +++ b/src/picol/examples/t2.tcl @@ -0,0 +1,15 @@ +proc square {x} { + * $x $x +} + +set a 1 +while {<= $a 10} { + if {== $a 5} { + puts {Missing five!} + set a [+ $a 1] + continue + } + puts "I can compute that $a*$a = [square $a]" + set a [+ $a 1] +} + diff --git a/src/picol/examples/tst.tcl b/src/picol/examples/tst.tcl new file mode 100644 index 0000000..3c882cf --- /dev/null +++ b/src/picol/examples/tst.tcl @@ -0,0 +1,5 @@ +proc square {x} { + * $x $x +} + +puts [square 5] diff --git a/src/picol/picol/main.go b/src/picol/picol/main.go new file mode 100644 index 0000000..d53cab1 --- /dev/null +++ b/src/picol/picol/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "io/ioutil" + "os" + + picol "git.dragonheim.net/dragonheim/gagent/src/picol" +) + +var fname = flag.String("f", "", "file name") + +func CommandPuts(i *picol.Interp, argv []string, pd interface{}) (string, error) { + if len(argv) != 2 { + return "", fmt.Errorf("Wrong number of args for %s %s", argv[0], argv) + } + fmt.Println(argv[1]) + return "", nil +} + +func main() { + flag.Parse() + interp := picol.InitInterp() + interp.RegisterCoreCommands() + interp.RegisterCommand("puts", CommandPuts, nil) + + buf, err := ioutil.ReadFile(*fname) + if err == nil { + result, err := interp.Eval(string(buf)) + if err != nil { + fmt.Println("ERRROR", result, err) + } + } else { + for { + fmt.Print("picol> ") + scanner := bufio.NewReader(os.Stdin) + clibuf, _ := scanner.ReadString('\n') + result, err := interp.Eval(clibuf[:len(clibuf)-1]) + if len(result) != 0 { + fmt.Println("ERRROR", result, err) + } + } + } +} diff --git a/src/router/router.go b/src/router/router.go index 18c375d..9156794 100644 --- a/src/router/router.go +++ b/src/router/router.go @@ -3,84 +3,88 @@ package router import ( "fmt" "log" - "math/rand" - "time" gs "git.dragonheim.net/dragonheim/gagent/src/gstructs" - picol "git.dragonheim.net/dragonheim/gagent/src/picol" zmq "github.com/pebbe/zmq4" ) -func pop(msg []string) (head, tail []string) { - if msg[1] == "" { - head = msg[:2] - tail = msg[2:] - } else { - head = msg[:1] - tail = msg[1:] - } - return -} +const ( + WORKER_READY = "\001" // Signals worker is ready +) -// Each router task works on one request at a time and sends a random number -// of replies back, with random delays between replies: -func agentRouter(workerNum int) { - interp := picol.InitInterp() - interp.RegisterCoreCommands() +// Main is the initiation function for a Router +func Main(config gs.GagentConfig) { + log.Printf("[INFO] Starting router\n") - worker, _ := zmq.NewSocket(zmq.DEALER) - defer worker.Close() - worker.Connect("inproc://backend") + client_sock, _ := zmq.NewSocket(zmq.ROUTER) + defer client_sock.Close() + worker_sock, _ := zmq.NewSocket(zmq.DEALER) + defer worker_sock.Close() + + client_sock.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ClientPort)) + worker_sock.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.WorkerPort)) + + workers := make([]string, 0) + + poller1 := zmq.NewPoller() + poller1.Add(worker_sock, zmq.POLLIN) + + poller2 := zmq.NewPoller() + poller2.Add(worker_sock, zmq.POLLIN) + poller2.Add(client_sock, zmq.POLLIN) + +LOOP: for { - // The DEALER socket gives us the reply envelope and message - msg, _ := worker.RecvMessage(0) - identity, content := pop(msg) - log.Printf("Recieved message: %s", content) + // Poll frontend only if we have available workers + var sockets []zmq.Polled + var err error + if len(workers) > 0 { + sockets, err = poller2.Poll(-1) + } else { + sockets, err = poller1.Poll(-1) + } + if err != nil { + break // Interrupted + } + for _, socket := range sockets { + switch s := socket.Socket; s { + case worker_sock: // Handle worker activity on backend + // Use worker identity for load-balancing + msg, err := s.RecvMessage(0) + if err != nil { + break LOOP // Interrupted + } + var identity string + identity, msg = unwrap(msg) + log.Printf("[DEBUG] Worker message received: %s", msg) + workers = append(workers, identity) - // Send 0..4 replies back - replies := rand.Intn(5) - for reply := 0; reply < replies; reply++ { - // Sleep for some fraction of a second - time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond) + // Forward message to client if it's not a READY + if msg[0] != WORKER_READY { + client_sock.SendMessage(msg) + } - log.Printf("Worker %d: %s\n", workerNum, identity) - log.Printf("Worker %d: %s\n", workerNum, content) - worker.SendMessage(identity, content) + case client_sock: + // Get client request, route to first available worker + msg, err := s.RecvMessage(0) + log.Printf("[DEBUG] Client message received: %s", msg) + if err == nil { + worker_sock.SendMessage(workers[0], "", msg) + workers = workers[1:] + } + } } } } -// Main is the initiation function for a Router -func Main(config gs.GagentConfig) { - /* - * This is our router task. - * - * It uses the multi-threaded server model to deal requests out to a - * pool of workers and route replies back to clients. One worker can - * handle one request at a time but one client can talk to multiple - * workers at once. - * - * Frontend socket talks to clients over TCP - */ - frontend, _ := zmq.NewSocket(zmq.ROUTER) - log.Printf("Starting router\n") - defer frontend.Close() - - frontend.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ListenPort)) - - // Backend socket talks to workers over inproc - backend, _ := zmq.NewSocket(zmq.DEALER) - defer backend.Close() - backend.Bind("inproc://backend") - - // Launch pool of worker threads, precise number is not critical - for i := 0; i < 5; i++ { - go agentRouter(i) +func unwrap(msg []string) (head string, tail []string) { + head = msg[0] + if len(msg) > 1 && msg[1] == "" { + tail = msg[2:] + } else { + tail = msg[1:] } - - // Connect backend to frontend via a proxy - err := zmq.Proxy(frontend, backend, nil) - log.Fatalln("Proxy interrupted:", err) + return } diff --git a/src/worker/worker.go b/src/worker/worker.go index 464a674..d6f5c06 100644 --- a/src/worker/worker.go +++ b/src/worker/worker.go @@ -3,76 +3,32 @@ package worker import ( "fmt" "log" - "math/rand" - "time" gs "git.dragonheim.net/dragonheim/gagent/src/gstructs" - picol "git.dragonheim.net/dragonheim/gagent/src/picol" - + // picol "git.dragonheim.net/dragonheim/gagent/src/picol" zmq "github.com/pebbe/zmq4" ) -func pop(msg []string) (head, tail []string) { - if msg[1] == "" { - head = msg[:2] - tail = msg[2:] - } else { - head = msg[:1] - tail = msg[1:] - } - return -} - -// Each worker task works on one request at a time and sends a random number -// of replies back, with random delays between replies: -func agentHandler(workerNum int) { - interp := picol.InitInterp() - interp.RegisterCoreCommands() - - worker, _ := zmq.NewSocket(zmq.DEALER) - defer worker.Close() - worker.Connect("inproc://backend") - - for { - // The DEALER socket gives us the reply envelope and message - msg, _ := worker.RecvMessage(0) - identity, content := pop(msg) - - // Send 0..4 replies back - replies := rand.Intn(5) - for reply := 0; reply < replies; reply++ { - // Sleep for some fraction of a second - time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond) - - log.Printf(fmt.Sprintf("Worker %d: %s\n", workerNum, identity)) - worker.SendMessage(identity, content) - } - } -} - // Main is the initiation function for a Worker -func Main(config gs.GagentConfig) { - // Frontend socket talks to clients over TCP - frontend, _ := zmq.NewSocket(zmq.ROUTER) - log.Printf("Starting worker\n") +func Main(config gs.GagentConfig, rid int) { + log.Printf("[INFO] Starting worker\n") - defer frontend.Close() - log.Printf("Attempting to connect to: %s(%s)\n", config.Routers[0].RouterName, config.Routers[0].RouterAddr) - connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr) - frontend.Bind(connectString) - - // Backend socket talks to workers over inproc - backend, _ := zmq.NewSocket(zmq.DEALER) - defer backend.Close() - backend.Bind("inproc://backend") - - // Launch pool of agent handlers - for i := 0; i < 5; i++ { - go agentHandler(i) + // Generate connect string for this router. + var rport = int64(config.WorkerPort) + if config.Routers[rid].WorkerPort != 0 { + rport = config.Routers[rid].WorkerPort } + connectString := fmt.Sprintf("tcp://%s:%d", config.Routers[rid].RouterAddr, rport) - // Connect backend to frontend via a proxy - // err := zmq.Proxy(frontend, backend, nil) - // log.Fatalln("Proxy interrupted:", err) + subscriber, _ := zmq.NewSocket(zmq.REP) + defer subscriber.Close() + + log.Printf("[DEBUG] Attempting to connect to %s\n", connectString) + subscriber.Connect(connectString) + + msg, err := subscriber.Recv(0) + if err != nil { + log.Printf("[DEBUG] Recieved error: %v", err) + } + log.Printf("[DEBUG] Recieved message: %v", msg[0]) } -