diff --git a/src/contexts/ctx.go b/src/contexts/ctx.go index 1aca8448..3678be1a 100644 --- a/src/contexts/ctx.go +++ b/src/contexts/ctx.go @@ -280,6 +280,8 @@ type Message struct { messages []*Message message *Message root *Message + + Remote chan bool } func (m *Message) Code() int { // {{{ @@ -654,20 +656,14 @@ func (m *Message) CallBack(sync bool, cb func(msg *Message) (sub *Message), arg return m.Call(cb, arg...) } - wait := make(chan bool) - + wait := make(chan *Message) go m.Call(func(sub *Message) *Message { msg := cb(sub) - m.Log("lock", "before done %v", arg) - wait <- true - m.Log("lock", "after done %v", arg) + wait <- m return msg }, arg...) - m.Log("lock", "before wait %v", arg) - <-wait - m.Log("lock", "after wait %v", arg) - return m + return <-wait } // }}} @@ -2887,10 +2883,12 @@ func Start(args ...string) { m.target.Begin(m) } + Pulse.Sess("tcp", "tcp") Pulse.Sess("nfs", "nfs") Pulse.Sess("lex", "lex") Pulse.Sess("yac", "yac") Pulse.Sess("cli", "cli") + Pulse.Sess("aaa", "aaa") Pulse.Sess("log", "log") diff --git a/src/contexts/nfs/nfs.go b/src/contexts/nfs/nfs.go index 627f401b..dc5dce44 100644 --- a/src/contexts/nfs/nfs.go +++ b/src/contexts/nfs/nfs.go @@ -37,9 +37,12 @@ type NFS struct { paths []string io net.Conn + send chan *ctx.Message + recv chan *ctx.Message + hand map[int]*ctx.Message + *bufio.Reader *bufio.Writer - send map[int]*ctx.Message target *ctx.Context cli *ctx.Message @@ -546,12 +549,10 @@ func (nfs *NFS) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server c.Configs = map[string]*ctx.Config{} } else { c.Caches = map[string]*ctx.Cache{ - "nbytes": &ctx.Cache{Name: "消息发送字节", Value: "0", Help: "消息发送字节"}, "nsend": &ctx.Cache{Name: "消息发送数量", Value: "0", Help: "消息发送数量"}, "nrecv": &ctx.Cache{Name: "消息接收数量", Value: "0", Help: "消息接收数量"}, - "target": &ctx.Cache{Name: "消息接收模块", Value: "ssh", Help: "消息接收模块"}, - "result": &ctx.Cache{Name: "前一条指令执行结果", Value: "", Help: "前一条指令执行结果"}, - "sessid": &ctx.Cache{Name: "会话令牌", Value: "", Help: "会话令牌"}, + "nread": &ctx.Cache{Name: "nread", Value: "0", Help: "nread"}, + "nwrite": &ctx.Cache{Name: "nwrite", Value: "0", Help: "nwrite"}, } c.Configs = map[string]*ctx.Config{} } @@ -673,143 +674,97 @@ func (nfs *NFS) Start(m *ctx.Message, arg ...string) bool { // {{{ return false } + m.Cap("stream", m.Option("stream")) nfs.io = m.Optionv("io").(net.Conn) - bio := bufio.NewScanner(nfs.io) - for bio.Scan() { - m.Log("info", "recv: %s", bio.Text()) - } - - return false - m.Sess("nfs", m) - - nfs.Message = m - if _, ok := m.Data["io"]; ok { - m.Cap("stream", m.Source().Name) - // m.Sess("aaa", "aaa").Cmd("login", "demo", "demo") - m.Options("stdio", false) - - // nfs.io = socket.(io.ReadWriteCloser) - nfs.Reader = bufio.NewReader(nfs.io) - nfs.Writer = bufio.NewWriter(nfs.io) - - nfs.send = make(map[int]*ctx.Message) - nfs.target = m.Target() - if target, ok := m.Data["target"]; ok { - nfs.target = target.(*ctx.Context) - } - - var msg *ctx.Message - - nfs.Caches["target"] = &ctx.Cache{Name: "target", Value: "", Help: "文件名"} - - nsend := "" + nfs.hand = map[int]*ctx.Message{} + nfs.send = make(chan *ctx.Message, 10) + nfs.recv = make(chan *ctx.Message, 10) + go func() { for { - line, e := nfs.Reader.ReadString('\n') - if msg == nil { - msg = m.Sess("ssh") - m.Cap("target", msg.Target().Name) - } - - if e == io.EOF { - msg.Cmd("close") - } - m.Assert(e) - - if line = strings.TrimSpace(line); len(line) > 0 { - ls := strings.SplitN(line, ":", 2) - ls[0] = strings.TrimSpace(ls[0]) - ls[1], e = url.QueryUnescape(strings.TrimSpace(ls[1])) - m.Assert(e) - - switch ls[0] { - case "detail": - msg.Add("detail", ls[1]) - case "result": - msg.Add("result", ls[1]) - case "nsend": - nsend = ls[1] - default: - msg.Add("option", ls[0], ls[1]) + select { + case msg := <-nfs.send: + head, body := "detail", "option" + if msg.Hand { + head, body = "result", "append" + send_code := msg.Option("send_code") + msg.Append("send_code", send_code) + m.Log("info", "%s recv: %v %v", msg.Option("recv_code"), msg.Meta[head], msg.Meta[body]) + } else { + m.Log("info", "%d send: %v %v", m.Capi("nsend", 1), msg.Meta[head], msg.Meta[body]) + msg.Meta["detail"] = msg.Meta["detail"][1:] + nfs.hand[m.Capi("nsend")] = msg + msg.Option("send_code", m.Capi("nsend")) } - continue - } - if msg.Has("detail") { - msg.Log("info", "%d recv", m.Capi("nrecv", 1)) - msg.Log("info", "detail: %v", msg.Meta["detail"]) - msg.Log("info", "option: %v", msg.Meta["option"]) - msg.Options("stdio", false) - msg.Option("nsend", nsend) - - func() { - cmd := msg - nsends := nsend - cmd.Call(func(sub *ctx.Message) *ctx.Message { - for _, v := range sub.Meta["result"] { - _, e := fmt.Fprintf(nfs.Writer, "result: %s\n", url.QueryEscape(v)) - sub.Assert(e) - } - - sub.Append("nsend", nsends) - for _, k := range sub.Meta["append"] { - for _, v := range sub.Meta[k] { - _, e := fmt.Fprintf(nfs.Writer, "%s: %s\n", k, url.QueryEscape(v)) - sub.Assert(e) - } - } - - sub.Log("info", "%d recv", sub.Optioni("nsend")) - sub.Log("info", "result: %v", sub.Meta["result"]) - sub.Log("info", "append: %v", sub.Meta["append"]) - - _, e := fmt.Fprintf(nfs.Writer, "\n") - sub.Assert(e) - e = nfs.Writer.Flush() - sub.Assert(e) - - if sub.Has("io") { - if f, ok := sub.Data["io"].(io.ReadCloser); ok { - io.Copy(nfs.Writer, f) - nfs.Writer.Flush() - f.Close() - } - } - return sub - }) - }() - - } else { - msg.Meta["append"] = msg.Meta["option"] - delete(msg.Meta, "option") - - msg.Log("info", "%s send", nsend) - msg.Log("info", "result: %v", msg.Meta["result"]) - msg.Log("info", "append: %v", msg.Meta["append"]) - - n, e := strconv.Atoi(nsend) - m.Assert(e) - send := nfs.send[n] - send.Copy(msg, "result") - send.Copy(msg, "append") - - if send.Has("io") { - if f, ok := send.Data["io"].(io.WriteCloser); ok { - io.CopyN(f, nfs.Reader, int64(send.Appendi("size"))) - f.Close() + for _, v := range msg.Meta[head] { + n, e := fmt.Fprintf(nfs.io, "%s: %s\n", head, url.QueryEscape(v)) + m.Assert(e) + m.Capi("nwrite", n) + } + for _, k := range msg.Meta[body] { + for _, v := range msg.Meta[k] { + n, e := fmt.Fprintf(nfs.io, "%s: %s\n", url.QueryEscape(k), url.QueryEscape(v)) + m.Assert(e) + m.Capi("nwrite", n) } } - send.Back(send) + n, e := fmt.Fprintf(nfs.io, "\n") + m.Assert(e) + m.Capi("nwrite", n) + } + } + }() + + go func() { + bio := bufio.NewScanner(nfs.io) + var e error + for msg, head, body := m.Spawn(), "", ""; bio.Scan(); { + line := bio.Text() + m.Capi("nread", len(line)+1) + if len(line) == 0 { + + if head == "detail" { + m.Log("info", "%d recv: %v %v", m.Capi("nrecv", 1), msg.Meta[head], msg.Meta[body]) + msg.Option("recv_code", m.Cap("nrecv")) + nfs.recv <- msg + } else { + m.Log("info", "%d send: %v %v", msg.Appendi("send_code"), msg.Meta[head], msg.Meta[body]) + h := nfs.hand[msg.Appendi("send_code")] + h.Copy(msg, "result").Copy(msg, "append") + h.Remote <- true + } + msg = m.Spawn() + continue } - nsend = "" - msg = nil + word := strings.Split(line, ": ") + word[0], e = url.QueryUnescape(word[0]) + m.Assert(e) + word[1], e = url.QueryUnescape(word[1]) + m.Assert(e) + switch word[0] { + case "detail": + head, body = "detail", "option" + msg.Add(word[0], word[1]) + case "result": + head, body = "result", "append" + msg.Add(word[0], word[1]) + default: + msg.Add(body, word[0], word[1]) + } + } + }() + + for { + select { + case msg := <-nfs.recv: + nfs.send <- msg.Cmd() } - return true } - return false + return true } // }}} @@ -1319,7 +1274,7 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", "listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { if _, ok := m.Target().Server.(*NFS); m.Assert(ok) { //{{{ - m.Find("tcp").Call(func(com *ctx.Message) *ctx.Message { + m.Sess("tcp").Call(func(com *ctx.Message) *ctx.Message { sub := com.Spawn(c) sub.Start(fmt.Sprintf("file%d", m.Capi("nfile", 1)), "远程文件") return sub @@ -1329,7 +1284,7 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", }}, "dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { if _, ok := m.Target().Server.(*NFS); m.Assert(ok) { //{{{ - m.Find("tcp").Call(func(com *ctx.Message) *ctx.Message { + m.Sess("tcp").Call(func(com *ctx.Message) *ctx.Message { sub := com.Spawn(c) sub.Start(fmt.Sprintf("file%d", m.Capi("nfile", 1)), "远程文件") return sub @@ -1338,111 +1293,10 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", // }}} }}, "send": &ctx.Command{Name: "send [file] args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { - if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { // {{{ - m.Log("fuck", "%v %v", arg, nfs.io) - nfs.io.Write([]byte(arg[0])) - return - if m.Has("nrecv") { - if len(arg) > 1 && arg[0] == "file" { - info, e := os.Stat(arg[1]) - m.Assert(e) - m.Append("name", info.Name()) - m.Append("size", info.Size()) - m.Append("time", info.ModTime()) - m.Append("mode", info.Mode()) - - f, e := os.Open(arg[1]) - m.Assert(e) - m.Put("append", "io", f) - } - - } else { - nfs.send[m.Optioni("nsend", m.Capi("nsend", 1))] = m - - if len(arg) > 1 && arg[0] == "file" { - info, e := os.Stat(arg[1]) - m.Assert(e) - m.Option("name", info.Name()) - m.Option("size", info.Size()) - m.Option("time", info.ModTime()) - m.Option("mode", info.Mode()) - - n, e := fmt.Fprintf(nfs.Writer, "detail: recv\n") - m.Capi("nbytes", n) - m.Assert(e) - } - for _, v := range arg { - n, e := fmt.Fprintf(nfs.Writer, "detail: %v\n", v) - m.Capi("nbytes", n) - m.Assert(e) - } - - for _, k := range m.Meta["option"] { - if k == "args" { - continue - } - for _, v := range m.Meta[k] { - n, e := fmt.Fprintf(nfs.Writer, "%s: %s\n", k, v) - m.Capi("nbytes", n) - m.Assert(e) - } - } - m.Log("info", "%d send", m.Optioni("nsend")) - m.Log("info", "detail: %v", m.Meta["detail"]) - m.Log("info", "option: %v", m.Meta["option"]) - - n, e := fmt.Fprintf(nfs.Writer, "\n") - m.Capi("nbytes", n) - m.Assert(e) - nfs.Writer.Flush() - - if len(arg) > 1 && arg[0] == "file" { - f, e := os.Open(arg[1]) - m.Assert(e) - defer f.Close() - _, e = io.Copy(nfs.Writer, f) - } - } - } // }}} - }}, - "recv": &ctx.Command{Name: "recv [file] args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { - if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { // {{{ - if m.Has("nrecv") { - if len(arg) > 1 && arg[0] == "file" { - f, e := os.Create(arg[1]) - m.Assert(e) - defer f.Close() - io.CopyN(f, nfs.Reader, int64(m.Optioni("size"))) - } - - return - } - - nfs.send[m.Optioni("nrecv", m.Capi("nsend", 1))] = m - - if len(arg) > 1 && arg[0] == "file" { - f, e := os.Create(arg[1]) - m.Assert(e) - m.Put("option", "io", f) - - fmt.Fprintf(nfs.Writer, "detail: send\n") - } - - for _, v := range arg { - fmt.Fprintf(nfs.Writer, "detail: %v\n", v) - } - - for _, k := range m.Meta["option"] { - if k == "args" { - continue - } - for _, v := range m.Meta[k] { - fmt.Fprintf(nfs.Writer, "%s: %s\n", k, v) - } - } - - fmt.Fprintf(nfs.Writer, "\n") - nfs.Writer.Flush() + if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) && nfs.io != nil { // {{{ + m.Remote = make(chan bool, 1) + nfs.send <- m + <-m.Remote } // }}} }}, }, diff --git a/src/contexts/tcp/tcp.go b/src/contexts/tcp/tcp.go index a76216ad..c37efcba 100644 --- a/src/contexts/tcp/tcp.go +++ b/src/contexts/tcp/tcp.go @@ -70,7 +70,7 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ tcp.Conn = c m.Log("info", "%s accept %s", m.Cap("nclient"), - m.Append("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr())))) + m.Option("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr())))) m.Put("option", "io", tcp.Conn).Back(m) return false default: