fix: Re-initializing after I destroyed the original repository.

This commit is contained in:
James Wells 2021-02-25 17:46:40 -08:00
parent 5863999e8c
commit 8b54fc32c5
Signed by: jwells
GPG key ID: 73196D10B8E65666
20 changed files with 1359 additions and 0 deletions

43
src/client/client.go Normal file
View file

@ -0,0 +1,43 @@
package client
import (
"fmt"
"sync"
"time"
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
zmq "github.com/pebbe/zmq4"
)
// Main is the initiation function for a Client
func Main(config gs.GagentConfig, agent string) {
var mu sync.Mutex
fmt.Printf("Did we make it this far?\n")
fmt.Printf("--|%#v|--\n", agent)
connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr)
sock, _ := zmq.NewSocket(zmq.DEALER)
defer sock.Close()
sock.SetIdentity(config.UUID)
sock.Connect(connectString)
go func() {
mu.Lock()
sock.SendMessage(agent)
mu.Unlock()
}()
for {
time.Sleep(10 * time.Millisecond)
mu.Lock()
msg, err := sock.RecvMessage(zmq.DONTWAIT)
if err == nil {
fmt.Println(msg[0], config.UUID)
}
mu.Unlock()
}
}

87
src/gstructs/gstructs.go Normal file
View file

@ -0,0 +1,87 @@
package gstructs
// GagentConfig is the primary construct used by all modes
type GagentConfig struct {
Name string `hcl:"name,optional"`
Mode string `hcl:"mode,attr"`
UUID string `hcl:"uuid,optional"`
ListenAddr string `hc:"listenaddr,optional"`
ListenPort int `hc:"listenport,optional"`
Clients []*ClientDetails `hcl:"client,block"`
Routers []*RouterDetails `hcl:"router,block"`
Workers []*WorkerDetails `hcl:"worker,block"`
}
// ClientDetails is details about known clients
type ClientDetails struct {
/*
* Client name for display purposes in logs and
* diagnostics.
*/
ClientName string `hcl:",label"`
/*
* UUID String for the client node. This is used by
* the router to determine which MQ client to send
* the agent's results to. This attempts to keep the
* clients unique globally.
*/
ClientID string `hcl:"clientid,attr"`
}
// RouterDetails is details about known routers
type RouterDetails struct {
/*
* Router name for display purposes in logs and
* diagnostics
*/
RouterName string `hcl:",label"`
/*
* UUID String for the router node. This is used by
* the clients, routers, and workers to determine
* which MQ router to send the agent's requests to.
* This attempts to keep the routers unique globally.
*/
RouterID string `hcl:"routerid,attr"`
/*
* This is the IP Address and port that the router
* will listen on. The router will start up a 0MQ
* service that clients and workers will connect to.
*/
RouterAddr string `hcl:"address,attr"`
/*
* These tags will be passed to the router upon
* connection. The router will then use these
* tags to help determine which worker / client
* to send the client's requests and results to.
*/
RouterTags []string `hcl:"tags,optional"`
}
// WorkerDetails is details about known workers
type WorkerDetails struct {
/*
* Router name for display purposes in logs and
* diagnostics
*/
WorkerName string `hcl:",label"`
/*
* UUID String for the worker node. This is used
* by the router to determine which MQ client to
* send agents to. This attempts to keep the
* workers unique globally.
*/
WorkerID string `hcl:"workerid,attr"`
/*
* These tags will be passed to the router upon
* connection. The router will then use these
* tags to help determine which worker / client
* to send the agent and it's results to.
*/
WorkerTags []string `hcl:"tags,optional"`
}

214
src/picol/commands.go Normal file
View file

