diff --git a/src/context/mdb/mdb.go b/src/context/mdb/mdb.go new file mode 100644 index 00000000..6586148c --- /dev/null +++ b/src/context/mdb/mdb.go @@ -0,0 +1,175 @@ +package mdb // {{{ +// }}} +import ( // {{{ + "context" + + "database/sql" + _ "github.com/go-sql-driver/mysql" + + "errors" + "fmt" + "log" +) + +// }}} + +type MDB struct { + db *sql.DB + *ctx.Context +} + +func (mdb *MDB) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ + mdb.Configs["source"] = &ctx.Config{Name: "source", Value: "", Help: "数据库参数"} + mdb.Configs["driver"] = &ctx.Config{Name: "driver", Value: "", Help: "数据库驱动"} + + return mdb +} + +// }}} +func (mdb *MDB) Start(m *ctx.Message, arg ...string) bool { // {{{ + mdb.Capi("nsource", 1) + defer mdb.Capi("nsource", -1) + + if len(arg) > 0 { + mdb.Conf("source", arg[0]) + + if len(arg) > 1 { + mdb.Conf("driver", arg[1]) + } + } + + if mdb.Conf("source") == "" || mdb.Conf("driver") == "" { + return true + } + + db, e := sql.Open(mdb.Conf("driver"), mdb.Conf("source")) + mdb.Assert(e) + mdb.db = db + defer mdb.db.Close() + + log.Println(mdb.Name, "open:", mdb.Conf("driver"), mdb.Conf("source")) + defer log.Println(mdb.Name, "close:", mdb.Conf("driver"), mdb.Conf("source")) + + for _, p := range m.Meta["prepare"] { + _, e := db.Exec(p) + mdb.Assert(e) + } + + return true +} + +// }}} +func (mdb *MDB) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server { // {{{ + c.Caches = map[string]*ctx.Cache{} + c.Configs = map[string]*ctx.Config{} + + s := new(MDB) + s.Context = c + return s +} + +// }}} +func (mdb *MDB) Exit(m *ctx.Message, arg ...string) bool { // {{{ + return true +} + +// }}} + +var Index = &ctx.Context{Name: "mdb", Help: "内存数据库", + Caches: map[string]*ctx.Cache{ + "nsource": &ctx.Cache{Name: "数据源数量", Value: "0", Help: "数据库连接的数量"}, + }, + Configs: map[string]*ctx.Config{}, + Commands: map[string]*ctx.Command{ + "open": &ctx.Command{Name: "open [source [driver]]", Help: "打开数据库", + Options: map[string]string{ + "prepare": "打开数据库时自动执行的语句", + }, + Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + m.Start("db"+c.Cap("nsource"), arg...) // {{{ + return "" + // }}} + }}, + "exec": &ctx.Command{Name: "exec sql [arg]", Help: "执行SQL语句", + Appends: map[string]string{ + "LastInsertId": "最后插入元组的标识", + "RowsAffected": "修改元组的数量", + }, + Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + mdb, ok := m.Target.Server.(*MDB) // {{{ + if !ok { + m.Assert(errors.New("目标模块类型错误")) + } + if len(arg) == 0 { + m.Assert(errors.New("缺少参数")) + } + + which := make([]interface{}, 0, len(arg)) + for _, v := range arg[1:] { + which = append(which, v) + } + + ret, e := mdb.db.Exec(arg[0], which...) + m.Assert(e) + + id, e := ret.LastInsertId() + m.Assert(e) + n, e := ret.RowsAffected() + m.Assert(e) + + m.Add("append", "LastInsertId", fmt.Sprintf("%d", id)) + m.Add("append", "RowsAffected", fmt.Sprintf("%d", n)) + return "" + // }}} + }}, + "query": &ctx.Command{Name: "query sql [arg]", Help: "执行SQL语句", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + mdb, ok := m.Target.Server.(*MDB) // {{{ + if !ok { + m.Assert(errors.New("目标模块类型错误")) + } + if len(arg) == 0 { + m.Assert(errors.New("缺少参数")) + } + + which := make([]interface{}, 0, len(arg)) + for _, v := range arg[1:] { + which = append(which, v) + } + + rows, e := mdb.db.Query(arg[0], which...) + m.Assert(e) + defer rows.Close() + + cols, e := rows.Columns() + m.Assert(e) + num := len(cols) + + for rows.Next() { + vals := make([]interface{}, num) + ptrs := make([]interface{}, num) + for i := range vals { + ptrs[i] = &vals[i] + } + rows.Scan(ptrs...) + + for i, k := range cols { + switch b := vals[i].(type) { + case []byte: + m.Add("append", k, string(b)) + case int64: + m.Add("append", k, fmt.Sprintf("%d", b)) + } + } + } + + return "" + // }}} + }}, + }, +} + +func init() { + mdb := &MDB{} + mdb.Context = Index + ctx.Index.Register(Index, mdb) +} diff --git a/src/context/mdb/mdb_test.go b/src/context/mdb/mdb_test.go new file mode 100644 index 00000000..4029ff2f --- /dev/null +++ b/src/context/mdb/mdb_test.go @@ -0,0 +1,61 @@ +package mdb + +import ( + "context" + "flag" + "log" + "os" + "testing" +) + +func TestOpen(t *testing.T) { + flag.Parse() + args := flag.Args() + if len(args) < 2 { + t.Fatal("usages: -args source driver [table]") + } + + source := "user:word@/book" + driver := "mysql" + source = args[0] + driver = args[1] + + //mysql -u root -p; + //create database book; + //grant all on book.* to user identified by 'word' + + ctx.Start() + ctx.Index.Conf("debug", "off") + log.SetOutput(os.Stdout) + m := ctx.Pulse.Spawn(Index) + + m.Meta = nil + m.Cmd("open", source, driver) + + m.Meta = nil + m.Cmd("exec", "insert into program(time, hash, name) values(?, ?, ?)", "1", "2", "3") + + m.Meta = nil + m.Cmd("exec", "insert into program(time, hash, name) values(?, ?, ?)", "1", "2", "3") + + m.Meta = nil + m.Cmd("exec", "insert into program(time, hash, name) values(?, ?, ?)", "2", "3", "4") + + m.Meta = nil + m.Cmd("query", "select time, hash, name from program") + + t.Log() + for i, rows := 0, len(m.Meta[m.Meta["append"][0]]); i < rows; i++ { + for _, k := range m.Meta["append"] { + t.Log(k, m.Meta[k][i]) + } + t.Log() + } + + if len(m.Meta["append"]) != 3 || len(m.Meta[m.Meta["append"][0]]) != 2 { + t.Error() + } + + m.Meta = nil + // Index.Exit(m) +} diff --git a/src/context/tcp/tcp.go b/src/context/tcp/tcp.go new file mode 100644 index 00000000..f32ff9f2 --- /dev/null +++ b/src/context/tcp/tcp.go @@ -0,0 +1,135 @@ +package tcp // {{{ +// }}} +import ( // {{{ + "context" + "log" + "net" +) + +// }}} + +type TCP struct { + listener net.Listener + *ctx.Context +} + +func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{ + tcp.Caches["nclient"] = &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"} + return tcp +} + +// }}} +func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{ + if tcp.Conf("address") == "" { + return true + } + + l, e := net.Listen("tcp4", tcp.Conf("address")) + tcp.Assert(e) + tcp.listener = l + + log.Printf("%s listen(%d): %v", tcp.Name, tcp.Capi("nlisten", 1), l.Addr()) + defer tcp.Capi("nlisten", -1) + defer log.Println("%s close(%d): %v", tcp.Name, tcp.Capi("nlisten", 0), l.Addr()) + + for { + c, e := l.Accept() + tcp.Assert(e) + log.Printf("%s accept(%d): %v<-%v", tcp.Name, tcp.Capi("nclient", 1), c.LocalAddr(), c.RemoteAddr()) + // defer log.Println(tcp.Name, "close:", tcp.Capi("nclient", -1), c.LocalAddr(), "<-", c.RemoteAddr()) + + msg := m.Spawn(m.Context, c.RemoteAddr().String()).Put("option", "io", c) + msg.Cmd("open", c.RemoteAddr().String(), "tcp") + } + + return true +} + +// }}} +func (tcp *TCP) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server { // {{{ + c.Caches = map[string]*ctx.Cache{} + c.Configs = map[string]*ctx.Config{ + "address": &ctx.Config{Name: "address", Value: arg[0], Help: "监听地址"}, + } + + s := new(TCP) + s.Context = c + return s + +} + +// }}} +func (tcp *TCP) Exit(m *ctx.Message, arg ...string) bool { // {{{ + + if c, ok := m.Data["result"].(net.Conn); ok && m.Target == tcp.Context { + c.Close() + delete(m.Data, "result") + return true + } + + if c, ok := m.Data["detail"].(net.Conn); ok && m.Context == tcp.Context { + c.Close() + delete(m.Data, "detail") + return true + } + return true +} + +// }}} + +var Index = &ctx.Context{Name: "tcp", Help: "网络连接", + Caches: map[string]*ctx.Cache{ + "nlisten": &ctx.Cache{Name: "nlisten", Value: "0", Help: "连接数量"}, + }, + Configs: map[string]*ctx.Config{ + "address": &ctx.Config{Name: "address", Value: "", Help: "监听地址"}, + }, + Commands: map[string]*ctx.Command{ + "listen": &ctx.Command{Name: "listen address", Help: "监听连接", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + switch len(arg) { // {{{ + case 0: + for k, s := range m.Target.Contexts { + m.Echo("%s %s\n", k, s.Server.(*TCP).listener.Addr().String()) + } + case 1: + go m.Start(arg[0], arg[0]) + } + return "" + // }}} + }}, + "dial": &ctx.Command{Name: "dial", Help: "建立连接", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + tcp := c.Server.(*TCP) // {{{ + switch len(arg) { + case 0: + for i, v := range tcp.Requests { + conn := v.Data["result"].(net.Conn) + m.Echo(tcp.Name, "conn: %s %s -> %s\n", i, conn.LocalAddr(), conn.RemoteAddr()) + } + case 2: + conn, e := net.Dial("tcp", arg[0]) + c.Assert(e) + log.Println(tcp.Name, "dial:", conn.LocalAddr(), "->", conn.RemoteAddr()) + } + return "" + // }}} + }}, + "exit": &ctx.Command{Name: "exit", Help: "退出", Hand: func(c *ctx.Context, m *ctx.Message, key string, arg ...string) string { + tcp, ok := m.Target.Server.(*TCP) // {{{ + if !ok { + tcp, ok = m.Context.Server.(*TCP) + } + if ok { + tcp.Context.Exit(m) + } + + return "" + // }}} + }}, + }, +} + +func init() { + tcp := &TCP{} + tcp.Context = Index + ctx.Index.Register(Index, tcp) +} diff --git a/src/context/tcp/tcp_test.go b/src/context/tcp/tcp_test.go new file mode 100644 index 00000000..3695d9e3 --- /dev/null +++ b/src/context/tcp/tcp_test.go @@ -0,0 +1,29 @@ +package tcp + +import ( + "context" + "flag" + "testing" + "time" +) + +func TestOpen(t *testing.T) { + flag.Parse() + args := flag.Args() + if len(args) < 1 { + t.Fatal("usages: -args address") + } + + address := ":9393" + address = args[0] + + //mysql -u root -p; + //create database book; + //grant all on book.* to user identified by 'word' + + ctx.Start() + m := ctx.Pulse.Spawn(Index) + + m.Meta = nil + Index.Cmd(m, "listen", address) +}