From 32a6ef00f8d8555dca42cbe03f9c6cb71da6b821 Mon Sep 17 00:00:00 2001 From: shaoying Date: Thu, 21 Nov 2019 00:48:37 +0800 Subject: [PATCH] add imq --- src/contexts/cli/cli.go | 105 ++++++++++++++++++++++++++++++---------- usr/local/wiki/miss.md | 2 + 2 files changed, 81 insertions(+), 26 deletions(-) diff --git a/src/contexts/cli/cli.go b/src/contexts/cli/cli.go index 991db9f4..e04b089b 100644 --- a/src/contexts/cli/cli.go +++ b/src/contexts/cli/cli.go @@ -147,7 +147,7 @@ func (cli *CLI) Close(m *ctx.Message, arg ...string) bool { var Index = &ctx.Context{Name: "cli", Help: "管理中心", Caches: map[string]*ctx.Cache{}, Configs: map[string]*ctx.Config{ - "runtime": &ctx.Config{Name: "runtime", Value: map[string]interface{}{ + "runtime": {Name: "runtime", Value: map[string]interface{}{ "init": []interface{}{"ctx_log", "ctx_cas", "ctx_ups", "ctx_box", "ctx_dev", "ctx_app", "ctx_bin", "ctx_root", "ctx_home", @@ -159,7 +159,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "username": "shy", }, }, Help: "运行环境, host, init, boot, node, user, work"}, - "system": &ctx.Config{Name: "system", Value: map[string]interface{}{ + "system": {Name: "system", Value: map[string]interface{}{ "timeout": "180s", "env": map[string]interface{}{}, "shell": map[string]interface{}{ @@ -174,12 +174,12 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "init": "etc/init.shy", "exit": "etc/exit.shy", }, }, Help: "系统环境, shell: path, cmd, arg, dir, env, active, daemon; "}, - "daemon": &ctx.Config{Name: "daemon", Value: map[string]interface{}{}, Help: "守护任务"}, - "timer": &ctx.Config{Name: "timer", Value: map[string]interface{}{ + "daemon": {Name: "daemon", Value: map[string]interface{}{}, Help: "守护任务"}, + "timer": {Name: "timer", Value: map[string]interface{}{ "list": map[string]interface{}{}, "next": "", }, Help: "定时器"}, - "project": &ctx.Config{Name: "project", Value: map[string]interface{}{ + "project": {Name: "project", Value: map[string]interface{}{ "github": "https://github.com/shylinux/context", "goproxy": "https://goproxy.cn", "template": map[string]interface{}{ @@ -194,7 +194,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "path": "usr/trash", }, }, Help: "项目管理"}, - "compile": &ctx.Config{Name: "compile", Value: map[string]interface{}{ + "compile": {Name: "compile", Value: map[string]interface{}{ "bench": "src/extend/shy.go", "list": []interface{}{ map[string]interface{}{"os": "linux", "cpu": "arm"}, map[string]interface{}{"os": "linux", "cpu": "386"}, @@ -212,7 +212,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "gopkg.in//gomail.v2", }, }, Help: "源码编译"}, - "publish": &ctx.Config{Name: "publish", Value: map[string]interface{}{ + "publish": {Name: "publish", Value: map[string]interface{}{ "path": "usr/publish", "list": map[string]interface{}{ "boot_sh": "bin/boot.sh", "zone_sh": "bin/zone.sh", @@ -231,7 +231,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "server": "usr/publish/docker/local.shy", }, }, Help: "版本发布"}, - "upgrade": &ctx.Config{Name: "upgrade", Value: map[string]interface{}{ + "upgrade": {Name: "upgrade", Value: map[string]interface{}{ "install": []interface{}{"context", "love"}, "system": []interface{}{"boot.sh", "zone.sh", "user.sh", "node.sh", "init.shy", "common.shy", "exit.shy"}, "portal": []interface{}{"template.tar.gz", "librarys.tar.gz"}, @@ -252,13 +252,32 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", "librarys_tar_gz": "usr/librarys.tar.gz", }, }, Help: "服务升级"}, - "missyou": &ctx.Config{Name: "missyou", Value: map[string]interface{}{ + "missyou": {Name: "missyou", Value: map[string]interface{}{ "path": "usr/local/work", "local": "usr/local", }, Help: "任务管理"}, + "imq": {Name: "imq", Value: map[string]interface{}{ + "data": map[string]interface{}{ + "meta": map[string]interface{}{ + "least": 10, "limit": 100, + "store": "var/tmp/imq.csv", + }, + "list": map[string]interface{}{}, + }, + "topic": map[string]interface{}{ + "hello": []interface{}{ + map[string]interface{}{ + "cmd": "pwd", + }, + map[string]interface{}{ + "cmd": "help", + }, + }, + }, + }, Help: "消息队列"}, }, Commands: map[string]*ctx.Command{ - "_init": &ctx.Command{Name: "_init", Help: "环境初始化", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "_init": {Name: "_init", Help: "环境初始化", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { m.Conf("runtime", "host.GOARCH", runtime.GOARCH) m.Conf("runtime", "host.GOOS", runtime.GOOS) m.Conf("runtime", "host.pid", os.Getpid()) @@ -285,7 +304,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "runtime": &ctx.Command{Name: "runtime [host|boot|node|user|work [name [value]]]", Help: "运行环境", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "runtime": {Name: "runtime [host|boot|node|user|work [name [value]]]", Help: "运行环境", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if len(arg) == 0 { m.Cmdy("ctx.config", "runtime") return @@ -323,7 +342,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "system": &ctx.Command{Name: "system word...", Help: []string{"调用系统命令, word: 命令", + "system": {Name: "system word...", Help: []string{"调用系统命令, word: 命令", "cmd_timeout: 命令超时", "cmd_active(true/false): 是否交互", "cmd_daemon(true/false): 是否守护", @@ -495,7 +514,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "daemon": &ctx.Command{Name: "daemon", Help: "守护任务", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "daemon": {Name: "daemon", Help: "守护任务", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { pid := "" if len(arg) > 0 && m.Confs("daemon", arg[0]) { pid, arg = arg[0], arg[1:] @@ -540,7 +559,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "sleep": &ctx.Command{Name: "sleep time", Help: "睡眠, time(ns/us/ms/s/m/h): 时间值(纳秒/微秒/毫秒/秒/分钟/小时)", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "sleep": {Name: "sleep time", Help: "睡眠, time(ns/us/ms/s/m/h): 时间值(纳秒/微秒/毫秒/秒/分钟/小时)", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if d, e := time.ParseDuration(arg[0]); m.Assert(e) { m.Log("info", "sleep %v", d) time.Sleep(d) @@ -548,7 +567,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "timer": &ctx.Command{Name: "timer [begin time] [repeat] [order time] time cmd", Help: "定时任务", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "timer": {Name: "timer [begin time] [repeat] [order time] time cmd", Help: "定时任务", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if cli, ok := c.Server.(*CLI); m.Assert(ok) { // 定时列表 if len(arg) == 0 { @@ -652,7 +671,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "time": &ctx.Command{Name: "time when [begin|end|yestoday|tommorow|monday|sunday|first|last|new|eve] [offset]", + "time": {Name: "time when [begin|end|yestoday|tommorow|monday|sunday|first|last|new|eve] [offset]", Help: "查看时间, when: 输入的时间戳, 剩余参数是时间偏移", Form: map[string]int{"time_format": 1, "time_close": 1}, Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { @@ -742,7 +761,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "date": &ctx.Command{Name: "date", Help: "日历", Form: map[string]int{"space": 1, "format": 2, "count": 1, "nature": 1, "cmd": -1}, Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "date": {Name: "date", Help: "日历", Form: map[string]int{"space": 1, "format": 2, "count": 1, "nature": 1, "cmd": -1}, Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { show := map[int]string{0: "周日", 1: "周一", 2: "周二", 3: "周三", 4: "周四", 5: "周五", 6: "周六"} space := m.Options("space") @@ -805,14 +824,14 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", m.Table() return }}, - "proc": &ctx.Command{Name: "proc", Help: "进程管理", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "proc": {Name: "proc", Help: "进程管理", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { m.Cmdy("cli.system", "ps", kit.Select("ax", arg, 0), "cmd_parse", "cut") if len(arg) > 1 { m.Cmd("select", "reg", "COMMAND", arg[1]) } return }}, - "quit": &ctx.Command{Name: "quit code", Help: "停止服务", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "quit": {Name: "quit code", Help: "停止服务", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { m.Conf("runtime", "boot.count", m.Confi("runtime", "boot.count")+1) code := kit.Select("0", arg, 0) switch code { @@ -840,14 +859,14 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", }) return }}, - "_exit": &ctx.Command{Name: "_exit", Help: "退出命令", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "_exit": {Name: "_exit", Help: "退出命令", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { m.Confm("daemon", func(key string, info map[string]interface{}) { m.Cmd("cli.daemon", key, "stop") }) return }}, - "project": &ctx.Command{Name: "project init|stat|stats|trend|trends|submit|review|plugin [args...]", + "project": {Name: "project init|stat|stats|trend|trends|submit|review|plugin [args...]", Help: "项目管理", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { switch arg[0] { case "init": @@ -922,7 +941,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "compile": &ctx.Command{Name: "compile all|self|linux|windows|darwin|restart|plugin", Help: "项目编译", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "compile": {Name: "compile all|self|linux|windows|darwin|restart|plugin", Help: "项目编译", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { switch arg[0] { case "all": // 所有版本 @@ -996,7 +1015,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "publish": &ctx.Command{Name: "publish [args...]", Help: "项目发布", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "publish": {Name: "publish [args...]", Help: "项目发布", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if len(arg) == 0 { m.Confm("publish", "list", func(key string, value string) { arg = append(arg, strings.Replace(key, "_", ".", -1)) @@ -1038,7 +1057,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", m.Sort("file").Table() return }}, - "upgrade": &ctx.Command{Name: "upgrade install|bench|system|portal|script|plugin|restart|package|project", Help: "服务升级", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "upgrade": {Name: "upgrade install|bench|system|portal|script|plugin|restart|package|project", Help: "服务升级", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if len(arg) == 0 { m.Cmdy("ctx.config", "upgrade") return @@ -1188,7 +1207,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "missyou": &ctx.Command{Name: "missyou [topic] [name [action]]", Help: "任务管理", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "missyou": {Name: "missyou [topic] [name [action]]", Help: "任务管理", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { // 任务主题 topic := "hello" if len(arg) > 0 && (arg[0] == "" || m.Cmds("nfs.path", path.Join(m.Conf("cli.project", "plugin.path"), arg[0]))) { @@ -1248,7 +1267,7 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", } return }}, - "version": &ctx.Command{Name: "version", Help: "", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + "version": {Name: "version", Help: "", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if len(arg) == 0 { types := reflect.TypeOf(version) value := reflect.ValueOf(version) @@ -1270,9 +1289,43 @@ var Index = &ctx.Context{Name: "cli", Help: "管理中心", m.Cmdy("nfs.template", "force", path.Join(m.Conf("runtime", "boot.ctx_home"), "src/contexts/cli/version.go"), path.Join(m.Conf("project", "template.path"), "version/")) return }}, + + "imq": {Name: "imq cmd", Help: "消息队列", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + m.Grow("imq", "data", map[string]interface{}{ + "time": m.Time(), "cmd": arg, + }) + if imq, ok := m.Confv("imq", "work").(*IMQ); ok { + imq.q <- true + return + } + + imq := &IMQ{make(chan bool, 10)} + imq.q <- true + + m.Confv("imq", imq) + m.Gos(m.Spawn(), func(msg *ctx.Message) { + for <-imq.q { + m.Option("cache.offset", 0) + m.Option("cache.limit", m.Confi("imq", "meta.count")-m.Confi("imq", "meta.current")+1) + m.Grows("imq", "data", func(meta map[string]interface{}, index int, value map[string]interface{}) { + m.Log("info", "imq %d %v", index, value) + if cmd := kit.Trans(value["cmd"]); m.Confm("imq", []string{"topic", cmd[0]}, func(index int, value map[string]interface{}) { + m.Cmd(value["cmd"], cmd[1:]) + }) == nil { + m.Cmd(cmd) + } + }) + } + }) + return + }}, }, } +type IMQ struct { + q chan bool +} + func init() { ctx.Index.Register(Index, &CLI{Context: Index}) } diff --git a/usr/local/wiki/miss.md b/usr/local/wiki/miss.md index 0b0f4412..330b0a95 100644 --- a/usr/local/wiki/miss.md +++ b/usr/local/wiki/miss.md @@ -2,6 +2,8 @@ {{runs "help"}} +## 消息队列 + ## 应用开发 {{table "hello" `