@ -0,0 +1,214 @@
package picol
import (
"fmt"
"strconv"
"strings"
)
func ArityErr(i *Interp, name string, argv []string) error {
return fmt.Errorf("Wrong number of args for %s %s", name, argv)
}
func CommandMath(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 3 {
return "", ArityErr(i, argv[0], argv)
}
a, _ := strconv.Atoi(argv[1])
b, _ := strconv.Atoi(argv[2])
var c int
switch {
case argv[0] == "+":
c = a + b
case argv[0] == "-":
c = a - b
case argv[0] == "*":
c = a * b
case argv[0] == "/":
c = a / b
case argv[0] == ">":
if a > b {
c = 1
}
case argv[0] == ">=":
if a >= b {
c = 1
}
case argv[0] == "<":
if a < b {
c = 1
}
case argv[0] == "<=":
if a <= b {
c = 1
}
case argv[0] == "==":
if a == b {
c = 1
}
case argv[0] == "!=":
if a != b {
c = 1
}
default: // FIXME I hate warnings
c = 0
}
return fmt.Sprintf("%d", c), nil
}
func CommandSet(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 3 {
return "", ArityErr(i, argv[0], argv)
}
i.SetVar(argv[1], argv[2])
return argv[2], nil
}
func CommandUnset(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 2 {
return "", ArityErr(i, argv[0], argv)
}
i.UnsetVar(argv[1])
return "", nil
}
func CommandIf(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 3 && len(argv) != 5 {
return "", ArityErr(i, argv[0], argv)
}
result, err := i.Eval(argv[1])
if err != nil {
return "", err
}
if r, _ := strconv.Atoi(result); r != 0 {
return i.Eval(argv[2])
} else if len(argv) == 5 {
return i.Eval(argv[4])
}
return result, nil
}
func CommandWhile(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 3 {
return "", ArityErr(i, argv[0], argv)
}
for {
result, err := i.Eval(argv[1])
if err != nil {
return "", err
}
if r, _ := strconv.Atoi(result); r != 0 {
result, err := i.Eval(argv[2])
switch err {
case PICOL_CONTINUE, nil:
//pass
case PICOL_BREAK:
return result, nil
default:
return result, err
}
} else {
return result, nil
}
}
}
func CommandRetCodes(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 1 {
return "", ArityErr(i, argv[0], argv)
}
switch argv[0] {
case "break":
return "", PICOL_BREAK
case "continue":
return "", PICOL_CONTINUE
}
return "", nil
}
func CommandCallProc(i *Interp, argv []string, pd interface{}) (string, error) {
var x []string
if pd, ok := pd.([]string); ok {
x = pd
} else {
return "", nil
}
i.callframe = &CallFrame{vars: make(map[string]Var), parent: i.callframe}
defer func() { i.callframe = i.callframe.parent }() // remove the called proc callframe
arity := 0
for _, arg := range strings.Split(x[0], " ") {
if len(arg) == 0 {
continue
}
arity++
i.SetVar(arg, argv[arity])
}
if arity != len(argv)-1 {
return "", fmt.Errorf("Proc '%s' called with wrong arg num", argv[0])
}
body := x[1]
result, err := i.Eval(body)
if err == PICOL_RETURN {
err = nil
}
return result, err
}
func CommandProc(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 4 {
return "", ArityErr(i, argv[0], argv)
}
return "", i.RegisterCommand(argv[1], CommandCallProc, []string{argv[2], argv[3]})
}
func CommandReturn(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 1 && len(argv) != 2 {
return "", ArityErr(i, argv[0], argv)
}
var r string
if len(argv) == 2 {
r = argv[1]
}
return r, PICOL_RETURN
}
func CommandError(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 1 && len(argv) != 2 {
return "", ArityErr(i, argv[0], argv)
}
return "", fmt.Errorf(argv[1])
}
func CommandPuts(i *Interp, argv []string, pd interface{}) (string, error) {
if len(argv) != 2 {
return "", fmt.Errorf("Wrong number of args for %s %s", argv[0], argv)
}
fmt.Println(argv[1])
return "", nil
}
func (i *Interp) RegisterCoreCommands() {
name := [...]string{"+", "-", "*", "/", ">", ">=", "<", "<=", "==", "!="}
for _, n := range name {
i.RegisterCommand(n, CommandMath, nil)
}
i.RegisterCommand("set", CommandSet, nil)
i.RegisterCommand("unset", CommandUnset, nil)
i.RegisterCommand("if", CommandIf, nil)
i.RegisterCommand("while", CommandWhile, nil)
i.RegisterCommand("break", CommandRetCodes, nil)
i.RegisterCommand("continue", CommandRetCodes, nil)
i.RegisterCommand("proc", CommandProc, nil)
i.RegisterCommand("return", CommandReturn, nil)
i.RegisterCommand("error", CommandError, nil)
i.RegisterCommand("puts", CommandPuts, nil)
}

