From 69d0e3d63aefa064ec02a28dae698eb29c7eecc8 Mon Sep 17 00:00:00 2001 From: shaoying Date: Fri, 1 Dec 2017 04:59:21 +0800 Subject: [PATCH] =?UTF-8?q?mac=20syn=200.1.0=20=E5=90=8C=E6=AD=A5=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/context/mdb/mdb.go | 137 ++++++++++++++++++--------------- src/context/tcp/tcp.go | 169 +++++++++++++++++++++-------------------- 2 files changed, 159 insertions(+), 147 deletions(-) diff --git a/src/context/mdb/mdb.go b/src/context/mdb/mdb.go index 7954369e..642469d2 100644 --- a/src/context/mdb/mdb.go +++ b/src/context/mdb/mdb.go @@ -6,50 +6,30 @@ import ( // {{{ "database/sql" _ "github.com/go-sql-driver/mysql" - "errors" "fmt" ) // }}} type MDB struct { - db *sql.DB + *sql.DB *ctx.Context } -func (mdb *MDB) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ - return mdb -} - -// }}} -func (mdb *MDB) Start(m *ctx.Message, arg ...string) bool { // {{{ - if m.Conf("source") == "" || m.Conf("driver") == "" { - return true - } - - db, e := sql.Open(m.Conf("driver"), m.Conf("source")) - m.Assert(e) - mdb.db = db - - m.Log("info", "%s: open %s %s", mdb.Name, m.Conf("driver"), m.Conf("source")) - m.Capi("nsource", 1) - - return true -} - -// }}} func (mdb *MDB) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server { // {{{ - c.Caches = map[string]*ctx.Cache{} - c.Configs = map[string]*ctx.Config{ - "source": &ctx.Config{Name: "source", Value: "", Help: "数据库参数"}, - "driver": &ctx.Config{Name: "driver", Value: "", Help: "数据库驱动"}, + c.Caches = map[string]*ctx.Cache{ + "source": &ctx.Cache{Name: "数据库参数", Value: "", Help: "数据库参数"}, + "driver": &ctx.Cache{Name: "数据库驱动", Value: "", Help: "数据库驱动"}, } + c.Configs = map[string]*ctx.Config{} if len(arg) > 0 { - m.Conf("source", arg[0]) - if len(arg) > 1 { - m.Conf("driver", arg[1]) - } + m.Cap("source", arg[0]) + } + if len(arg) > 1 { + m.Cap("driver", arg[1]) + } else { + m.Cap("driver", m.Conf("driver")) } s := new(MDB) @@ -58,52 +38,76 @@ func (mdb *MDB) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server } // }}} -func (mdb *MDB) Exit(m *ctx.Message, arg ...string) bool { // {{{ - if mdb.db != nil && m.Target == mdb.Context { - m.Log("info", "%s: close %s %s", mdb.Name, m.Conf("driver"), m.Conf("source")) - m.Capi("nsource", -1) - mdb.db.Close() - mdb.db = nil +func (mdb *MDB) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ + return mdb +} + +// }}} +func (mdb *MDB) Start(m *ctx.Message, arg ...string) bool { // {{{ + if len(arg) > 0 { + m.Cap("source", arg[0]) + } + if len(arg) > 1 { + m.Cap("driver", arg[1]) + } else { + m.Cap("driver", m.Conf("driver")) } - return true + if m.Cap("source") == "" || m.Cap("driver") == "" { + return false + } + + db, e := sql.Open(m.Cap("driver"), m.Cap("source")) + m.Assert(e) + mdb.DB = db + + m.Log("info", "%s: %d open %s %s", mdb.Name, m.Capi("nsource", 1), m.Cap("driver"), m.Cap("source")) + return false +} + +// }}} +func (mdb *MDB) Close(m *ctx.Message, arg ...string) bool { // {{{ + if mdb.DB != nil && m.Target == mdb.Context { + m.Log("info", "%s: %d close %s %s", mdb.Name, m.Capi("nsource", -1)+1, m.Cap("driver"), m.Cap("source")) + mdb.DB.Close() + mdb.DB = nil + return true + } + + return false } // }}} var Index = &ctx.Context{Name: "mdb", Help: "内存数据库", Caches: map[string]*ctx.Cache{ - "nsource": &ctx.Cache{Name: "数据源数量", Value: "0", Help: "数据库连接的数量"}, + "nsource": &ctx.Cache{Name: "数据源数量", Value: "0", Help: "已打开数据库的数量"}, + }, + Configs: map[string]*ctx.Config{ + "driver": &ctx.Config{Name: "数据库驱动(mysql)", Value: "mysql", Help: "数据库驱动"}, }, - Configs: map[string]*ctx.Config{}, Commands: map[string]*ctx.Command{ - "open": &ctx.Command{Name: "open name [source [driver]]", Help: "打开数据库", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { - m.Start(arg[0], arg[1:]...) // {{{ + "open": &ctx.Command{Name: "open name help [source [driver]]", Help: "打开数据库", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + m.Target = m.Master // {{{ + m.Start(arg[0], arg[1], arg[2:]...) return "" // }}} }}, - "exec": &ctx.Command{Name: "exec sql [arg]", Help: "执行SQL语句", - Appends: map[string]string{ - "LastInsertId": "最后插入元组的标识", - "RowsAffected": "修改元组的数量", - }, + "exec": &ctx.Command{Name: "exec sql [arg]", Help: "执行操作语句", + Appends: map[string]string{"LastInsertId": "最后插入元组的标识", "RowsAffected": "修改元组的数量"}, Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { mdb, ok := m.Target.Server.(*MDB) // {{{ - if !ok { - m.Assert(errors.New("目标模块类型错误")) - } - if len(arg) == 0 { - m.Assert(errors.New("缺少参数")) - } + m.Assert(ok, "目标模块类型错误") + m.Assert(len(arg) > 0, "缺少参数") + m.Assert(mdb.DB != nil, "数据库未打开") which := make([]interface{}, 0, len(arg)) for _, v := range arg[1:] { which = append(which, v) } - ret, e := mdb.db.Exec(arg[0], which...) + ret, e := mdb.Exec(arg[0], which...) m.Assert(e) - id, e := ret.LastInsertId() m.Assert(e) n, e := ret.RowsAffected() @@ -111,24 +115,22 @@ var Index = &ctx.Context{Name: "mdb", Help: "内存数据库", m.Add("append", "LastInsertId", fmt.Sprintf("%d", id)) m.Add("append", "RowsAffected", fmt.Sprintf("%d", n)) + m.Log("info", "%s: last(%d) rows(%d)", m.Target.Name, id, n) return "" // }}} }}, - "query": &ctx.Command{Name: "query sql [arg]", Help: "执行SQL语句", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + "query": &ctx.Command{Name: "query sql [arg]", Help: "执行查询语句", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { mdb, ok := m.Target.Server.(*MDB) // {{{ - if !ok { - m.Assert(errors.New("目标模块类型错误")) - } - if len(arg) == 0 { - m.Assert(errors.New("缺少参数")) - } + m.Assert(ok, "目标模块类型错误") + m.Assert(len(arg) > 0, "缺少参数") + m.Assert(mdb.DB != nil, "数据库未打开") which := make([]interface{}, 0, len(arg)) for _, v := range arg[1:] { which = append(which, v) } - rows, e := mdb.db.Query(arg[0], which...) + rows, e := mdb.Query(arg[0], which...) m.Assert(e) defer rows.Close() @@ -150,10 +152,19 @@ var Index = &ctx.Context{Name: "mdb", Help: "内存数据库", m.Add("append", k, string(b)) case int64: m.Add("append", k, fmt.Sprintf("%d", b)) + default: + m.Add("append", k, "") } } } + m.Log("info", "%s: cols(%d) rows(%d)", m.Target.Name, len(m.Meta["append"]), len(m.Meta[m.Meta["append"][0]])) + return "" + // }}} + }}, + "close": &ctx.Command{Name: "close name", Help: "关闭数据库", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + msg := m.Find(arg[0], m.Master) // {{{ + msg.Target.Close(msg) return "" // }}} }}, diff --git a/src/context/tcp/tcp.go b/src/context/tcp/tcp.go index 313be38a..2b9b2187 100644 --- a/src/context/tcp/tcp.go +++ b/src/context/tcp/tcp.go @@ -2,69 +2,30 @@ package tcp // {{{ // }}} import ( // {{{ "context" - "log" + + "fmt" "net" + "strconv" ) // }}} type TCP struct { - l net.Listener - c net.Conn - close bool + net.Conn + net.Listener *ctx.Context } -func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ - return tcp -} - -// }}} -func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ - if arg[0] == "dial" { - c, e := net.Dial(m.Conf("protocol"), m.Conf("address")) - m.Assert(e) - tcp.c = c - - log.Printf("%s dial(%d): %v->%v", tcp.Name, m.Capi("nclient", 1), c.LocalAddr(), c.RemoteAddr()) - m.Reply(c.LocalAddr().String()).Put("option", "io", c).Cmd("open") - return true - } - - l, e := net.Listen(m.Conf("protocol"), m.Conf("address")) - m.Assert(e) - tcp.l = l - - log.Printf("%s listen(%d): %v", tcp.Name, m.Capi("nlisten", 1), l.Addr()) - defer m.Capi("nlisten", -1) - defer log.Println("%s close(%d): %v", tcp.Name, m.Capi("nlisten"), l.Addr()) - - for { - c, e := l.Accept() - m.Assert(e) - log.Printf("%s accept(%d): %v<-%v", tcp.Name, m.Capi("nclient", 1), c.LocalAddr(), c.RemoteAddr()) - m.Reply(c.RemoteAddr().String()).Put("option", "io", c).Cmd("open") - } - - return true -} - -// }}} func (tcp *TCP) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server { // {{{ - c.Caches = map[string]*ctx.Cache{} + c.Caches = map[string]*ctx.Cache{ + "protocol": &ctx.Cache{Name: "protocol(tcp/tcp4/tcp6)", Value: m.Conf("protocol"), Help: "监听地址"}, + "security": &ctx.Cache{Name: "security(true/false)", Value: m.Conf("security"), Help: "加密通信"}, + "address": &ctx.Cache{Name: "address", Value: arg[1], Help: "监听地址"}, + } c.Configs = map[string]*ctx.Config{} - if len(arg) > 1 { - switch arg[0] { - case "listen": - c.Caches["nclient"] = &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"} - c.Configs["address"] = &ctx.Config{Name: "address", Value: arg[1], Help: "监听地址"} - case "dial": - c.Configs["address"] = &ctx.Config{Name: "address", Value: arg[1], Help: "连接地址"} - } - } if len(arg) > 2 { - c.Configs["security"] = &ctx.Config{Name: "security(true/false)", Value: "true", Help: "加密通信"} + m.Cap("security", arg[2]) } s := new(TCP) @@ -74,32 +35,65 @@ func (tcp *TCP) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server } // }}} -func (tcp *TCP) Exit(m *ctx.Message, arg ...string) bool { // {{{ - switch tcp.Context { - case m.Source: - c, ok := m.Data["io"].(net.Conn) - if !ok { - c = tcp.c - } - if c != nil { - log.Println(tcp.Name, "close:", c.LocalAddr(), "--", c.RemoteAddr()) - c.Close() - } +func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ + return tcp +} - case m.Target: - if tcp.l != nil { - log.Println(tcp.Name, "close:", tcp.l.Addr()) - tcp.l.Close() - } - if tcp.c != nil { - log.Println(tcp.Name, "close:", tcp.c.LocalAddr(), "->", tcp.c.RemoteAddr()) - tcp.c.Close() +// }}} +func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ + switch arg[0] { + case "dial": + c, e := net.Dial(m.Cap("protocol"), m.Cap("address")) + m.Assert(e) + tcp.Conn = c + + m.Log("info", "%s: dial(%d) %v->%v", tcp.Name, m.Capi("nclient"), c.LocalAddr(), c.RemoteAddr()) + // m.Reply(c.LocalAddr().String()).Put("option", "io", c).Cmd("open") + return false + case "accept": + return false + } + + l, e := net.Listen(m.Cap("protocol"), m.Cap("address")) + m.Assert(e) + tcp.Listener = l + + m.Log("info", "%s: listen(%d) %v", tcp.Name, m.Capi("nlisten"), l.Addr()) + + for { + c, e := l.Accept() + m.Assert(e) + + msg := m.Spawn(tcp.Context.Context) + msg.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), c.RemoteAddr().String(), "accept", c.RemoteAddr().String()) + msg.Log("info", "%s: accept(%d) %v<-%v", tcp.Name, m.Capi("nclient"), c.LocalAddr(), c.RemoteAddr()) + + if tcp, ok := msg.Target.Server.(*TCP); ok { + tcp.Conn = c } + rep := m.Reply(c.RemoteAddr().String()) + rep.Source = msg.Target + rep.Put("option", "io", c).Cmd("open") } return true } +// }}} +func (tcp *TCP) Close(m *ctx.Message, arg ...string) bool { // {{{ + if tcp.Listener != nil { + m.Log("info", "%s: close(%d) %v", tcp.Name, m.Capi("nlisten", -1)+1, tcp.Listener.Addr()) + tcp.Listener.Close() + return true + } + if tcp.Conn != nil { + tcp.Conn.Close() + return true + } + + return false +} + // }}} var Index = &ctx.Context{Name: "tcp", Help: "网络连接", @@ -116,13 +110,13 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", switch len(arg) { // {{{ case 0: m.Travel(m.Target, func(m *ctx.Message) bool { - if tcp, ok := m.Target.Server.(*TCP); ok && tcp.l != nil { - m.Echo("%s %v\n", m.Target.Name, tcp.l.Addr()) + if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Listener != nil { + m.Echo("%s %v\n", m.Target.Name, tcp.Addr()) } return true }) - case 1: - go m.Start(arg[0], m.Meta["detail"]...) + default: + m.Start(fmt.Sprintf("pub%d", m.Capi("nlisten", 1)), arg[0], m.Meta["detail"]...) } return "" // }}} @@ -131,26 +125,32 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", switch len(arg) { // {{{ case 0: m.Travel(m.Target, func(m *ctx.Message) bool { - if tcp, ok := m.Target.Server.(*TCP); ok && tcp.c != nil { - m.Echo("%s %v->%v\n", m.Target.Name, tcp.c.LocalAddr(), tcp.c.RemoteAddr()) + if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Conn != nil { + m.Echo("%s %v<->%v\n", m.Target.Name, tcp.LocalAddr(), tcp.RemoteAddr()) } return true }) - case 1: - m.Start(arg[0], m.Meta["detail"]...) + default: + m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), arg[0], m.Meta["detail"]...) } return "" // }}} }}, - "exit": &ctx.Command{Name: "exit", Help: "退出", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { - tcp, ok := m.Target.Server.(*TCP) // {{{ - if !ok { - tcp, ok = m.Source.Server.(*TCP) + "send": &ctx.Command{Name: "send message", Help: "发送消息", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Conn != nil { // {{{ + tcp.Conn.Write([]byte(arg[0])) } - if ok { - tcp.Context.Exit(m) + return "" + // }}} + }}, + "recv": &ctx.Command{Name: "recv size", Help: "接收消息", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + if tcp, ok := m.Target.Server.(*TCP); ok && tcp.Conn != nil { // {{{ + size, e := strconv.Atoi(arg[0]) + m.Assert(e) + buf := make([]byte, size) + tcp.Conn.Read(buf) + return string(buf) } - return "" // }}} }}, @@ -159,6 +159,7 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", "void": &ctx.Context{ Commands: map[string]*ctx.Command{ "listen": &ctx.Command{}, + "dial": &ctx.Command{}, }, }, },