diff --git a/src/context/mdb/mdb.go b/src/context/mdb/mdb.go index 0b50ca8f..be0de87d 100644 --- a/src/context/mdb/mdb.go +++ b/src/context/mdb/mdb.go @@ -59,19 +59,27 @@ func (mdb *MDB) Start(m *ctx.Message, arg ...string) bool { // {{{ // }}} func (mdb *MDB) Close(m *ctx.Message, arg ...string) bool { // {{{ - if mdb.DB != nil && m.Target == mdb.Context { + if mdb.Context == Index { + return false + } + + switch mdb.Context { + case m.Target: + case m.Source: + } + + if mdb.DB != nil { m.Log("info", nil, "%d close %s %s", m.Capi("nsource", -1)+1, m.Cap("driver"), m.Cap("source")) mdb.DB.Close() mdb.DB = nil - return true } - return false + return true } // }}} -var Index = &ctx.Context{Name: "mdb", Help: "内存数据库", +var Index = &ctx.Context{Name: "mdb", Help: "数据中心", Caches: map[string]*ctx.Cache{ "nsource": &ctx.Cache{Name: "数据源数量", Value: "0", Help: "已打开数据库的数量"}, }, @@ -83,7 +91,7 @@ var Index = &ctx.Context{Name: "mdb", Help: "内存数据库", m.Assert(len(arg) > 2, "缺少参数") // {{{ m.Master, m.Target = c, c m.Cap("stream", m.Cap("nsource")) - m.Start(arg[0], arg[1], arg[2:]...) + m.Start(arg[0], "数据存储", arg[2:]...) return "" // }}} }}, diff --git a/src/context/tcp/tcp.go b/src/context/tcp/tcp.go index 3fc6855c..8f0912b6 100644 --- a/src/context/tcp/tcp.go +++ b/src/context/tcp/tcp.go @@ -20,14 +20,10 @@ func (tcp *TCP) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server c.Caches = map[string]*ctx.Cache{ "protocol": &ctx.Cache{Name: "protocol(tcp/tcp4/tcp6)", Value: m.Conf("protocol"), Help: "监听地址"}, "security": &ctx.Cache{Name: "security(true/false)", Value: m.Conf("security"), Help: "加密通信"}, - "address": &ctx.Cache{Name: "address", Value: arg[1], Help: "监听地址"}, + "address": &ctx.Cache{Name: "address", Value: "", Help: "监听地址"}, } c.Configs = map[string]*ctx.Config{} - if len(arg) > 2 { - m.Cap("security", arg[2]) - } - s := new(TCP) s.Context = c return s @@ -36,53 +32,65 @@ func (tcp *TCP) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server // }}} func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ + if m.Target == Index { + Pulse = m + } + return tcp } // }}} func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ + if len(arg) > 1 { + m.Cap("address", arg[1]) + } + if len(arg) > 2 { + m.Cap("security", arg[2]) + } + switch arg[0] { case "dial": c, e := net.Dial(m.Cap("protocol"), m.Cap("address")) m.Assert(e) tcp.Conn = c + m.Log("info", nil, "dial(%d) %v->%v", m.Capi("nclient", 1), tcp.LocalAddr(), tcp.RemoteAddr()) + m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr())) - m.Log("info", nil, "dial(%d) %v->%v", m.Capi("nclient"), c.LocalAddr(), c.RemoteAddr()) // m.Reply(c.LocalAddr().String()).Put("option", "io", c).Cmd("open") return false case "accept": + c, e := m.Data["io"].(net.Conn) + m.Assert(e) + tcp.Conn = c + m.Log("info", nil, "accept(%d) %v<-%v", m.Capi("nclient", 1), tcp.LocalAddr(), tcp.RemoteAddr()) + m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr())) + + s, e := m.Data["source"].(*ctx.Context) + m.Assert(e) + msg := m.Spawn(s).Put("option", "io", c) + msg.Cmd("open") + msg.Cap("stream", tcp.RemoteAddr().String()) + + if tcp.Sessions == nil { + tcp.Sessions = make(map[string]*ctx.Message) + } + tcp.Sessions["open"] = msg + msg.Name = "open" + + // m.Reply(c.RemoteAddr().String()) return false } l, e := net.Listen(m.Cap("protocol"), m.Cap("address")) m.Assert(e) tcp.Listener = l - - m.Log("info", nil, "listen(%d) %v", m.Capi("nlisten"), l.Addr()) + m.Log("info", nil, "listen(%d) %v", m.Capi("nlisten", 1), l.Addr()) + m.Cap("stream", fmt.Sprintf("%s", l.Addr())) for { c, e := l.Accept() m.Assert(e) - - s, i := m.Target, 0 - m.BackTrace(func(m *ctx.Message) bool { - s = m.Target - if i++; i == 2 { - return false - } - return true - }) - - msg := m.Spawn(s) - msg.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), c.RemoteAddr().String(), "accept", c.RemoteAddr().String()) - msg.Log("info", nil, "accept(%d) %v<-%v", m.Capi("nclient"), c.LocalAddr(), c.RemoteAddr()) - - if tcp, ok := msg.Target.Server.(*TCP); ok { - tcp.Conn = c - } - rep := m.Reply(c.RemoteAddr().String()) - rep.Source = msg.Target - rep.Put("option", "io", c).Cmd("open") + m.Spawn(Index).Put("option", "io", c).Put("option", "source", m.Source).Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), "网络连接", "accept", c.RemoteAddr().String()) } return true @@ -90,22 +98,33 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ // }}} func (tcp *TCP) Close(m *ctx.Message, arg ...string) bool { // {{{ - if tcp.Listener != nil { - m.Log("info", nil, "close(%d) %v", m.Capi("nlisten", -1)+1, tcp.Listener.Addr()) - tcp.Listener.Close() - return true - } - if tcp.Conn != nil { - tcp.Conn.Close() - return true + if tcp.Context == Index { + return false } - return false + switch tcp.Context { + case m.Target: + case m.Source: + if tcp.Listener != nil { + return false + } + + } + + if tcp.Listener != nil { + m.Log("info", nil, "close(%d) %v", Pulse.Capi("nlisten", -1)+1, tcp.Listener.Addr()) + tcp.Listener.Close() + } + if tcp.Conn != nil { + m.Log("info", nil, "close %v", tcp.Conn.LocalAddr()) + tcp.Conn.Close() + } + return true } // }}} -var Index = &ctx.Context{Name: "tcp", Help: "网络连接", +var Index = &ctx.Context{Name: "tcp", Help: "网络中心", Caches: map[string]*ctx.Cache{ "nlisten": &ctx.Cache{Name: "nlisten", Value: "0", Help: "监听数量"}, "nclient": &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"}, @@ -118,14 +137,14 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", "listen": &ctx.Command{Name: "listen [address [security]]", Help: "监听连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) string { switch len(arg) { // {{{ case 0: - m.Travel(m.Target, func(m *ctx.Message) bool { + m.Travel(nil, func(m *ctx.Message) bool { if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Listener != nil { m.Echo("%s %v\n", m.Target.Name, tcp.Addr()) } return true }) default: - m.Start(fmt.Sprintf("pub%d", m.Capi("nlisten", 1)), arg[0], m.Meta["detail"]...) + m.Start(fmt.Sprintf("pub%d", m.Capi("nlisten")+1), "网络监听", m.Meta["detail"]...) } return "" // }}} @@ -133,14 +152,14 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", "dial": &ctx.Command{Name: "dial [address [security]]", Help: "建立连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) string { switch len(arg) { // {{{ case 0: - m.Travel(m.Target, func(m *ctx.Message) bool { + m.Travel(nil, func(m *ctx.Message) bool { if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Conn != nil { m.Echo("%s %v<->%v\n", m.Target.Name, tcp.LocalAddr(), tcp.RemoteAddr()) } return true }) default: - m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), arg[0], m.Meta["detail"]...) + m.Start(fmt.Sprintf("com%d", m.Capi("nclient")+1), "网络连接", m.Meta["detail"]...) } return "" // }}} @@ -153,9 +172,9 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", // }}} }}, "recv": &ctx.Command{Name: "recv size", Help: "接收消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) string { + size, e := strconv.Atoi(arg[0]) + m.Assert(e) if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Conn != nil { // {{{ - size, e := strconv.Atoi(arg[0]) - m.Assert(e) buf := make([]byte, size) tcp.Conn.Read(buf) return string(buf) @@ -174,6 +193,8 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", }, } +var Pulse *ctx.Message + func init() { tcp := &TCP{} tcp.Context = Index