diff --git a/src/context/cli/cli.go b/src/context/cli/cli.go index ce478e51..63b72f54 100644 --- a/src/context/cli/cli.go +++ b/src/context/cli/cli.go @@ -1,14 +1,11 @@ -package cli // {{{ -// }}} +package cli + import ( // {{{ "bufio" "context" "fmt" "io" - "log" "os" - "os/exec" - "runtime/debug" "strconv" "strings" "time" @@ -26,8 +23,8 @@ type CLI struct { history []map[string]string alias map[string]string - exit chan bool next string + exit bool target *ctx.Context *ctx.Context @@ -58,6 +55,7 @@ func (cli *CLI) parse() bool { // {{{ l := len(cli.ins) if l == 1 { cli.echo("\n%s\n", cli.Conf("结束语")) + return false ls = "exit" e = nil } else { @@ -125,8 +123,7 @@ func (cli *CLI) parse() bool { // {{{ } } - log.Printf("%d spawn: %s->%s %v", cli.Resource[0].Code, msg.Context.Name, msg.Target.Name, msg.Meta["detail"]) - cli.Post(msg) + msg.Post(cli.Context) for _, v := range msg.Meta["result"] { cli.echo(v) @@ -135,53 +132,6 @@ func (cli *CLI) parse() bool { // {{{ return true } -// }}} -func (cli *CLI) deal(msg *ctx.Message) bool { // {{{ - defer func() { - if e := recover(); e != nil { - msg.Echo("%s\n", e) - - if e == io.EOF { - panic(e) - } else { - debug.PrintStack() - log.Println(e) - msg.End(false) - } - } - }() - - detail := msg.Meta["detail"] - if len(detail) == 0 { - msg.End(true) - return true - } - if a, ok := cli.alias[detail[0]]; ok { - detail[0] = a - } - - if _, ok := cli.Commands[detail[0]]; ok { - cli.next = cli.Cmd(msg, detail...) - } else if _, ok := msg.Target.Commands[detail[0]]; ok { - cli.next = msg.Cmd() - } else { - cmd := exec.Command(detail[0], detail[1:]...) - v, e := cmd.CombinedOutput() - if e != nil { - msg.Echo("%s\n", e) - } - msg.Echo(string(v)) - } - msg.End(true) - - cli.history = append(cli.history, map[string]string{ - "time": time.Now().Format("15:04:05"), - "index": fmt.Sprintf("%d", len(cli.history)), - "cli": strings.Join(detail, " "), - }) - return true -} - // }}} func (cli *CLI) echo(str string, arg ...interface{}) { // {{{ if len(cli.ins) == 1 || cli.Conf("slient") != "yes" { @@ -191,18 +141,15 @@ func (cli *CLI) echo(str string, arg ...interface{}) { // {{{ // }}} -func (cli *CLI) Begin() ctx.Server { // {{{ +func (cli *CLI) Begin(m *ctx.Message) ctx.Server { // {{{ cli.history = make([]map[string]string, 0, 100) cli.alias = make(map[string]string, 10) - cli.exit = make(chan bool) - cli.target = cli.Context return cli.Server } // }}} func (cli *CLI) Start(m *ctx.Message) bool { // {{{ - if detail, ok := m.Data["detail"]; ok { io := detail.(io.ReadWriteCloser) cli.out = io @@ -214,21 +161,40 @@ func (cli *CLI) Start(m *ctx.Message) bool { // {{{ cli.push(f) } - go func() { - defer recover() + go cli.Safe(m, func(c *ctx.Context, m *ctx.Message) { for cli.parse() { } - }() + }) } - for cli.deal(cli.Get()) { + for cli.Deal(func(msg *ctx.Message) bool { + arg := msg.Meta["detail"] + if a, ok := cli.alias[arg[0]]; ok { + arg[0] = a + } + return true + + }, func(msg *ctx.Message) bool { + arg := msg.Meta["detail"] + cli.history = append(cli.history, map[string]string{ + "time": time.Now().Format("15:04:05"), + "index": fmt.Sprintf("%d", len(cli.history)), + "cli": strings.Join(arg, " "), + }) + + if cli.exit == true { + return false + } + return true + + }) { } return true } // }}} -func (cli *CLI) Spawn(c *ctx.Context, arg ...string) ctx.Server { // {{{ +func (cli *CLI) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server { // {{{ c.Caches = map[string]*ctx.Cache{ "status": &ctx.Cache{Name: "status", Value: "stop", Help: "服务状态"}, } @@ -244,6 +210,14 @@ func (cli *CLI) Spawn(c *ctx.Context, arg ...string) ctx.Server { // {{{ return s } +// }}} +func (cli *CLI) Exit(m *ctx.Message, arg ...string) bool { // {{{ + if cli.Context != Index { + delete(cli.Context.Context.Contexts, cli.Name) + } + return true +} + // }}} var Index = &ctx.Context{Name: "cli", Help: "本地控制", @@ -303,18 +277,19 @@ var Index = &ctx.Context{Name: "cli", Help: "本地控制", msg.Target = msg.Context } default: - if cs := msg.Target.Find(strings.Split(arg[1], ".")); cs != nil { + // if cs := msg.Target.Find(strings.Split(arg[1], ".")); cs != nil { + if cs := c.Root.Search(arg[1]); cs != nil && len(cs) > 0 { if ok { - cli.target = cs + cli.target = cs[0] } else { - msg.Target = cs + msg.Target = cs[0] } } } case 3, 4: switch arg[1] { case "spawn": - msg.Target.Spawn(arg[2]) + msg.Target.Spawn(msg, arg[2]) case "find": cs := msg.Target.Find(strings.Split(arg[2], ".")) if cs != nil { @@ -505,7 +480,19 @@ var Index = &ctx.Context{Name: "cli", Help: "本地控制", // }}} }}, "exit": &ctx.Command{"exit", "退出", func(c *ctx.Context, m *ctx.Message, arg ...string) string { - panic(io.EOF) + cli, ok := m.Target.Server.(*CLI) + if !ok { + cli, ok = m.Context.Server.(*CLI) + } + if ok { + if !cli.exit { + m.Echo(c.Conf("结束语")) + cli.Context.Exit(m) + } + cli.exit = true + } + + return "" }}, "remote": &ctx.Command{"remote master|slave listen|dial address protocol", "建立远程连接", func(c *ctx.Context, m *ctx.Message, arg ...string) string { switch len(arg) { // {{{ @@ -518,7 +505,7 @@ var Index = &ctx.Context{Name: "cli", Help: "本地控制", } else { if arg[2] == "listen" { s := c.Root.Find(strings.Split(arg[4], ".")) - m.Message.Spawn(s, arg[3], 0).Add("detail", "listen", arg[3]).Post(c) + m.Message.Spawn(s, arg[3]).Cmd("listen", arg[3]) } else { } } diff --git a/src/context/ctx.go b/src/context/ctx.go index cd8ad71d..096eab71 100644 --- a/src/context/ctx.go +++ b/src/context/ctx.go @@ -1,10 +1,12 @@ -package ctx // {{{ -// }}} +package ctx + import ( // {{{ "errors" "fmt" + "io" "log" "os" + "os/exec" "regexp" "runtime/debug" "strconv" @@ -46,24 +48,25 @@ type Message struct { // {{{ Data map[string]interface{} Wait chan bool + Name string *Context - Name string - Target *Context Index int + Target *Context Messages []*Message Message *Message Root *Message } -func (m *Message) Spawn(c *Context, key string, index int) *Message { // {{{ +func (m *Message) Spawn(c *Context, key string) *Message { // {{{ msg := &Message{ Code: m.Capi("nmessage", 1), Time: time.Now(), - Target: c, Name: key, + Index: 0, + Target: c, Message: m, - Root: m.Root, + Root: Pulse, } msg.Context = m.Target @@ -82,6 +85,7 @@ func (m *Message) Spawn(c *Context, key string, index int) *Message { // {{{ } // }}} + func (m *Message) Add(key string, value ...string) *Message { // {{{ if m.Meta == nil { m.Meta = make(map[string][]string) @@ -132,8 +136,7 @@ func (m *Message) Echo(str string, arg ...interface{}) *Message { // {{{ m.Meta["result"] = make([]string, 0, 3) } - s := fmt.Sprintf(str, arg...) - m.Meta["result"] = append(m.Meta["result"], s) + m.Meta["result"] = append(m.Meta["result"], fmt.Sprintf(str, arg...)) return m } @@ -146,12 +149,25 @@ func (m *Message) End(s bool) { // {{{ } // }}} -func (m *Message) Cmd() string { // {{{ + +func (m *Message) Cmd(arg ...string) string { // {{{ + if len(arg) > 0 { + if m.Meta == nil { + m.Meta = make(map[string][]string) + } + m.Meta["detail"] = arg + } + + log.Printf("%s command(%s->%s): %v", m.Target.Name, m.Context.Name, m.Target.Name, m.Meta["detail"]) return m.Target.Cmd(m, m.Meta["detail"]...) } // }}} -func (m *Message) Post(c *Context) bool { // {{{ +func (m *Message) Post(c *Context, arg ...string) bool { // {{{ + if len(arg) > 0 { + m.Meta["detail"] = arg + } + if c.Messages == nil { c.Messages = make(chan *Message, c.Confi("MessageQueueSize")) } @@ -166,22 +182,26 @@ func (m *Message) Post(c *Context) bool { // {{{ // }}} func (m *Message) Start(arg ...string) bool { // {{{ - s := m.Target.Spawn(arg...).Begin() - m.Target = s - go s.Start(m) + if len(arg) > 0 { + m.Meta["detail"] = arg + } + + go m.Target.Spawn(m, m.Meta["detail"]...).Begin(m).Start(m) + return true } // }}} // }}} type Server interface { // {{{ - Begin() Server + Begin(m *Message) Server Start(m *Message) bool - Spawn(c *Context, arg ...string) Server + Spawn(c *Context, m *Message, arg ...string) Server + Exit(m *Message, arg ...string) bool } // }}} -type Context struct { // {{{ +type Context struct { Name string Help string @@ -207,78 +227,182 @@ type Context struct { // {{{ func (c *Context) Check(e error) bool { // {{{ if e != nil { log.Println(c.Name, "error:", e) - if c.Conf("debug") == "on" { - debug.PrintStack() - } panic(e) } return true } // }}} -func (c *Context) Register(s *Context, self Server) bool { // {{{ +func (c *Context) Safe(m *Message, hand ...func(c *Context, m *Message)) (ok bool) { // {{{ + defer func() { + if e := recover(); e != nil { + if c.Conf("debug") == "on" { + log.Println(c.Name, "error:", e) + if e != io.EOF { + debug.PrintStack() + } + } + if e == io.EOF { + return + } + + if len(hand) > 0 { + c.Safe(m, hand[1:]...) + } + } + Pulse.Wait <- true + }() + + if len(hand) > 0 { + hand[0](c, m) + } + + return true +} + +// }}} + +func (c *Context) Register(s *Context, x Server) *Context { // {{{ if c.Contexts == nil { c.Contexts = make(map[string]*Context) } - if x, ok := c.Contexts[s.Name]; ok { panic(errors.New(c.Name + " 上下文已存在" + x.Name)) } c.Contexts[s.Name] = s - s.Context = c s.Root = c.Root - s.Server = self + s.Context = c + s.Server = x + + log.Printf("%s register(%d): %s", c.Name, Index.Capi("ncontext", 1), s.Name) + return s +} + +// }}} +func (c *Context) Begin(m *Message) *Context { // {{{ + for _, v := range c.Configs { + if v.Hand != nil { + v.Hand(c, v.Value) + log.Println(c.Name, "config:", v.Name, v.Value) + } + } + + if c.Server != nil { + c.Server.Begin(m) + } + return c +} + +// }}} +func (c *Context) Start(m *Message) bool { // {{{ + if c.Server != nil && c.Cap("status") != "start" { + c.Safe(m, func(c *Context, m *Message) { + c.Cap("status", "start") + defer c.Cap("status", "stop") + + log.Printf("%d start(%d): %s", m.Code, Index.Capi("nserver", 1), c.Name) + defer Index.Capi("nserver", -1) + defer log.Printf("%d stop(%d): %s %s", m.Code, Index.Capi("nserver", 0), c.Name, c.Help) + + c.Resource = []*Message{m} + c.Server.Start(m) + }) + + } - log.Println(c.Name, "register:", s.Name) return true } // }}} -func (c *Context) Init(arg ...string) { // {{{ - if c.Root != nil { - return - } - - root := c - for root.Context != nil { - root = root.Context - } - - cs := []*Context{root} - - for i := 0; i < len(cs); i++ { - cs[i].Root = root - cs[i].Begin() - - for _, v := range cs[i].Contexts { - cs = append(cs, v) - } - } - - Pulse.Root = Pulse - Pulse.Target = Index - Pulse.Context = Index - - for _, s := range root.Contexts { - if ok, _ := regexp.MatchString(root.Conf("start"), s.Name); ok { - go s.Start(Pulse.Spawn(s, s.Name, 0).Put("detail", os.Stdout)) - } - } - - Pulse.Wait = make(chan bool) - for { - <-Pulse.Wait - if Index.Capi("nserver", 0) == 0 { - return - } - } +func (c *Context) Spawn(m *Message, arg ...string) *Context { // {{{ + s := &Context{Name: arg[0], Help: c.Help} + m.Target = s + c.Register(s, c.Server.Spawn(s, m, arg...)).Begin(m) + return s } // }}} -func (c *Context) Find(name []string) (s *Context) { // {{{ - log.Println(c.Name, "find:", name) +func (c *Context) Deal(pre func(m *Message) bool, post func(m *Message) bool) (live bool) { // {{{ + if c.Messages == nil { + c.Messages = make(chan *Message, c.Confi("MessageQueueSize")) + } + m := <-c.Messages + defer m.End(true) + + if len(m.Meta["detail"]) == 0 { + return true + } + + if pre != nil && !pre(m) { + return false + } + + c.Safe(m, func(c *Context, m *Message) { + c.Cmd(m, m.Meta["detail"]...) + log.Printf("%s command(%s->%s): %v", c.Name, m.Context.Name, m.Target.Name, m.Meta["detail"]) + + }, func(c *Context, m *Message) { + m.Cmd() + + }, func(c *Context, m *Message) { + log.Printf("system command(%s->%s): %v", m.Context.Name, m.Target.Name, m.Meta["detail"]) + arg := m.Meta["detail"] + cmd := exec.Command(arg[0], arg[1:]...) + v, e := cmd.CombinedOutput() + if e != nil { + m.Echo("%s\n", e) + } else { + m.Echo(string(v)) + } + }) + + if post != nil && !post(m) { + return false + } + + return true +} + +// }}} +func (c *Context) Exit(m *Message, arg ...string) { // {{{ + if m.Target == c { + for _, v := range c.Session { + if v.Name != "" { + c.Server.Exit(v, arg...) + + v.Name = "" + log.Println(c.Name, c.Help, "exit: session", v.Code, v.Target.Name, v.Target.Help) + v.Cmd("exit") + } + } + + for _, v := range c.Resource { + if v.Index != -1 { + c.Server.Exit(v, arg...) + + v.Index = -1 + log.Println(c.Name, c.Help, "exit: resource", v.Code, v.Context.Name, v.Context.Help) + v.Context.Cmd(v, "exit") + } + } + } else if m.Context == c { + if m.Name != "" { + c.Server.Exit(m, arg...) + + m.Name = "" + m.Cmd("exit") + } + } else { + c.Server.Exit(m, arg...) + } + +} + +// }}} + +func (c *Context) Find(name []string) (s *Context) { // {{{ cs := c.Contexts for _, v := range name { if x, ok := cs[v]; ok { @@ -478,79 +602,6 @@ func (c *Context) Del(arg ...string) { // {{{ // }}} -func (c *Context) Begin() *Context { // {{{ - c.Root.Capi("ncontext", 1) - for _, v := range c.Configs { - if v.Hand != nil { - v.Hand(c, v.Value) - } - } - - if c.Server != nil { - c.Server.Begin() - } - return c -} - -// }}} -func (c *Context) Start(m *Message) bool { // {{{ - defer func() { - if e := recover(); e != nil { - log.Println(e) - } - Pulse.Wait <- true - }() - - if c.Server != nil && c.Cap("status") != "start" { - - c.Cap("status", "status", "start", "服务状态") - defer c.Cap("status", "stop") - - c.Root.Capi("nserver", 1) - defer c.Root.Capi("nserver", -1) - - c.Resource = []*Message{m} - log.Println(m.Code, "start:", c.Name) - c.Server.Start(m) - log.Println(m.Code, "stop:", c.Name) - } - - return true -} - -// }}} -func (c *Context) Spawn(arg ...string) *Context { // {{{ - s := &Context{Name: arg[0], Help: c.Help} - c.Register(s, c.Server.Spawn(s, arg...)) - s.Begin() - return s -} - -// }}} - -func (c *Context) Post(m *Message) bool { // {{{ - if c.Messages == nil { - c.Messages = make(chan *Message, c.Confi("MessageQueueSize")) - } - - c.Messages <- m - if m.Wait != nil { - return <-m.Wait - } - return true -} - -// }}} -func (c *Context) Get() *Message { // {{{ - if c.Messages == nil { - c.Messages = make(chan *Message, c.Confi("MessageQueueSize")) - } - - return <-c.Messages -} - -// }}} - func (c *Context) Cmd(m *Message, arg ...string) string { // {{{ if x, ok := c.Commands[arg[0]]; ok { return x.Hand(c, m, arg...) @@ -663,7 +714,6 @@ func (c *Context) Capi(key string, value int) int { // {{{ return n + value } -// }}} // }}} var Index = &Context{Name: "ctx", Help: "根上下文", @@ -683,7 +733,7 @@ var Index = &Context{Name: "ctx", Help: "根上下文", "cert": &Config{Name: "cert", Value: "etc/cert.pem", Help: "证书文件"}, "key": &Config{Name: "key", Value: "etc/key.pem", Help: "私钥文件"}, - "debug": &Config{Name: "debug", Value: "on", Help: "调试模式"}, + "debug": &Config{Name: "debug", Value: "off", Help: "调试模式"}, "start": &Config{Name: "start", Value: "cli", Help: "默认启动模块"}, "init.sh": &Config{Name: "init.sh", Value: "etc/init.sh", Help: "默认启动脚本"}, "bench.log": &Config{Name: "bench.log", Value: "var/bench.log", Help: "默认日志文件", Hand: func(c *Context, arg string) string { @@ -715,8 +765,35 @@ func init() { if len(os.Args) > 3 { Index.Conf("start", os.Args[3]) } + log.Println("\n\n\n") } func Start() { - Index.Init() + Pulse.Root = Pulse + Pulse.Target = Index + Pulse.Context = Index + Pulse.Wait = make(chan bool, 10) + + cs := []*Context{Index} + for i := 0; i < len(cs); i++ { + cs[i].Root = Index + cs[i].Begin(nil) + + for _, v := range cs[i].Contexts { + cs = append(cs, v) + } + } + + for _, s := range Index.Contexts { + if ok, _ := regexp.MatchString(Index.Conf("start"), s.Name); ok { + go s.Start(Pulse.Spawn(s, s.Name).Put("detail", os.Stdout)) + } + } + + for { + <-Pulse.Wait + if Index.Capi("nserver", 0) == 0 { + return + } + } } diff --git a/src/context/tcp/tcp.go b/src/context/tcp/tcp.go index 51b106da..07e029dc 100644 --- a/src/context/tcp/tcp.go +++ b/src/context/tcp/tcp.go @@ -1,5 +1,5 @@ -package tcp // {{{ -// }}} +package tcp + import ( // {{{ "context" "log" @@ -13,7 +13,7 @@ type TCP struct { *ctx.Context } -func (tcp *TCP) Begin() ctx.Server { // {{{ +func (tcp *TCP) Begin(m *ctx.Message) ctx.Server { // {{{ return tcp } @@ -26,25 +26,29 @@ func (tcp *TCP) Start(m *ctx.Message) bool { // {{{ l, e := net.Listen("tcp", tcp.Conf("address")) tcp.Check(e) tcp.listener = l - tcp.Capi("nlisten", 1) - log.Println(tcp.Name, "listen:", l.Addr()) + + log.Printf("%s listen(%d): %v", tcp.Name, tcp.Capi("nlisten", 1), l.Addr()) + defer tcp.Capi("nlisten", -1) + defer log.Println("%s close(%d): %v", tcp.Name, tcp.Capi("nlisten", 0), l.Addr()) for { c, e := l.Accept() - log.Println(tcp.Name, "accept:", c.LocalAddr(), "<-", c.RemoteAddr()) tcp.Check(e) + log.Printf("%s accept(%d): %v<-%v", tcp.Name, tcp.Capi("nclient", 1), c.LocalAddr(), c.RemoteAddr()) + // defer log.Println(tcp.Name, "close:", tcp.Capi("nclient", -1), c.LocalAddr(), "<-", c.RemoteAddr()) - m := m.Spawn(m.Context, c.RemoteAddr().String(), 0) - m.Add("detail", "accept", c.RemoteAddr().String(), "tcp").Put("detail", c).Cmd() + msg := m.Spawn(m.Context, c.RemoteAddr().String()).Put("detail", c) + msg.Cmd("accept", c.RemoteAddr().String(), "tcp") } return true } // }}} -func (tcp *TCP) Spawn(c *ctx.Context, arg ...string) ctx.Server { // {{{ +func (tcp *TCP) Spawn(c *ctx.Context, m *ctx.Message, arg ...string) ctx.Server { // {{{ c.Caches = map[string]*ctx.Cache{ "nclient": &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"}, + "status": &ctx.Cache{Name: "status", Value: "stop", Help: "服务状态"}, } c.Configs = map[string]*ctx.Config{ "address": &ctx.Config{Name: "address", Value: arg[0], Help: "监听地址"}, @@ -56,6 +60,23 @@ func (tcp *TCP) Spawn(c *ctx.Context, arg ...string) ctx.Server { // {{{ } +// }}} +func (tcp *TCP) Exit(m *ctx.Message, arg ...string) bool { // {{{ + + if c, ok := m.Data["result"].(net.Conn); ok && m.Target == tcp.Context { + c.Close() + delete(m.Data, "result") + return true + } + + if c, ok := m.Data["detail"].(net.Conn); ok && m.Context == tcp.Context { + c.Close() + delete(m.Data, "detail") + return true + } + return true +} + // }}} var Index = &ctx.Context{Name: "tcp", Help: "网络连接", @@ -95,6 +116,17 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络连接", return "" // }}} }}, + "exit": &ctx.Command{"exit", "退出", func(c *ctx.Context, m *ctx.Message, arg ...string) string { + tcp, ok := m.Target.Server.(*TCP) + if !ok { + tcp, ok = m.Context.Server.(*TCP) + } + if ok { + tcp.Context.Exit(m) + } + + return "" + }}, }, } diff --git a/src/example/bench.go b/src/example/bench.go index 97399170..ec3ee2d9 100644 --- a/src/example/bench.go +++ b/src/example/bench.go @@ -4,8 +4,6 @@ import ( "context" _ "context/cli" _ "context/tcp" - // _ "context/ssh" - // _ "context/web" ) func main() {