250
src/picol/parser.go Normal file
View file

@ -0,0 +1,250 @@
package picol
import (
"unicode"
"unicode/utf8"
)
const (
PT_ESC = iota
PT_STR
PT_CMD
PT_VAR
PT_SEP
PT_EOL
PT_EOF
)
type Parser struct {
text string
p, start, end, ln int
insidequote int
Type int
}
func InitParser(text string) *Parser {
return &Parser{text, 0, 0, 0, len(text), 0, PT_EOL}
}
func (p *Parser) next() {
_, w := utf8.DecodeRuneInString(p.text[p.p:])
p.p += w
p.ln -= w
}
func (p *Parser) current() rune {
r, _ := utf8.DecodeRuneInString(p.text[p.p:])
return r
}
func (p *Parser) token() (t string) {
defer recover()
return p.text[p.start:p.end]
}
func (p *Parser) parseSep() string {
p.start = p.p
for ; p.p < len(p.text); p.next() {
if !unicode.IsSpace(p.current()) {
break
}
}
p.end = p.p
p.Type = PT_SEP
return p.token()
}
func (p *Parser) parseEol() string {
p.start = p.p
for ; p.p < len(p.text); p.next() {
if p.current() == ';' || unicode.IsSpace(p.current()) {
// pass
} else {
break
}
}
p.end = p.p
p.Type = PT_EOL
return p.token()
}
func (p *Parser) parseCommand() string {
level, blevel := 1, 0
p.next() // skip
p.start = p.p
Loop:
for {
switch {
case p.ln == 0:
break Loop
case p.current() == '[' && blevel == 0:
level++
case p.current() == ']' && blevel == 0:
level--
if level == 0 {
break Loop
}
case p.current() == '\\':
p.next()
case p.current() == '{':
blevel++
case p.current() == '}' && blevel != 0:
blevel--
}
p.next()
}
p.end = p.p
p.Type = PT_CMD
if p.p < len(p.text) && p.current() == ']' {
p.next()
}
return p.token()
}
func (p *Parser) parseVar() string {
p.next() // skip the $
p.start = p.p
if p.current() == '{' {
p.Type = PT_VAR
return p.parseBrace()
}
for p.p < len(p.text) {
c := p.current()
if unicode.IsLetter(c) || ('0' <= c && c <= '9') || c == '_' {
p.next()
continue
}
break
}
if p.start == p.p { // It's just a single char string "$"
p.start = p.p - 1
p.end = p.p
p.Type = PT_STR
} else {
p.end = p.p
p.Type = PT_VAR
}
return p.token()
}
func (p *Parser) parseBrace() string {
level := 1
p.next() // skip
p.start = p.p
Loop:
for p.p < len(p.text) {
c := p.current()
switch {
case p.ln >= 2 && c == '\\':
p.next()
case p.ln == 0 || c == '}':
level--
if level == 0 || p.ln == 0 {
break Loop
}
case c == '{':
level++
}
p.next()
}
p.end = p.p
if p.ln != 0 { // Skip final closed brace
p.next()
}
return p.token()
}
func (p *Parser) parseString() string {
newword := p.Type == PT_SEP || p.Type == PT_EOL || p.Type == PT_STR
if c := p.current(); newword && c == '{' {
p.Type = PT_STR
return p.parseBrace()
} else if newword && c == '"' {
p.insidequote = 1
p.next() // skip
}
p.start = p.p
Loop:
for ; p.ln != 0; p.next() {
switch p.current() {
case '\\':
if p.ln >= 2 {
p.next()
}
case '$', '[':
break Loop
case '"':
if p.insidequote != 0 {
p.end = p.p
p.Type = PT_ESC
p.next()
p.insidequote = 0
return p.token()
}
}
if p.current() == ';' || unicode.IsSpace(p.current()) {
if p.insidequote == 0 {
break Loop
}
}
}
p.end = p.p
p.Type = PT_ESC
return p.token()
}
func (p *Parser) parseComment() string {
for p.ln != 0 && p.current() != '\n' {
p.next()
}
return p.token()
}
func (p *Parser) GetToken() string {
for {
if p.ln == 0 {
if p.Type != PT_EOL && p.Type != PT_EOF {
p.Type = PT_EOL
} else {
p.Type = PT_EOF
}
return p.token()
}
switch p.current() {
case ' ', '\t', '\r':
if p.insidequote != 0 {
return p.parseString()
}
return p.parseSep()
case '\n', ';':
if p.insidequote != 0 {
return p.parseString()
}
return p.parseEol()
case '[':
return p.parseCommand()
case '$':
return p.parseVar()
case '#':
if p.Type == PT_EOL {
p.parseComment()
continue
}
return p.parseString()
default:
return p.parseString()
}
}
return p.token() /* unreached */
}

