From f0d27ab53b966b62abc479ffe36cd5e70e381987 Mon Sep 17 00:00:00 2001 From: shaoying Date: Mon, 16 Apr 2018 09:10:01 +0800 Subject: [PATCH] =?UTF-8?q?vps=20mod=20nfs&tcp=20=E7=BD=91=E7=BB=9C?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=94=B9=E4=B8=BA=E5=BC=82=E6=AD=A5=E5=9B=9E?= =?UTF-8?q?=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/contexts/ctx.go | 24 +++++++++++++++--------- src/contexts/nfs/nfs.go | 28 ++++++++++++++++++++++++++-- src/contexts/tcp/tcp.go | 16 +++++++++++----- 3 files changed, 52 insertions(+), 16 deletions(-) diff --git a/src/contexts/ctx.go b/src/contexts/ctx.go index e0593acb..b796680c 100644 --- a/src/contexts/ctx.go +++ b/src/contexts/ctx.go @@ -296,7 +296,7 @@ type Message struct { Index int ncallback int - callback func() bool + callback func(ok bool) (done bool, up bool) Template *Message } @@ -1044,7 +1044,7 @@ func (m *Message) Exec(key string, arg ...string) string { // {{{ } m.target.Historys = append(m.target.Historys, m) - m.Back() + m.Back(false) }) return m.Get("result") @@ -1127,28 +1127,34 @@ func (m *Message) Cmd(arg ...interface{}) *Message { // {{{ } // }}} -func (m *Message) Call(cb func() bool) { // {{{ +func (m *Message) Call(cb func(ok bool) (done bool, up bool), cmd bool) *Message { // {{{ m.callback = cb m.message.ncallback++ m.Wait = nil - m.Cmd() + if cmd { + m.Cmd() + } + + return m } // }}} -func (m *Message) Back() { // {{{ +func (m *Message) Back(ok bool) *Message { // {{{ if m.callback == nil { - return + return m } - if m.callback() { + done, up := m.callback(ok) + if done { m.callback = nil m.message.ncallback-- } - if m.message.ncallback == 0 { - m.message.Back() + if up || m.message.ncallback == 0 { + m.message.Back(ok) } + return m } // }}} diff --git a/src/contexts/nfs/nfs.go b/src/contexts/nfs/nfs.go index 9f4888db..737fa584 100644 --- a/src/contexts/nfs/nfs.go +++ b/src/contexts/nfs/nfs.go @@ -308,10 +308,34 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心", }}, "listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { - m.Cap("stream", m.Sess("tcp", "tcp").Cmd(m.Meta["detail"]).Cap("address")) + msg := m.Sess("tcp", "tcp") // {{{ + msg.Call(func(ok bool) (done bool, up bool) { + if ok { + sub := msg.Spawn(m.Target()) + sub.Put("option", "io", msg.Data["io"]) + sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...) + sub.Cap("stream", msg.Append("stream")) + sub.Echo(sub.Target().Name) + } + return false, true + }, false) + m.Cap("stream", msg.Cmd(m.Meta["detail"]).Cap("address")) + // }}} }}, "dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { - m.Sess("tcp", "tcp").Cmd(m.Meta["detail"]) + msg := m.Sess("tcp", "tcp") // {{{ + msg.Call(func(ok bool) (done bool, up bool) { + if ok { + sub := msg.Spawn(m.Target()) + sub.Put("option", "io", msg.Data["io"]) + sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...) + sub.Cap("stream", msg.Append("stream")) + sub.Echo(sub.Target().Name) + return true, true + } + return false, false + }, false).Cmd(m.Meta["detail"]) + // }}} }}, "send": &ctx.Command{Name: "send [file] args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { // {{{ diff --git a/src/contexts/tcp/tcp.go b/src/contexts/tcp/tcp.go index 55027664..85ca0615 100644 --- a/src/contexts/tcp/tcp.go +++ b/src/contexts/tcp/tcp.go @@ -67,18 +67,18 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ tcp.Conn = c } - msg := m.Reply("open").Put("option", "io", tcp.Conn).Cmd("open") m.Log("info", nil, "%s dial %s", Pulse.Cap("nclient"), - msg.Cap("stream", m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr())))) + m.Append("stream", m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr())))) + m.Put("append", "io", tcp.Conn).Back(true) return false case "accept": c, e := m.Data["io"].(net.Conn) m.Assert(e) tcp.Conn = c - msg := m.Spawn(m.Data["source"].(*ctx.Context), "open").Put("option", "io", tcp.Conn).Cmd("open") m.Log("info", nil, "%s accept %s", Pulse.Cap("nclient"), - msg.Cap("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr())))) + m.Append("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr())))) + m.Put("append", "io", tcp.Conn).Back(true) return false default: if m.Cap("security") != "false" { @@ -102,7 +102,13 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ c, e := tcp.Accept() m.Assert(e) msg := m.Spawn(Index).Put("option", "io", c).Put("option", "source", m.Source()) - msg.Start(fmt.Sprintf("com%d", Pulse.Capi("nclient", 1)), "网络连接", + msg.Call(func(ok bool) (done bool, up bool) { + if ok { + m.Append("stream", msg.Append("stream")) + m.Put("append", "io", msg.Data["io"]) + } + return ok, ok + }, false).Start(fmt.Sprintf("com%d", Pulse.Capi("nclient", 1)), "网络连接", "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol")) }