From a678a73efb8790454c0630506aa9ac65145629f2 Mon Sep 17 00:00:00 2001 From: shylinux Date: Fri, 20 Apr 2018 09:01:58 +0800 Subject: [PATCH] =?UTF-8?q?vps=20mod=20Message.Back=20=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=BA=86=E6=B6=88=E6=81=AF=E5=9B=9E=E8=B0=83=E7=9A=84=E4=BC=A0?= =?UTF-8?q?=E5=8F=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/contexts/ctx.go | 14 +++--- src/contexts/nfs/nfs.go | 65 +++++++++++++------------ src/contexts/ssh/ssh.go | 104 +++++++++------------------------------- src/contexts/tcp/tcp.go | 11 +++-- 4 files changed, 70 insertions(+), 124 deletions(-) diff --git a/src/contexts/ctx.go b/src/contexts/ctx.go index 931d19e1..cb564ee1 100644 --- a/src/contexts/ctx.go +++ b/src/contexts/ctx.go @@ -295,7 +295,7 @@ type Message struct { Index int ncallback int - callback func(ok bool) (done bool, up bool) + callback func(ok bool, sub *Message) (done bool, up *Message) Template *Message } @@ -1043,7 +1043,7 @@ func (m *Message) Exec(key string, arg ...string) string { // {{{ } m.target.Historys = append(m.target.Historys, m) - m.Back(false) + m.Back(false, nil) }) return m.Get("result") @@ -1126,7 +1126,7 @@ func (m *Message) Cmd(arg ...interface{}) *Message { // {{{ } // }}} -func (m *Message) Call(cb func(ok bool) (done bool, up bool), cmd bool) *Message { // {{{ +func (m *Message) Call(cb func(ok bool, msg *Message) (done bool, sub *Message), cmd bool) *Message { // {{{ m.callback = cb m.message.ncallback++ @@ -1139,20 +1139,20 @@ func (m *Message) Call(cb func(ok bool) (done bool, up bool), cmd bool) *Message } // }}} -func (m *Message) Back(ok bool) *Message { // {{{ +func (m *Message) Back(ok bool, msg *Message) *Message { // {{{ if m.callback == nil { return m } m.Log("info", nil, "back %v %v", m.Meta["result"], m.Meta["append"]) - done, up := m.callback(ok) + done, sub := m.callback(ok, msg) if done { m.callback = nil m.message.ncallback-- } - if up || m.message.ncallback == 0 { - m.message.Back(ok) + if sub != nil || m.message.ncallback == 0 { + m.message.Back(ok, sub) } return m } diff --git a/src/contexts/nfs/nfs.go b/src/contexts/nfs/nfs.go index afff45d1..99a18bf0 100644 --- a/src/contexts/nfs/nfs.go +++ b/src/contexts/nfs/nfs.go @@ -120,31 +120,34 @@ func (nfs *NFS) Start(m *ctx.Message, arg ...string) bool { // {{{ func() { fuck := msg - fuck.Call(func(ok bool) (done bool, up bool) { - target = fuck.Target() - m.Cap("target", target.Name) + fuck.Call(func(ok bool, cmd *ctx.Message) (bool, *ctx.Message) { + if ok { + target = fuck.Target() + m.Cap("target", target.Name) - for _, v := range fuck.Meta["result"] { - fmt.Fprintf(nfs.Writer, "result: %s\n", url.QueryEscape(v)) - } - - fmt.Fprintf(nfs.Writer, "nsend: %s\n", fuck.Get("nrecv")) - for _, k := range fuck.Meta["append"] { - for _, v := range fuck.Meta[k] { - fmt.Fprintf(nfs.Writer, "%s: %s\n", k, v) + for _, v := range fuck.Meta["result"] { + fmt.Fprintf(nfs.Writer, "result: %s\n", url.QueryEscape(v)) } - } - fmt.Fprintf(nfs.Writer, "\n") - nfs.Writer.Flush() - if fuck.Has("io") { - if f, ok := fuck.Data["io"].(io.ReadCloser); ok { - io.Copy(nfs.Writer, f) - nfs.Writer.Flush() - f.Close() + fmt.Fprintf(nfs.Writer, "nsend: %s\n", fuck.Get("nrecv")) + for _, k := range fuck.Meta["append"] { + for _, v := range fuck.Meta[k] { + fmt.Fprintf(nfs.Writer, "%s: %s\n", k, v) + } } + fmt.Fprintf(nfs.Writer, "\n") + nfs.Writer.Flush() + + if fuck.Has("io") { + if f, ok := fuck.Data["io"].(io.ReadCloser); ok { + io.Copy(nfs.Writer, f) + nfs.Writer.Flush() + f.Close() + } + } + return true, fuck } - return ok, ok + return false, nil }, false).Cmd(fuck.Meta["detail"]) }() @@ -316,32 +319,32 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", "listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { msg := m.Sess("pub", "tcp") // {{{ - msg.Call(func(ok bool) (done bool, up bool) { + msg.Call(func(ok bool, com *ctx.Message) (bool, *ctx.Message) { if ok { sub := msg.Spawn(m.Target()) sub.Put("option", "io", msg.Data["io"]) - sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...) - sub.Cap("stream", msg.Target().Name) - sub.Echo(sub.Target().Name) - m.Target(sub.Target()) + sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件") + + sub.Cap("stream", com.Target().Name) + return false, sub } - return false, true + return false, nil }, false).Cmd(m.Meta["detail"]) // }}} }}, "dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { msg := m.Sess("com", "tcp") // {{{ - msg.Call(func(ok bool) (done bool, up bool) { + msg.Call(func(ok bool, com *ctx.Message) (bool, *ctx.Message) { if ok { sub := msg.Spawn(m.Target()) sub.Put("option", "io", msg.Data["io"]) - sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...) + sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件") + sub.Cap("stream", msg.Target().Name) - sub.Echo(sub.Target().Name) m.Target(sub.Target()) - return true, true + return true, sub } - return false, false + return false, nil }, false).Cmd(m.Meta["detail"]) // }}} }}, diff --git a/src/contexts/ssh/ssh.go b/src/contexts/ssh/ssh.go index 7ecbcbf6..be9fdf04 100644 --- a/src/contexts/ssh/ssh.go +++ b/src/contexts/ssh/ssh.go @@ -5,7 +5,6 @@ import ( // {{{ "contexts" "fmt" "net" - "net/url" "strings" ) @@ -21,13 +20,7 @@ type SSH struct { } func (ssh *SSH) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server { // {{{ - c.Caches = map[string]*ctx.Cache{ - "nsend": &ctx.Cache{Name: "消息发送数量", Value: "0", Help: "消息发送数量"}, - "nrecv": &ctx.Cache{Name: "消息接收数量", Value: "0", Help: "消息接收数量"}, - "target": &ctx.Cache{Name: "消息接收模块", Value: "ssh", Help: "消息接收模块"}, - "result": &ctx.Cache{Name: "前一条指令执行结果", Value: "", Help: "前一条指令执行结果"}, - "sessid": &ctx.Cache{Name: "会话令牌", Value: "", Help: "会话令牌"}, - } + c.Caches = map[string]*ctx.Cache{} c.Configs = map[string]*ctx.Config{} s := new(SSH) @@ -37,6 +30,7 @@ func (ssh *SSH) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server // }}} func (ssh *SSH) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ + ssh.Context.Master(nil) if ssh.Context == Index { Pulse = m } @@ -46,61 +40,6 @@ func (ssh *SSH) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ // }}} func (ssh *SSH) Start(m *ctx.Message, arg ...string) bool { // {{{ return false - - ssh.Group = "" - ssh.Owner = nil - ssh.Conn = m.Data["io"].(net.Conn) - ssh.Reader = bufio.NewReader(ssh.Conn) - ssh.Writer = bufio.NewWriter(ssh.Conn) - ssh.send = make(map[string]*ctx.Message) - m.Log("info", nil, "%s connect %v <-> %v", Pulse.Cap("nhost"), ssh.Conn.LocalAddr(), ssh.Conn.RemoteAddr()) - - target, msg := m.Target(), m.Spawn(m.Target()) - - for { - line, e := ssh.Reader.ReadString('\n') - m.Assert(e) - - if line = strings.TrimSpace(line); len(line) == 0 { - if msg.Log("info", nil, "remote: %v", msg.Meta["option"]); msg.Has("detail") { - msg.Log("info", nil, "%d exec: %v", m.Capi("nrecv", 1), msg.Meta["detail"]) - - msg.Cmd(msg.Meta["detail"]) - target = msg.Target() - m.Cap("target", target.Name) - - for _, v := range msg.Meta["result"] { - fmt.Fprintf(ssh.Writer, "result: %s\n", url.QueryEscape(v)) - } - - fmt.Fprintf(ssh.Writer, "nsend: %s\n", msg.Get("nrecv")) - for _, k := range msg.Meta["append"] { - for _, v := range msg.Meta[k] { - fmt.Fprintf(ssh.Writer, "%s: %s\n", k, v) - } - } - fmt.Fprintf(ssh.Writer, "\n") - ssh.Writer.Flush() - } else { - msg.Log("info", nil, "%s echo: %v", msg.Get("nsend"), msg.Meta["result"]) - - m.Cap("result", msg.Get("result")) - msg.Meta["append"] = msg.Meta["option"] - send := ssh.send[msg.Get("nsend")] - send.Meta = msg.Meta - send.Recv <- true - } - msg = m.Spawn(target) - continue - } - - ls := strings.SplitN(line, ":", 2) - ls[0] = strings.TrimSpace(ls[0]) - ls[1], e = url.QueryUnescape(strings.TrimSpace(ls[1])) - m.Assert(e) - msg.Add("option", ls[0], ls[1]) - } - return false } // }}} @@ -125,37 +64,38 @@ var Index = &ctx.Context{Name: "ssh", Help: "集群中心", }, Commands: map[string]*ctx.Command{ "listen": &ctx.Command{Name: "listen address protocol", Help: "监听连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { - msg := m.Sess("file", "nfs") - msg.Call(func(ok bool) (done bool, up bool) { + msg := m.Find("nfs") + msg.Call(func(ok bool, file *ctx.Message) (bool, *ctx.Message) { if ok { sub := msg.Spawn(m.Target()) - sub.Start(fmt.Sprintf("host%d", Pulse.Capi("nhost", 1)), "打开文件", sub.Meta["detail"]...) - sub.Cap("stream", msg.Target().Name) - sub.Target().Sessions["file"] = msg - sub.Echo(sub.Target().Name) - sub.Spawn(sub.Target()).Cmd("send", "context", "ssh") - sub.Spawn(sub.Target()).Cmd("send", "route", sub.Target().Name, msg.Cap("route")) + sub.Start(fmt.Sprintf("host%d", Pulse.Capi("nhost", 1)), "远程主机") + + sub.Cap("stream", file.Target().Name) + sub.Sess("file", "nfs."+file.Target().Name) + sub.Sess("file").Cmd("send", "context", "ssh") + sub.Sess("file").Cmd("send", "route", sub.Target().Name, msg.Cap("route")) + return false, sub } - return false, true + return false, nil }, false).Cmd(m.Meta["detail"]) }}, "dial": &ctx.Command{Name: "dial address protocol", Help: "建立连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { msg := m.Sess("file", "nfs") - msg.Call(func(ok bool) (done bool, up bool) { + msg.Call(func(ok bool, file *ctx.Message) (bool, *ctx.Message) { if ok { - m.Sess("file").Cmd("send", "context", "ssh") m.Cap("stream", msg.Target().Name) - return true, true + m.Sess("file").Cmd("send", "context", "ssh") + return true, m } - return false, false + return false, nil }, false).Cmd(m.Meta["detail"]) }}, "send": &ctx.Command{Name: "send arg...", Help: "打开连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { msg := m.Sess("file") - msg.Copy(m, "detail").Call(func(ok bool) (done bool, up bool) { - return ok, ok + msg.Copy(m, "detail").Call(func(ok bool, file *ctx.Message) (bool, *ctx.Message) { + return ok, file }, false).Cmd() m.Copy(msg, "result") }}, @@ -179,9 +119,9 @@ var Index = &ctx.Context{Name: "ssh", Help: "集群中心", m.Travel(m.Target(), func(m *ctx.Message) bool { if m.Target().Name == target[0] { msg := m.Spawn(m.Target()) - msg.Call(func(ok bool) (done bool, up bool) { + msg.Call(func(ok bool, host *ctx.Message) (bool, *ctx.Message) { m.Copy(msg, "result") - return ok, ok + return ok, host }, false).Cmd("send", "search", strings.Join(target[1:], "."), arg[1:]) miss = false @@ -191,9 +131,9 @@ var Index = &ctx.Context{Name: "ssh", Help: "集群中心", if miss { msg := m.Spawn(c) - msg.Call(func(ok bool) (done bool, up bool) { + msg.Call(func(ok bool, host *ctx.Message) (bool, *ctx.Message) { m.Copy(msg, "result") - return ok, ok + return ok, host }, false).Cmd("send", "search", arg) } } diff --git a/src/contexts/tcp/tcp.go b/src/contexts/tcp/tcp.go index 85ca0615..ddb665cb 100644 --- a/src/contexts/tcp/tcp.go +++ b/src/contexts/tcp/tcp.go @@ -69,7 +69,7 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ m.Log("info", nil, "%s dial %s", Pulse.Cap("nclient"), m.Append("stream", m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr())))) - m.Put("append", "io", tcp.Conn).Back(true) + m.Put("append", "io", tcp.Conn).Back(true, m) return false case "accept": c, e := m.Data["io"].(net.Conn) @@ -78,7 +78,8 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ m.Log("info", nil, "%s accept %s", Pulse.Cap("nclient"), m.Append("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr())))) - m.Put("append", "io", tcp.Conn).Back(true) + m.Put("append", "io", tcp.Conn).Back(true, m) + m.Log("fuck", nil, "accept") return false default: if m.Cap("security") != "false" { @@ -102,12 +103,14 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ c, e := tcp.Accept() m.Assert(e) msg := m.Spawn(Index).Put("option", "io", c).Put("option", "source", m.Source()) - msg.Call(func(ok bool) (done bool, up bool) { + msg.Call(func(ok bool, com *ctx.Message) (bool, *ctx.Message) { if ok { m.Append("stream", msg.Append("stream")) m.Put("append", "io", msg.Data["io"]) + com.Log("fuck", nil, "listen") } - return ok, ok + + return ok, com }, false).Start(fmt.Sprintf("com%d", Pulse.Capi("nclient", 1)), "网络连接", "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol")) }