1
0
forked from x/ContextOS

vps mod nfs&tcp 网络连接改为异步回调

This commit is contained in:
shaoying 2018-04-16 09:10:01 +08:00
parent b515ac2471
commit f0d27ab53b
3 changed files with 52 additions and 16 deletions

View File

@ -296,7 +296,7 @@ type Message struct {
Index int
ncallback int
callback func() bool
callback func(ok bool) (done bool, up bool)
Template *Message
}
@ -1044,7 +1044,7 @@ func (m *Message) Exec(key string, arg ...string) string { // {{{
}
m.target.Historys = append(m.target.Historys, m)
m.Back()
m.Back(false)
})
return m.Get("result")
@ -1127,28 +1127,34 @@ func (m *Message) Cmd(arg ...interface{}) *Message { // {{{
}
// }}}
func (m *Message) Call(cb func() bool) { // {{{
func (m *Message) Call(cb func(ok bool) (done bool, up bool), cmd bool) *Message { // {{{
m.callback = cb
m.message.ncallback++
m.Wait = nil
m.Cmd()
if cmd {
m.Cmd()
}
return m
}
// }}}
func (m *Message) Back() { // {{{
func (m *Message) Back(ok bool) *Message { // {{{
if m.callback == nil {
return
return m
}
if m.callback() {
done, up := m.callback(ok)
if done {
m.callback = nil
m.message.ncallback--
}
if m.message.ncallback == 0 {
m.message.Back()
if up || m.message.ncallback == 0 {
m.message.Back(ok)
}
return m
}
// }}}

View File

@ -308,10 +308,34 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心",
}},
"listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
m.Cap("stream", m.Sess("tcp", "tcp").Cmd(m.Meta["detail"]).Cap("address"))
msg := m.Sess("tcp", "tcp") // {{{
msg.Call(func(ok bool) (done bool, up bool) {
if ok {
sub := msg.Spawn(m.Target())
sub.Put("option", "io", msg.Data["io"])
sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...)
sub.Cap("stream", msg.Append("stream"))
sub.Echo(sub.Target().Name)
}
return false, true
}, false)
m.Cap("stream", msg.Cmd(m.Meta["detail"]).Cap("address"))
// }}}
}},
"dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
m.Sess("tcp", "tcp").Cmd(m.Meta["detail"])
msg := m.Sess("tcp", "tcp") // {{{
msg.Call(func(ok bool) (done bool, up bool) {
if ok {
sub := msg.Spawn(m.Target())
sub.Put("option", "io", msg.Data["io"])
sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...)
sub.Cap("stream", msg.Append("stream"))
sub.Echo(sub.Target().Name)
return true, true
}
return false, false
}, false).Cmd(m.Meta["detail"])
// }}}
}},
"send": &ctx.Command{Name: "send [file] args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { // {{{

View File

@ -67,18 +67,18 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{
tcp.Conn = c
}
msg := m.Reply("open").Put("option", "io", tcp.Conn).Cmd("open")
m.Log("info", nil, "%s dial %s", Pulse.Cap("nclient"),
msg.Cap("stream", m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr()))))
m.Append("stream", m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr()))))
m.Put("append", "io", tcp.Conn).Back(true)
return false
case "accept":
c, e := m.Data["io"].(net.Conn)
m.Assert(e)
tcp.Conn = c
msg := m.Spawn(m.Data["source"].(*ctx.Context), "open").Put("option", "io", tcp.Conn).Cmd("open")
m.Log("info", nil, "%s accept %s", Pulse.Cap("nclient"),
msg.Cap("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr()))))
m.Append("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr()))))
m.Put("append", "io", tcp.Conn).Back(true)
return false
default:
if m.Cap("security") != "false" {
@ -102,7 +102,13 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{
c, e := tcp.Accept()
m.Assert(e)
msg := m.Spawn(Index).Put("option", "io", c).Put("option", "source", m.Source())
msg.Start(fmt.Sprintf("com%d", Pulse.Capi("nclient", 1)), "网络连接",
msg.Call(func(ok bool) (done bool, up bool) {
if ok {
m.Append("stream", msg.Append("stream"))
m.Put("append", "io", msg.Data["io"])
}
return ok, ok
}, false).Start(fmt.Sprintf("com%d", Pulse.Capi("nclient", 1)), "网络连接",
"accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol"))
}