138
src/picol/picol.go Normal file
View file

@ -0,0 +1,138 @@
package picol
import (
"errors"
"fmt"
"strings"
)
var (
PICOL_RETURN = errors.New("RETURN")
PICOL_BREAK = errors.New("BREAK")
PICOL_CONTINUE = errors.New("CONTINUE")
)
type Var string
type CmdFunc func(i *Interp, argv []string, privdata interface{}) (string, error)
type Cmd struct {
fn CmdFunc
privdata interface{}
}
type CallFrame struct {
vars map[string]Var
parent *CallFrame
}
type Interp struct {
level int
callframe *CallFrame
commands map[string]Cmd
}
func InitInterp() *Interp {
return &Interp{
level: 0,
callframe: &CallFrame{vars: make(map[string]Var)},
commands: make(map[string]Cmd),
}
}
func (i *Interp) Var(name string) (Var, bool) {
for frame := i.callframe; frame != nil; frame = frame.parent {
v, ok := frame.vars[name]
if ok {
return v, ok
}
}
return "", false
}
func (i *Interp) SetVar(name, val string) {
i.callframe.vars[name] = Var(val)
}
func (i *Interp) UnsetVar(name string) {
delete(i.callframe.vars, name)
}
func (i *Interp) Command(name string) *Cmd {
v, ok := i.commands[name]
if !ok {
return nil
}
return &v
}
func (i *Interp) RegisterCommand(name string, fn CmdFunc, privdata interface{}) error {
c := i.Command(name)
if c != nil {
return fmt.Errorf("Command '%s' already defined", name)
}
i.commands[name] = Cmd{fn, privdata}
return nil
}
/* EVAL! */
func (i *Interp) Eval(t string) (string, error) {
p := InitParser(t)
var result string
var err error
argv := []string{}
for {
prevtype := p.Type
// XXX
t = p.GetToken()
if p.Type == PT_EOF {
break
}
switch p.Type {
case PT_VAR:
v, ok := i.Var(t)
if !ok {
return "", fmt.Errorf("No such variable '%s'", t)
}
t = string(v)
case PT_CMD:
result, err = i.Eval(t)
if err != nil {
return result, err
} else {
t = result
}
case PT_ESC:
// XXX: escape handling missing!
case PT_SEP:
prevtype = p.Type
continue
}
// We have a complete command + args. Call it!
if p.Type == PT_EOL {
prevtype = p.Type
if len(argv) != 0 {
c := i.Command(argv[0])
if c == nil {
return "", fmt.Errorf("No such command '%s'", argv[0])
}
result, err = c.fn(i, argv, c.privdata)
if err != nil {
return result, err
}
}
// Prepare for the next command
argv = []string{}
continue
}
// We have a new token, append to the previous or as new arg?
if prevtype == PT_SEP || prevtype == PT_EOL {
argv = append(argv, t)
} else { // Interpolation
argv[len(argv)-1] = strings.Join([]string{argv[len(argv)-1], t}, "")
}
prevtype = p.Type
}
return result, nil
}

85
src/router/router.go Normal file
View file

