diff --git a/src/contexts/cli/cli.go b/src/contexts/cli/cli.go index 44f59486..74e77961 100644 --- a/src/contexts/cli/cli.go +++ b/src/contexts/cli/cli.go @@ -1018,6 +1018,24 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", m.Table() return } + switch arg[0] { + case "stop": + if timer := m.Confm("timer", arg[1]); timer != nil { + timer["stop"] = true + } + cli.schedule(m) + return + case "start": + if timer := m.Confm("timer", arg[1]); timer != nil { + timer["stop"] = false + } + cli.schedule(m) + return + case "delete": + delete(m.Confm("timer"), arg[1]) + cli.schedule(m) + return + } now := int64(m.Sess("cli").Cmd("time").Appendi("timestamp")) begin := now @@ -1046,6 +1064,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "repeat": repeat, "order": order, "done": false, + "stop": false, "time": arg[0], "cmd": arg[1:], "msg": 0, @@ -1062,18 +1081,21 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", if m.Conf("timer_next") == "" { break } - timer := m.Confv("timer", m.Conf("timer_next")).(map[string]interface{}) - m.Log("info", "timer %s %v", m.Conf("timer_next"), timer["cmd"]) - msg := m.Sess("cli").Cmd("source", timer["cmd"]) - timer["result"] = msg.Meta["result"] - timer["msg"] = msg.Code() + if timer := m.Confm("timer", m.Conf("timer_next")); timer != nil && !kit.Right(timer["stop"]) { + m.Log("info", "timer %s %v", m.Conf("timer_next"), timer["cmd"]) - if timer["repeat"].(bool) { - timer["action_time"] = int64(m.Sess("cli").Cmd("time", timer["action_time"], timer["order"], timer["time"]).Appendi("timestamp")) - } else { - timer["done"] = true + msg := m.Sess("cli").Cmd("source", timer["cmd"]) + timer["result"] = msg.Meta["result"] + timer["msg"] = msg.Code() + + if timer["repeat"].(bool) { + timer["action_time"] = int64(m.Sess("cli").Cmd("time", timer["action_time"], timer["order"], timer["time"]).Appendi("timestamp")) + } else { + timer["done"] = true + } } + cli.schedule(m) } } diff --git a/src/contexts/ctx/ctx.go b/src/contexts/ctx/ctx.go index 982139d3..0b4b2e33 100644 --- a/src/contexts/ctx/ctx.go +++ b/src/contexts/ctx/ctx.go @@ -156,6 +156,9 @@ func (c *Context) Start(m *Message, arg ...string) bool { c.Close(m, m.Meta["detail"]...) c.exit <- true } + }, func(m *Message) { + c.Close(m, m.Meta["detail"]...) + c.exit <- true }) if sync { @@ -173,8 +176,9 @@ func (c *Context) Close(m *Message, arg ...string) bool { if m.target == c { for i := len(c.requests) - 1; i >= 0; i-- { if msg := c.requests[i]; msg.code == m.code { - m.Log("close", "request %d/%d", i, len(c.requests)-1) if c.Server == nil || c.Server.Close(m, arg...) { + m.Log("close", "request %d/%d", i, len(c.requests)-1) + msg.Free() for j := i; j < len(c.requests)-1; j++ { c.requests[j] = c.requests[j+1] } @@ -289,6 +293,7 @@ type Message struct { Data map[string]interface{} callback func(msg *Message) (sub *Message) + freedoms []func(msg *Message) (done bool) Sessions map[string]*Message messages []*Message @@ -436,6 +441,13 @@ func (m *Message) Format(arg ...interface{}) string { meta = append(meta, kit.Format(m.code)) case "ship": meta = append(meta, fmt.Sprintf("%d(%s->%s)", m.code, m.source.Name, m.target.Name)) + case "source": + target := m.target + m.target = m.source + meta = append(meta, m.Cap("module")) + m.target = target + case "target": + meta = append(meta, m.Cap("module")) case "detail": meta = append(meta, fmt.Sprintf("%v", m.Meta["detail"])) @@ -609,18 +621,7 @@ func (m *Message) Has(key ...string) bool { return false } func (m *Message) CopyTo(msg *Message, arg ...string) *Message { - if m == msg { - return m - } - if len(arg) == 0 { - if msg.Hand { - msg.Copy(m, "append").Copy(m, "result") - } else { - msg.Copy(m, "option") - } - } else { - msg.Copy(m, arg...) - } + msg.Copy(m, arg...) return m } func (m *Message) Copy(msg *Message, arg ...string) *Message { @@ -629,7 +630,7 @@ func (m *Message) Copy(msg *Message, arg ...string) *Message { } if len(arg) == 0 { if msg.Hand { - arg = append(arg, "append") + arg = append(arg, "append", "result") } else { arg = append(arg, "option") } @@ -1010,6 +1011,9 @@ func (m *Message) Parse(arg interface{}) string { } func (m *Message) Find(name string, root ...bool) *Message { + if name == "" { + return m.Spawn() + } target := m.target.root if len(root) > 0 && !root[0] { target = m.target @@ -1118,10 +1122,17 @@ func (m *Message) Sess(key string, arg ...interface{}) *Message { return nil } func (m *Message) Match(key string, spawn bool, hand func(m *Message, s *Context, c *Context, key string) bool) *Message { + if m == nil { + return m + } + if strings.Contains(key, ".") { arg := strings.Split(key, ".") m, key = m.Sess(arg[0], spawn), arg[1] } + if m == nil { + return m + } context := []*Context{m.target} for _, v := range []string{"aaa", "cli"} { @@ -1141,26 +1152,41 @@ func (m *Message) Match(key string, spawn bool, hand func(m *Message, s *Context return m } func (m *Message) Call(cb func(msg *Message) (sub *Message), arg ...interface{}) *Message { + if m == nil { + return m + } if m.callback = cb; len(arg) > 0 || len(m.Meta["detail"]) > 0 { + m.Log("call", m.Format("detail", "option")) m.Cmd(arg...) } return m } -func (m *Message) Back(msg *Message) *Message { - if msg == nil || m.callback == nil { +func (m *Message) Back(ms ...*Message) *Message { + if m.callback == nil { return m } - if msg.Hand { - m.Log("cbs", msg.Format("ship", "result", "append")) - } else { - m.Log("cbs", msg.Format("ship", "detail", "option")) + if len(ms) == 0 { + ms = append(ms, m.Spawn(m.source).Copy(m, "append").Copy(m, "result")) } - if sub := m.callback(msg); sub != nil && m.message != nil && m.message != m { - m.message.Back(sub) + ns := []*Message{} + + for _, msg := range ms { + if msg.Hand { + m.Log("back", msg.Format("ship", "result", "append")) + } else { + m.Log("back", msg.Format("ship", "detail", "option")) + } + + if sub := m.callback(msg); sub != nil && m.message != nil && m.message != m { + ns = append(ns, sub) + } } + if len(ns) > 0 { + m.message.Back(ns...) + } return m } func (m *Message) Backs(msg *Message) *Message { @@ -1172,15 +1198,32 @@ func (m *Message) CallBack(sync bool, cb func(msg *Message) (sub *Message), arg return m.Call(cb, arg...) } - wait := make(chan *Message) + wait := make(chan *Message, 10) go m.Call(func(sub *Message) *Message { msg := cb(sub) + m.Log("sync", m.Format("done", "result", "append")) wait <- m return msg }, arg...) + m.Log("sync", m.Format("wait", "result", "append")) return <-wait } +func (m *Message) Free(cbs ...func(msg *Message) (done bool)) *Message { + if len(cbs) == 0 { + for i := len(m.freedoms) - 1; i >= 0; i-- { + m.Log("free", "%d/%d", i, len(m.freedoms)-1) + if !m.freedoms[i](m) { + break + } + m.freedoms = m.freedoms[:i] + } + return m + } + + m.freedoms = append(m.freedoms, cbs...) + return m +} func (m *Message) Assert(e interface{}, msg ...string) bool { switch v := e.(type) { @@ -1204,7 +1247,7 @@ func (m *Message) Assert(e interface{}, msg ...string) bool { } m.Log("error", "%v", e) - panic(m.Set("result", "error: ", kit.Format(e), "\n")) + panic(e) } func (m *Message) TryCatch(msg *Message, safe bool, hand ...func(msg *Message)) *Message { defer func() { @@ -1236,6 +1279,9 @@ func (m *Message) TryCatch(msg *Message, safe bool, hand ...func(msg *Message)) func (m *Message) Start(name string, help string, arg ...string) bool { return m.Set("detail", arg).target.Spawn(m, name, help).Begin(m).Start(m) } +func (m *Message) Close(arg ...string) bool { + return m.Target().Close(m, arg...) +} func (m *Message) Wait() bool { if m.target.exit != nil { return <-m.target.exit @@ -1523,7 +1569,7 @@ var Index = &Context{Name: "ctx", Help: "模块中心", Server: &CTX{}, "list_help": &Config{Name: "list_help", Value: "list command", Help: "命令列表帮助"}, "table_compact": &Config{Name: "table_compact", Value: "false", Help: "命令列表帮助"}, - "table_col_sep": &Config{Name: "table_col_sep", Value: "\t", Help: "命令列表帮助"}, + "table_col_sep": &Config{Name: "table_col_sep", Value: " ", Help: "命令列表帮助"}, "table_row_sep": &Config{Name: "table_row_sep", Value: "\n", Help: "命令列表帮助"}, "table_space": &Config{Name: "table_space", Value: " ", Help: "命令列表帮助"}, @@ -1701,28 +1747,27 @@ var Index = &Context{Name: "ctx", Help: "模块中心", Server: &CTX{}, } } - if len(arg) > 0 { - switch arg[0] { - case "time", "code", "ship", "full", "chain", "stack": - m.Echo(m.Format(arg[0])) - return - } + if len(arg) == 0 { + m.Format("summary", msg, "deep") + msg.CopyTo(m) + return } - if len(arg) > 0 && arg[0] == "spawn" { + switch arg[0] { + case "time", "code", "ship", "full", "chain", "stack": + m.Echo(msg.Format(arg[0])) + case "spawn": sub := msg.Spawn() m.Echo("%d", sub.code) - return - } - - if len(arg) > 0 { + case "call": + case "back": + msg.Back(m) + case "free": + msg.Free() + default: msg = msg.Spawn().Cmd(arg) m.Copy(msg, "append").Copy(msg, "result") - return } - - m.Format("summary", msg, "deep") - msg.CopyTo(m) return }}, "detail": &Command{Name: "detail [index] [value...]", Help: "查看或添加参数", Hand: func(m *Message, c *Context, key string, arg ...string) (e error) { @@ -2343,6 +2388,8 @@ var Index = &Context{Name: "ctx", Help: "模块中心", Server: &CTX{}, }) m.Sort("key", "str").Table() return + case 1: + m.Echo(m.Cap(arg[0])) case 2: if arg[0] == "delete" { delete(m.target.Caches, arg[1]) diff --git a/src/contexts/gdb/gdb.go b/src/contexts/gdb/gdb.go index be33cb0c..ecb49cbd 100644 --- a/src/contexts/gdb/gdb.go +++ b/src/contexts/gdb/gdb.go @@ -110,7 +110,7 @@ var Index = &ctx.Context{Name: "gdb", Help: "调试中心", "start": map[string]interface{}{"value": map[string]interface{}{"enable": false}}, }, "command": map[string]interface{}{"value": map[string]interface{}{"enable": false}, - "demo": map[string]interface{}{"value": map[string]interface{}{"enable": true}}, + "shit": map[string]interface{}{"value": map[string]interface{}{"enable": true}}, }, "config": map[string]interface{}{"value": map[string]interface{}{"enable": true}}, "cache": map[string]interface{}{"value": map[string]interface{}{"enable": false}, diff --git a/src/contexts/log/log.go b/src/contexts/log/log.go index 061eb633..f2ae3a90 100644 --- a/src/contexts/log/log.go +++ b/src/contexts/log/log.go @@ -125,7 +125,7 @@ var Index = &ctx.Context{Name: "log", Help: "日志中心", "trace": map[string]interface{}{"value": map[string]interface{}{"file": "error.log", "meta": []interface{}{"time", "ship"}, "color_begin": "\033[32m", "color_end": "\033[0m"}}, "debug": map[string]interface{}{"value": map[string]interface{}{"file": "debug.log", "meta": []interface{}{"time", "ship"}}}, "search": map[string]interface{}{"value": map[string]interface{}{"file": "debug.log", "meta": []interface{}{"time", "ship"}}}, - "cbs": map[string]interface{}{"value": map[string]interface{}{"file": "debug.log", "meta": []interface{}{"time", "ship"}}}, + "cbs": map[string]interface{}{"value": map[string]interface{}{"file": "bench.log", "meta": []interface{}{"time", "ship"}}}, "bench": map[string]interface{}{"value": map[string]interface{}{"file": "bench.log", "meta": []interface{}{"time", "ship"}}}, "begin": map[string]interface{}{"value": map[string]interface{}{"file": "bench.log", "meta": []interface{}{"time", "ship"}, "color_begin": "\033[31m", "color_end": "\033[0m"}}, diff --git a/src/contexts/nfs/nfs.go b/src/contexts/nfs/nfs.go index caab65df..be382597 100644 --- a/src/contexts/nfs/nfs.go +++ b/src/contexts/nfs/nfs.go @@ -31,6 +31,7 @@ type NFS struct { out *os.File send chan *ctx.Message + echo chan *ctx.Message hand map[int]*ctx.Message *ctx.Context @@ -644,9 +645,16 @@ func (nfs *NFS) shadow(args ...interface{}) *NFS { func (nfs *NFS) Send(meta string, arg ...interface{}) *NFS { m := nfs.Context.Message() - n, e := fmt.Fprintf(nfs.io, "%s: %s\n", url.QueryEscape(meta), url.QueryEscape(kit.Format(arg[0]))) + line := "\n" + if meta != "" { + line = fmt.Sprintf("%s: %s\n", url.QueryEscape(meta), url.QueryEscape(kit.Format(arg[0]))) + } + + n, e := fmt.Fprint(nfs.io, line) m.Assert(e) m.Capi("nwrite", n) + m.Log("send", "%d [%s]", len(line), line) + return nfs } func (nfs *NFS) Recv(line string) (field string, value string) { @@ -661,6 +669,9 @@ func (nfs *NFS) Recv(line string) (field string, value string) { if len(word) == 1 { return } + if len(word[1]) == 0 { + return + } value, e = url.QueryUnescape(word[1]) m.Assert(e) @@ -765,30 +776,30 @@ func (nfs *NFS) Start(m *ctx.Message, arg ...string) bool { m.Cap("stream", m.Option("ms_source")) nfs.io = m.Optionv("io").(io.ReadWriter) nfs.send = make(chan *ctx.Message, 10) + nfs.echo = make(chan *ctx.Message, 10) nfs.hand = map[int]*ctx.Message{} go func() { //发送消息队列 for { + msg, code, meta, body := m, 0, "detail", "option" select { - case msg := <-nfs.send: - code, meta, body := "0", "detail", "option" - if msg.Options("remote_code") { // 发送响应 - code, meta, body = msg.Option("remote_code"), "result", "append" - } else { // 发送请求 - code = kit.Format(m.Capi("nsend", 1)) - nfs.hand[kit.Int(code)] = msg - } + case msg = <-nfs.send: + code = msg.Code() + nfs.hand[code] = msg + case msg = <-nfs.echo: + code, meta, body = msg.Optioni("remote_code"), "result", "append" + } - nfs.Send("code", code) - for _, v := range msg.Meta[meta] { - nfs.Send(meta, v) - } - for _, k := range msg.Meta[body] { - for _, v := range msg.Meta[k] { - nfs.Send(k, v) - } + nfs.Send("code", code) + for _, v := range msg.Meta[meta] { + nfs.Send(meta, v) + } + for _, k := range msg.Meta[body] { + for _, v := range msg.Meta[k] { + nfs.Send(k, v) } } + nfs.Send("") } }() @@ -796,38 +807,47 @@ func (nfs *NFS) Start(m *ctx.Message, arg ...string) bool { msg, code, head, body := m, "0", "result", "append" for bio := bufio.NewScanner(nfs.io); bio.Scan(); { - switch field, value := nfs.Recv(bio.Text()); field { - case "code": - msg, code = m.Sess("ms_target"), value - msg.Meta = map[string][]string{} + m.TryCatch(m, true, func(m *ctx.Message) { + switch field, value := nfs.Recv(bio.Text()); field { + case "code": + msg, code = m.Sess("ms_target"), value + msg.Meta = map[string][]string{} - case "detail": - head, body = "detail", "option" - msg.Add(field, value) + case "detail": + head, body = "detail", "option" + msg.Add(field, value) - case "result": - head, body = "result", "append" - msg.Add(field, value) + case "result": + head, body = "result", "append" + msg.Add(field, value) - case "": - if head == "detail" { // 接收请求 - msg.Option("remote_code", code) - msg.Call(func(sub *ctx.Message) *ctx.Message { - nfs.send <- msg.Copy(sub, "append").Copy(sub, "result") - return nil - }) - } else { // 接收响应 - h := nfs.hand[kit.Int(code)] - h.Copy(msg, "result").Copy(msg, "append").Back(h) + case "": + if head == "detail" { // 接收请求 + msg.Detail(-1, "remote") + msg.Option("remote_code", code) + msg.Call(func(msg *ctx.Message) *ctx.Message { + nfs.echo <- msg + return nil + }) + } else { // 接收响应 + h := nfs.hand[kit.Int(code)] + h.Copy(msg, "result").Copy(msg, "append").Back(h) + } + msg, code, head, body = nil, "0", "result", "append" + + default: + msg.Add(body, field, value) } - msg, code, head, body = nil, "0", "result", "append" - - default: - msg.Add(body, field, value) - } - + }, func(m *ctx.Message) { + for bio.Scan() { + if text := bio.Text(); text == "" { + break + } + } + }) } + m.Sess("tcp", false).Close() return true } func (nfs *NFS) Close(m *ctx.Message, arg ...string) bool { @@ -1252,6 +1272,12 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", } return }}, + "term": &ctx.Command{Name: "term action args...", Help: "", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { + nfs.Term(m, arg[0], arg[1:]) + } + return + }}, "action": &ctx.Command{Name: "action cmd", Help: "", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { msg := m.Cmd("cli.source", arg) @@ -1264,6 +1290,7 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", "remote": &ctx.Command{Name: "remote listen|dial args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if _, ok := m.Target().Server.(*NFS); m.Assert(ok) { //{{{ m.Sess("tcp").Call(func(sub *ctx.Message) *ctx.Message { + sub.Sess("ms_source", sub) sub.Sess("ms_target", m.Source()) sub.Start(fmt.Sprintf("file%d", m.Capi("nfile", 1)), "远程文件") return sub diff --git a/src/contexts/ssh/ssh.go b/src/contexts/ssh/ssh.go index 6ed3e7d0..551ec4f7 100644 --- a/src/contexts/ssh/ssh.go +++ b/src/contexts/ssh/ssh.go @@ -2,12 +2,15 @@ package ssh import ( "contexts/ctx" + "encoding/base64" + "fmt" + "os" + "path" "strings" "toolkit" ) type SSH struct { - peer map[string]*ctx.Message *ctx.Context } @@ -20,270 +23,212 @@ func (ssh *SSH) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server return s } func (ssh *SSH) Begin(m *ctx.Message, arg ...string) ctx.Server { - ssh.Caches["hostname"] = &ctx.Cache{Name: "hostname", Value: "", Help: "主机数量"} return ssh } func (ssh *SSH) Start(m *ctx.Message, arg ...string) bool { - m.Cap("stream", m.Source().Name) - return false + return true } func (ssh *SSH) Close(m *ctx.Message, arg ...string) bool { - return false + return true } var Index = &ctx.Context{Name: "ssh", Help: "集群中心", Caches: map[string]*ctx.Cache{ - "nhost": &ctx.Cache{Name: "主机数量", Value: "0", Help: "主机数量"}, - "domain": &ctx.Cache{Name: "domain", Value: "", Help: "主机域名"}, + "nhost": &ctx.Cache{Name: "nhost", Value: "0", Help: "主机数量"}, + "hostname": &ctx.Cache{Name: "hostname", Value: "shy", Help: "本机域名"}, }, Configs: map[string]*ctx.Config{ - "host": &ctx.Config{Name: "host", Value: map[string]interface{}{}, Help: "主机数量"}, - - "hostname": &ctx.Config{Name: "hostname", Value: "com", Help: "主机数量"}, - - "domain.json": &ctx.Config{Name: "domain.json", Value: "var/domain.json", Help: "主机数量"}, - "domain.png": &ctx.Config{Name: "domain.png", Value: "var/domain.png", Help: "主机数量"}, + "host": &ctx.Config{Name: "host", Value: map[string]interface{}{}, Help: "主机信息"}, + "hostname": &ctx.Config{Name: "hostname", Value: "com", Help: "主机域名"}, + "current": &ctx.Config{Name: "current", Value: "", Help: "当前主机"}, + "timer": &ctx.Config{Name: "timer", Value: "", Help: "当前主机"}, }, Commands: map[string]*ctx.Command{ - "remote": &ctx.Command{Name: "remote listen|dial|send args...", Help: "网络监听", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "remote": &ctx.Command{Name: "remote listen|dial args...", Help: "远程连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if len(arg) == 0 { m.Cmdy("ctx.config", "host") return } - host := m.Confm("host", arg[0]) - if host != nil { - arg = arg[1:] - } - switch arg[0] { + case "redial": + if !m.Caps("hostname") { + m.Cmdx("remote", "dial", arg[1:]) + } case "listen", "dial": - m.Call(func(sub *ctx.Message) *ctx.Message { - h, _ := kit.Hash("host", m.Option("ms_source"), "uniq") - m.Log("fuck", "what %v", sub.Format()) - m.Confv("host", h, map[string]interface{}{ - "module": sub.Cap("module"), - "type": arg[0], - }) + m.Call(func(nfs *ctx.Message) *ctx.Message { + if arg[0] == "dial" { + if m.Confs("timer") { + m.Conf("timer", m.Cmdx("cli.timer", "delete", m.Conf("timer"))) + } + m.Spawn(nfs.Target()).Call(func(cmd *ctx.Message) *ctx.Message { + m.Cap("stream", nfs.Format("target")) + m.Cap("hostname", cmd.Result(0)) + + m.Confv("host", cmd.Result(1), map[string]interface{}{ + "module": nfs.Format("target"), + "create_time": m.Time(), + "access_time": m.Time(), + }) + if !m.Confs("current") { + m.Conf("current", cmd.Result(1)) + } + + nfs.Free(func(nfs *ctx.Message) bool { + m.Conf("timer", m.Cmdx("cli.timer", "repeat", "10s", "context", "ssh", "remote", "redial", arg[1:])) + + m.Log("info", "delete host %s", cmd.Result(1)) + delete(m.Confm("host"), cmd.Result(1)) + m.Cap("hostname", "") + m.Cap("stream", "") + return true + }) + return nil + }, "send", "recv", "add", m.Conf("hostname")) + } return nil }, "nfs.remote", arg) - case "exec": - m.Find(kit.Format(host["module"]), true).CallBack(true, func(sub *ctx.Message) *ctx.Message { - m.Copy(sub) - return nil - }, arg[1:]) - } - - return - }}, - - "demo": &ctx.Command{Name: "demo", Help: "远程执行", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - m.Echo("demo") - return - }}, - - "send": &ctx.Command{Name: "send [domain str] cmd arg...", Help: "远程执行", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - if ssh, ok := m.Target().Server.(*SSH); m.Assert(ok) { - origin, domain := "", "" - if len(arg) > 1 && arg[0] == "domain" { - origin, arg = arg[1], arg[2:] - if d := strings.TrimPrefix(origin, m.Cap("domain")); len(d) > 0 && d[0] == '.' { - domain = d[1:] - } else if d == "" { - domain = d + case "recv": + switch arg[1] { + case "add": + if host := m.Confm("host", arg[2]); host == nil { + m.Confv("host", arg[2], map[string]interface{}{ + "module": m.Format("source"), + "create_time": m.Time(), + "access_time": m.Time(), + }) + } else if len(arg) > 3 && arg[3] == kit.Format(host["token"]) { + host["access_time"] = m.Time() + host["module"] = m.Format("source") } else { - domain = origin + arg[2] = fmt.Sprintf("%s_%d", arg[2], m.Capi("nhost", 1)) + m.Confv("host", arg[2], map[string]interface{}{ + "module": m.Format("source"), + "create_time": m.Time(), + "access_time": m.Time(), + }) + } + if !m.Confs("current") { + m.Conf("current", arg[2]) } - if domain == "" { //本地执行 - msg := m.Spawn().Cmd(arg) - m.Copy(msg, "result").Copy(msg, "append") - return - } - } else { - if m.Has("send_code") { //本地执行 - msg := m.Spawn().Cmd(arg) - m.Copy(msg, "result").Copy(msg, "append") - } else { //对端执行 - msg := m.Spawn(ssh.Message().Source()) - msg.Cmd("send", arg) - m.Copy(msg, "result").Copy(msg, "append") - } - return - } - - match := false - host := strings.SplitN(domain, ".", 2) - c.Travel(m, func(m *ctx.Message, i int) bool { - if i == 0 { + m.Echo(arg[2]).Echo(m.Cap("hostname")).Back(m) + m.Sess("ms_source", false).Free(func(msg *ctx.Message) bool { + m.Log("info", "delete host %s", arg[2]) + delete(m.Confm("host"), arg[2]) return true - } - if m.Cap("hostname") == host[0] || "*" == host[0] { - ssh, ok := m.Target().Server.(*SSH) - m.Assert(ok) - msg := m.Spawn(ssh.Message().Source()) - - if len(host) > 1 { - msg.Cmd("send", "domain", host[1], arg) - } else { - msg.Cmd("send", arg) - } - m.Copy(msg, "result").Copy(msg, "append") - - if !match { - match = !m.Appends("domain_miss") - } - return host[0] == "*" - } - return false - }) - - if match { - return - } - if m.Target() == c && m.Has("send_code") { - m.Appends("domain_miss", true) - return - } - if m.Cap("domain") == m.Conf("hostname") { - m.Appends("domain_miss", true) - return - } - - // 向上路由 - msg := m.Spawn(c.Message().Source()) - msg.Cmd("send", "domain", origin, arg) - m.Copy(msg, "result").Copy(msg, "append") - } - return - }}, - - "pwd": &ctx.Command{Name: "pwd [hostname]", Help: "主机域名", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - if len(arg) == 0 { - m.Echo(m.Cap("domain")) - return - } - - if m.Options("send_code") { - if m.Target() == c { - msg := m.Spawn().Cmd("send", "pwd", m.Confx("hostname", arg, 0)) - m.Cap("hostname", msg.Result(0)) - m.Cap("domain", msg.Result(1)) - } else { - hostname := arg[0] - c.Travel(m, func(m *ctx.Message, line int) bool { - if hostname == m.Cap("hostname") { - hostname += m.Cap("nhost") - return false - } - return false }) - m.Echo(m.Cap("hostname", hostname)) - m.Echo("%s.%s", m.Cap("domain"), m.Cap("hostname")) } - return - } - if m.Target() == c { - m.Conf("hostname", arg[0]) - msg := m.Spawn().Cmd("send", "pwd", arg[0]) - m.Cap("hostname", msg.Result(0)) - m.Cap("domain", msg.Result(1)) - } else { - m.Spawn().Cmd("send", "pwd", arg[0]) - } - m.Echo(m.Cap("domain")) - return - }}, - "hello": &ctx.Command{Name: "hello request", Help: "加密请求", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - aaa := m.Target().Message().Sess("aaa", false) - for _, k := range m.Meta["seal"] { - for i, v := range m.Meta[k] { - m.Meta[k][i] = m.Spawn(aaa).Cmd("deal", v).Result(0) - } - } - for _, k := range m.Meta["encrypt"] { - for i, v := range m.Meta[k] { - m.Meta[k][i] = m.Spawn(aaa).Cmd("decrypt", v).Result(0) - } - } + default: + names := strings.SplitN(arg[0], ".", 2) + if names[0] == "" { // 本地执行 + host := m.Confm("host", m.Option("hostname")) - if len(arg) == 0 { - if !m.Has("mi") { - cert := aaa.Spawn().Cmd("certificate") - m.Echo(cert.Result(0)) + msg := m.Find(kit.Format(host["cm_target"])).Cmd(arg[1:]) + m.Copy(msg, "append").Copy(msg, "result") + host["cm_target"] = msg.Cap("module") + + m.Back(m) + return + } + + m.Option("hostname", m.Cap("hostname")) + sync := !m.Options("remote_code") //同步或异步 + if arg[1] == "async" { + sync, arg = false, arg[2:] + } else if arg[1] == "sync" { + sync, arg = true, arg[2:] } else { - msg := m.Sess("aaa").Cmd("login", m.Option("mi"), m.Option("mi")) - m.Echo(msg.Result(0)) - msg.Sess("aaa").Cmd("newcipher", m.Option("mi")) + arg = arg[1:] } + + rest := kit.Select("", names, 1) + if names[0] == "*" { + m.Confm("host", func(name string, host map[string]interface{}) { + m.Find(kit.Format(host["module"]), true).Copy(m, "option").CallBack(sync, func(sub *ctx.Message) *ctx.Message { + return m.Copy(sub) + }, "send", "", arg) + }) + + } else if m.Confm("host", names[0], func(host map[string]interface{}) { + m.Find(kit.Format(host["module"]), true).Copy(m, "option").CallBack(sync, func(sub *ctx.Message) *ctx.Message { + return m.Copy(sub) + }, "send", rest, arg) + m.Log("fuck", "m %v", m.Meta) + + }) == nil { + m.Find(m.Cap("stream"), true).Copy(m, "option").CallBack(sync, func(sub *ctx.Message) *ctx.Message { + return m.Copy(sub) + }, "send", strings.Join(names, "."), arg) + } + } + return + }}, + "sh": &ctx.Command{Name: "sh [[host] name] cmd...", Help: "发送命令", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + if len(arg) == 0 { + m.Echo(m.Conf("current")) return } - msg := m.Spawn().Copy(m, "option").Cmd(arg) - m.Copy(msg, "result").Copy(msg, "append") + if arg[0] == "host" { + m.Conf("current", arg[1]) + arg = arg[2:] + } else if m.Confm("host", arg[0]) != nil { + m.Conf("current", arg[0]) + arg = arg[1:] + } + msg := m.Cmd("ssh.remote", m.Conf("current"), arg) + m.Copy(msg, "result") return }}, - "shake": &ctx.Command{ - Name: "shake [domain host] cmd... [seal option...][encrypt option...]", - Help: "加密通信", - Form: map[string]int{"seal": -1, "encrypt": -1}, - Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - if ssh, ok := m.Target().Server.(*SSH); m.Assert(ok) { - if len(arg) == 0 { - for k, v := range ssh.peer { - m.Echo("%s: %s\n", k, v.Cap("stream")) - } - return - } - - peer := "peer" - args := []string{} - if len(arg) > 1 && arg[0] == "domain" { - args = append(args, "domain", arg[1]) - peer, arg = arg[1], arg[2:] - } - if ssh.peer == nil { - ssh.peer = map[string]*ctx.Message{} - } - user, ok := ssh.peer[peer] - if !ok { - user = m.Sess("aaa").Cmd("login", "cert", m.Spawn().Cmd("send", args, "hello"), peer) - ssh.peer[peer] = user - mi := user.Cap("sessid") - - remote := m.Spawn().Add("option", mi, m.Spawn(user).Cmd("seal", mi)).Add("option", "seal", mi).Cmd("send", args, "hello") - m.Spawn(user).Cmd("newcipher", mi) - user.Cap("remote", "remote", remote.Result(0), "远程会话") - user.Cap("remote_mi", "remote_mi", mi, "远程密钥") - } - - msg := m.Spawn(ssh.Message().Source()).Copy(m, "option") - msg.Option("hello", "world") - msg.Option("world", "hello") - for _, k := range msg.Meta["seal"] { - for i, v := range msg.Meta[k] { - msg.Meta[k][i] = msg.Spawn(user).Cmd("seal", v).Result(0) - } - } - for _, k := range msg.Meta["encrypt"] { - for i, v := range msg.Meta[k] { - msg.Meta[k][i] = msg.Spawn(user).Cmd("encrypt", v).Result(0) - } - } - msg.Detail("send", args, "hello", arg) - ssh.Message().Back(msg) - m.Copy(msg, "result").Copy(msg, "append") - } + "cp": &ctx.Command{Name: "cp [[host] name] filename", Help: "发送文件", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + if len(arg) == 0 { + m.Echo(m.Conf("current")) return - }}, - "save": &ctx.Command{Name: "save", Help: "远程执行", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - json := m.Sess("nfs") - json.Put("option", "data", map[string]string{"domain": m.Cap("domain")}) - json.Cmd("json", m.Conf("domain.json")) - m.Sess("nfs").Cmd("genqr", m.Conf("domain.png"), json.Result(0)) + } + + if arg[0] == "host" { + m.Conf("current", arg[1]) + arg = arg[2:] + } else if m.Confm("host", arg[0]) != nil { + m.Conf("current", arg[0]) + arg = arg[1:] + } + + if arg[0] == "save" { + buf, e := base64.StdEncoding.DecodeString(m.Option("filebuf")) + m.Assert(e) + + f, e := os.OpenFile(path.Join("tmp", m.Option("filename")), os.O_RDWR|os.O_CREATE, 0666) + f.WriteAt(buf, int64(m.Optioni("filepos"))) + return e + } + + p := m.Cmdx("nfs.path", arg[0]) + f, e := os.Open(p) + m.Assert(e) + s, e := f.Stat() + m.Assert(e) + + buf := make([]byte, 1024) + + for i := int64(0); i < s.Size(); i += 1024 { + n, _ := f.ReadAt(buf, i) + if n == 0 { + break + } + + buf = buf[:n] + msg := m.Spawn() + msg.Option("filename", arg[0]) + msg.Option("filesize", s.Size()) + msg.Option("filepos", i) + msg.Option("filebuf", base64.StdEncoding.EncodeToString(buf)) + msg.Cmd("remote", m.Conf("current"), "cp", "save", arg[0]) + } return }}, }, diff --git a/src/contexts/tcp/tcp.go b/src/contexts/tcp/tcp.go index fa77f365..2b53d04b 100644 --- a/src/contexts/tcp/tcp.go +++ b/src/contexts/tcp/tcp.go @@ -79,8 +79,9 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { m.Log("info", "%s dial %s", m.Cap("nclient"), m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr()))) + m.Sess("tcp", m) m.Option("ms_source", tcp.Context.Name) - m.Put("option", "io", tcp).Back(m.Spawn(m.Source())) + m.Put("option", "io", tcp).Back() return false case "accept": @@ -91,8 +92,9 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { m.Log("info", "%s accept %s", m.Cap("nclient"), m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr()))) + m.Sess("tcp", m) m.Option("ms_source", tcp.Context.Name) - m.Put("option", "io", tcp).Back(m.Spawn(m.Source())) + m.Put("option", "io", tcp).Back() return false case "listen": @@ -118,8 +120,8 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { for { c, e := tcp.Accept() m.Assert(e) - m.Spawn(Index).Put("option", "io", c).Call(func(com *ctx.Message) *ctx.Message { - return com.Spawn(m.Source()) + m.Spawn(Index).Put("option", "io", c).Call(func(sub *ctx.Message) *ctx.Message { + return sub.Spawn(m.Source()) }, "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol")) }