@ -0,0 +1,85 @@
package router
import (
"fmt"
"log"
"math/rand"
"time"
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
picol "git.dragonheim.net/dragonheim/gagent/src/picol"
zmq "github.com/pebbe/zmq4"
)
func pop(msg []string) (head, tail []string) {
if msg[1] == "" {
head = msg[:2]
tail = msg[2:]
} else {
head = msg[:1]
tail = msg[1:]
}
return
}
// Main is the initiation function for a Router
func Main(config gs.GagentConfig) {
/*
* This is our router task.
*
* It uses the multi-threaded server model to deal requests out to a
* pool of workers and route replies back to clients. One worker can
* handle one request at a time but one client can talk to multiple
* workers at once.
*
* Frontend socket talks to clients over TCP
*/
frontend, _ := zmq.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind(fmt.Sprintf("tcp://%s:%d", config.ListenAddr, config.ListenPort))
// Backend socket talks to workers over inproc
backend, _ := zmq.NewSocket(zmq.DEALER)
defer backend.Close()
backend.Bind("inproc://backend")
// Launch pool of worker threads, precise number is not critical
for i := 0; i < 5; i++ {
go agentRouter(i)
}
// Connect backend to frontend via a proxy
err := zmq.Proxy(frontend, backend, nil)
log.Fatalln("Proxy interrupted:", err)
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
func agentRouter(workerNum int) {
interp := picol.InitInterp()
interp.RegisterCoreCommands()
worker, _ := zmq.NewSocket(zmq.DEALER)
defer worker.Close()
worker.Connect("inproc://backend")
for {
// The DEALER socket gives us the reply envelope and message
msg, _ := worker.RecvMessage(0)
identity, content := pop(msg)
// Send 0..4 replies back
replies := rand.Intn(5)
for reply := 0; reply < replies; reply++ {
// Sleep for some fraction of a second
time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond)
fmt.Println(fmt.Sprintf("Worker %d: %s", workerNum, identity))
fmt.Println(fmt.Sprintf("Worker %d: %s", workerNum, content))
worker.SendMessage(identity, content)
}
}
}

75
src/worker/worker.go Normal file
View file

@ -0,0 +1,75 @@
package worker
import (
"fmt"
// "log"
"math/rand"
"time"
gs "git.dragonheim.net/dragonheim/gagent/src/gstructs"
picol "git.dragonheim.net/dragonheim/gagent/src/picol"
zmq "github.com/pebbe/zmq4"
)
func pop(msg []string) (head, tail []string) {
if msg[1] == "" {
head = msg[:2]
tail = msg[2:]
} else {
head = msg[:1]
tail = msg[1:]
}
return
}
// Main is the initiation function for a Worker
func Main(config gs.GagentConfig) {
// Frontend socket talks to clients over TCP
frontend, _ := zmq.NewSocket(zmq.ROUTER)
defer frontend.Close()
connectString := fmt.Sprintf("tcp://%s", config.Routers[0].RouterAddr)
frontend.Bind(connectString)
// Backend socket talks to workers over inproc
backend, _ := zmq.NewSocket(zmq.DEALER)
defer backend.Close()
backend.Bind("inproc://backend")
// Launch pool of agent handlers
for i := 0; i < 5; i++ {
go agentHandler(i)
}
// Connect backend to frontend via a proxy
// err := zmq.Proxy(frontend, backend, nil)
// log.Fatalln("Proxy interrupted:", err)
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
func agentHandler(workerNum int) {
interp := picol.InitInterp()
interp.RegisterCoreCommands()
worker, _ := zmq.NewSocket(zmq.DEALER)
defer worker.Close()
worker.Connect("inproc://backend")
for {
// The DEALER socket gives us the reply envelope and message
msg, _ := worker.RecvMessage(0)
identity, content := pop(msg)
// Send 0..4 replies back
replies := rand.Intn(5)
for reply := 0; reply < replies; reply++ {
// Sleep for some fraction of a second
time.Sleep(time.Duration(rand.Intn(1000)+1) * time.Millisecond)
fmt.Println(fmt.Sprintf("Worker %d: %s", workerNum, identity))
worker.SendMessage(identity, content)
}
}
}