diff --git a/base.go b/base.go index ff3ddefc..e4a5acdf 100644 --- a/base.go +++ b/base.go @@ -44,6 +44,10 @@ func (f *Frame) Start(m *Message, arg ...string) bool { return true } func (f *Frame) Close(m *Message, arg ...string) bool { + m.TryCatch(m, true, func(m *Message) { + m.target.wg.Wait() + }) + m.Log(LOG_CLOSE, "ice") defer m.Cost("close ice") @@ -54,10 +58,6 @@ func (f *Frame) Close(m *Message, arg ...string) bool { s.Close(list[s], arg...) } }) - - m.TryCatch(m, true, func(m *Message) { - m.target.wg.Wait() - }) return true } diff --git a/base/web/dream.go b/base/web/dream.go index 78ed2d24..1dc6de5f 100644 --- a/base/web/dream.go +++ b/base/web/dream.go @@ -11,6 +11,8 @@ import ( "time" ) +const DREAM = "dream" + func init() { Index.Merge(&ice.Context{ Configs: map[string]*ice.Config{ diff --git a/base/web/serve.go b/base/web/serve.go index ac09e320..337010d4 100644 --- a/base/web/serve.go +++ b/base/web/serve.go @@ -191,6 +191,9 @@ func (web *Frame) ServeHTTP(w http.ResponseWriter, r *http.Request) { web.ServeMux.ServeHTTP(w, r) } } + +const SERVE = "serve" + func init() { Index.Merge(&ice.Context{ Configs: map[string]*ice.Config{ diff --git a/base/web/share.go b/base/web/share.go index 3dceb53b..7b744b55 100644 --- a/base/web/share.go +++ b/base/web/share.go @@ -225,6 +225,8 @@ func ShareCreate(m *ice.Message, kind, name, text string, arg ...string) string return _share_create(m, kind, name, text, arg...) } +const SHARE = "share" + func init() { Index.Merge(&ice.Context{ Configs: map[string]*ice.Config{ diff --git a/base/web/space.go b/base/web/space.go index 07b93f5e..8b91037d 100644 --- a/base/web/space.go +++ b/base/web/space.go @@ -3,7 +3,10 @@ package web import ( "github.com/gorilla/websocket" ice "github.com/shylinux/icebergs" + aaa "github.com/shylinux/icebergs/base/aaa" + "github.com/shylinux/icebergs/base/cli" kit "github.com/shylinux/toolkits" + "github.com/shylinux/toolkits/task" "fmt" "math/rand" @@ -13,70 +16,70 @@ import ( "time" ) -var SPACE = ice.Name("space", Index) - +func _link(m *ice.Message, pod interface{}) string { + return fmt.Sprintf(`%s`, + kit.Select(m.Conf(SHARE, "meta.domain"), m.Option(ice.MSG_USERWEB)), pod, pod) +} func _space_list(m *ice.Message, space string) { if space == "" { - m.Richs(SPACE, nil, "*", func(key string, value map[string]interface{}) { - m.Push(key, value, []string{"time", "type", "name", "text"}) - if m.Option(ice.MSG_USERUA) != "" { - m.Push("link", fmt.Sprintf(`%s`, - kit.Select(m.Conf(ice.WEB_SHARE, "meta.domain"), m.Option(ice.MSG_USERWEB)), - kit.Keys(m.Option(ice.MSG_USERPOD), value["name"]), value["name"])) - } + m.Richs(SPACE, nil, kit.MDB_FOREACH, func(key string, value map[string]interface{}) { + m.Push(key, value, []string{kit.MDB_TIME, kit.MDB_TYPE, kit.MDB_NAME, kit.MDB_TEXT}) + m.Push(kit.MDB_LINK, _link(m, value[kit.MDB_NAME])) }) - m.Sort("name") + m.Sort(kit.MDB_NAME) return } - m.Richs(ice.WEB_SPACE, nil, space, func(key string, value map[string]interface{}) { + m.Richs(SPACE, nil, space, func(key string, value map[string]interface{}) { m.Push("detail", value) - m.Push("key", "link") - m.Push("value", fmt.Sprintf(`%s`, m.Conf(ice.WEB_SHARE, "meta.domain"), value["name"], value["name"])) + m.Push(kit.MDB_KEY, kit.MDB_LINK) + m.Push(kit.MDB_VALUE, _link(m, value[kit.MDB_NAME])) }) } func _space_dial(m *ice.Message, dev, name string, arg ...string) { - // 基本信息 - node := m.Conf(ice.CLI_RUNTIME, "node.type") - user := m.Conf(ice.CLI_RUNTIME, "boot.username") + m.Richs(SPIDE, nil, dev, func(key string, value map[string]interface{}) { + client := kit.Value(value, "client").(map[string]interface{}) + redial := m.Confm(SPACE, "meta.redial") + web := m.Target().Server().(*Frame) - web := m.Target().Server().(*Frame) - m.Hold(1).Gos(m, func(msg *ice.Message) { - msg.Richs(ice.WEB_SPIDE, nil, dev, func(key string, value map[string]interface{}) { - proto := kit.Select("ws", "wss", kit.Format(kit.Value(value, "client.protocol")) == "https") - host := kit.Format(kit.Value(value, "client.hostname")) + host := kit.Format(client["hostname"]) + proto := kit.Select("ws", "wss", client["protocol"] == "https") + uri := kit.MergeURL(proto+"://"+host+"/space/", "name", name) + if u, e := url.Parse(uri); m.Assert(e) { - for i := 0; i < kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.c")); i++ { - if u, e := url.Parse(kit.MergeURL(proto+"://"+host+"/space/", "node", node, "name", name, "user", user, "share", value["share"])); msg.Assert(e) { - if s, e := net.Dial("tcp", host); !msg.Warn(e != nil, "%s", e) { - if s, _, e := websocket.NewClient(s, u, nil, kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.r")), kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.w"))); !msg.Warn(e != nil, "%s", e) { - msg = m.Spawns() + task.Put(dev, func(task *task.Task) error { + for i := 0; i < kit.Int(redial["c"]); i++ { + if s, e := net.Dial("tcp", host); !m.Warn(e != nil, "%s", e) { + if s, _, e := websocket.NewClient(s, u, nil, kit.Int(redial["r"]), kit.Int(redial["w"])); !m.Warn(e != nil, "%s", e) { // 连接成功 - msg.Rich(ice.WEB_SPACE, nil, kit.Dict( - kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev, kit.MDB_TEXT, kit.Value(value, "client.hostname"), - "socket", s, + m.Rich(SPACE, nil, kit.Dict("socket", s, + kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev, kit.MDB_TEXT, host, )) - msg.Log(ice.LOG_CMDS, "%d conn %s success %s", i, dev, u) - if i = 0; HandleWSS(msg, true, web.send, s, dev) { + m.Log_CREATE("space", dev, "retry", i, "uri", uri) + + m = m.Spawns() + if i = 0; HandleWSS(m, true, web.send, s, dev) { + // 连接关闭 break } } } // 断线重连 - sleep := time.Duration(rand.Intn(kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.a"))*i+2)+kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.b"))) * time.Millisecond - msg.Cost("order: %d sleep: %s reconnect: %s", i, sleep, u) + sleep := time.Duration(rand.Intn(kit.Int(redial["a"])*i+2)+kit.Int(redial["b"])) * time.Millisecond + m.Cost("order: %d sleep: %s reconnect: %s", i, sleep, u) time.Sleep(sleep) } - } - }) - m.Done() + return nil + }) + } }) } func _space_send(m *ice.Message, space string, arg ...string) { target := strings.Split(space, ".") - m.Warn(m.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) { + frame := m.Target().Server().(*Frame) + m.Warn(m.Richs(SPACE, nil, target[0], func(key string, value map[string]interface{}) { if socket, ok := value["socket"].(*websocket.Conn); !m.Warn(!ok, "socket err") { // 复制选项 for _, k := range kit.Simple(m.Optionv("_option")) { @@ -91,20 +94,17 @@ func _space_send(m *ice.Message, space string, arg ...string) { // 构造路由 id := kit.Format(m.Target().ID()) - m.Set(ice.MSG_DETAIL, arg...) - m.Optionv(ice.MSG_TARGET, target[1:]) - m.Optionv(ice.MSG_SOURCE, []string{id}) - m.Info("send [%s]->%v %v %s", id, target, m.Detailv(), m.Format("meta")) + frame.send[id] = m // 下发命令 - m.Target().Server().(*Frame).send[id] = m - socket.WriteMessage(1, []byte(m.Format("meta"))) + m.Set(ice.MSG_DETAIL, arg...) + _space_echo(m, []string{id}, target[1:], socket, target[0]) m.Option("timeout", m.Conf(SPACE, "meta.timeout.c")) m.Call(m.Option("_async") == "", func(res *ice.Message) *ice.Message { - if res != nil && m != nil { + if delete(frame.send, id); res != nil && m != nil { // 返回结果 - return m.Copy(res) + return m.Cost("[%v]->%v %v %v", id, target, arg, m.Copy(res).Format("append")) } return nil }) @@ -112,6 +112,19 @@ func _space_send(m *ice.Message, space string, arg ...string) { }) == nil, "not found %s", space) } +func _space_echo(msg *ice.Message, source, target []string, c *websocket.Conn, name string) { + msg.Optionv(ice.MSG_SOURCE, source) + msg.Optionv(ice.MSG_TARGET, target) + c.WriteMessage(1, []byte(msg.Format("meta"))) + target = append([]string{name}, target...) + msg.Info("send %v %v->%v %v %v", 1, source, target, msg.Detailv(), msg.Format("meta")) +} +func _space_exec(msg *ice.Message, source, target []string, c *websocket.Conn, name string) { + msg = msg.Cmd() + msg.Set("_option") + _space_echo(msg, []string{}, kit.Revert(source)[1:], c, name) + msg.Cost("%v->%v %v %v", source, target, msg.Detailv(), msg.Format("append")) +} func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *websocket.Conn, name string) bool { for running := true; running; { if t, b, e := c.ReadMessage(); m.Warn(e != nil, "space recv %d msg %v", t, e) { @@ -124,18 +137,16 @@ func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *webso msg.Info("recv %v<-%v %s %v", target, source, msg.Detailv(), msg.Format("meta")) if len(target) == 0 { - msg.Option(ice.MSG_USERROLE, msg.Cmdx(ice.AAA_ROLE, "check", msg.Option(ice.MSG_USERNAME))) - msg.Logs(ice.LOG_AUTH, "role", msg.Option(ice.MSG_USERROLE), "user", msg.Option(ice.MSG_USERNAME)) + msg.Option(ice.MSG_USERROLE, aaa.UserRole(msg, msg.Option(ice.MSG_USERNAME))) + msg.Log_AUTH(aaa.USERNAME, msg.Option(ice.MSG_USERNAME), aaa.USERROLE, msg.Option(ice.MSG_USERROLE)) if msg.Optionv(ice.MSG_HANDLE, "true"); !msg.Warn(!safe, "no right") { // 本地执行 - m.Option("_dev", name) - msg = msg.Cmd() - msg.Set("_option") - msg.Set("_option") - } - if source, target = []string{}, kit.Revert(source)[1:]; msg.Detail() == "exit" { - // 重启进程 - return true + msg.Option("_dev", name) + task.Put(nil, func(task *task.Task) error { + _space_exec(msg, source, target, c, name) + return nil + }) + continue } } else if msg.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) { // 查询节点 @@ -149,8 +160,6 @@ func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *webso } else if res, ok := send[msg.Option(ice.MSG_TARGET)]; len(target) == 1 && ok { // 接收响应 - delete(send, msg.Option(ice.MSG_TARGET)) - res.Cost("%v->%v %v %v", target, source, res.Detailv(), msg.Format("append")) res.Back(msg) continue @@ -164,97 +173,64 @@ func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *webso source, target = []string{}, kit.Revert(source)[1:] } - // 发送报文 - msg.Optionv(ice.MSG_SOURCE, source) - msg.Optionv(ice.MSG_TARGET, target) - socket.WriteMessage(t, []byte(msg.Format("meta"))) - target = append([]string{name}, target...) - msg.Info("send %v %v->%v %v %v", t, source, target, msg.Detailv(), msg.Format("meta")) - msg.Cost("%v->%v %v %v", source, target, msg.Detailv(), msg.Format("append")) + _space_echo(msg, source, target, socket, name) } } return false } +const SPACE = "space" + func init() { Index.Merge(&ice.Context{ Configs: map[string]*ice.Config{ ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, kit.MDB_NAME, - "redial.a", 3000, "redial.b", 1000, "redial.c", 1000, - "buffer.r", 4096, "buffer.w", 4096, - "timeout.c", "180s", + "redial", kit.Dict("a", 3000, "b", 1000, "c", 1000, "r", 4096, "w", 4096), + "timeout", kit.Dict("c", "180s"), )}, }, Commands: map[string]*ice.Command{ - ice.WEB_SPACE: {Name: "space name auto", Help: "空间站", Meta: kit.Dict( - "exports", []string{"pod", "name"}, - ), Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { - if len(arg) == 0 { - _space_list(m, "") + ice.WEB_SPACE: {Name: "space [name [cmd...]] auto", Help: "空间站", Action: map[string]*ice.Action{ + "connect": {Name: "connect [dev [name]]", Help: "连接", Hand: func(m *ice.Message, arg ...string) { + _space_dial(m, kit.Select("dev", arg, 0), kit.Select(cli.NodeName, arg, 2)) + }}, + }, Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { + if len(arg) < 2 { + _space_list(m, kit.Select("", arg, 0)) return } - switch arg[0] { - case "share": - m.Richs(ice.WEB_SPIDE, nil, m.Option("_dev"), func(key string, value map[string]interface{}) { - m.Log(ice.LOG_CREATE, "dev: %s share: %s", m.Option("_dev"), arg[1]) - value["share"] = arg[1] - }) - - case "connect": - _space_dial(m, kit.Select("dev", arg, 1), kit.Select(m.Conf(ice.CLI_RUNTIME, "node.name"), arg, 2)) - - default: - if len(arg) == 1 { - // 空间详情 - _space_list(m, arg[0]) - break - } - - if arg[0] == "" { - // 本地命令 - m.Cmdy(arg[1:]) - break - } - - _space_send(m, arg[0], arg[1:]...) + if arg[0] == "" { + // 本地命令 + m.Cmdy(arg[1:]) + return } + + _space_send(m, arg[0], arg[1:]...) }}, "/space/": {Name: "/space/", Help: "空间站", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { if s, e := websocket.Upgrade(m.W, m.R, nil, m.Confi(ice.WEB_SPACE, "meta.buffer.r"), m.Confi(ice.WEB_SPACE, "meta.buffer.w")); m.Assert(e) { - m.Option("name", strings.Replace(kit.Select(m.Option(ice.MSG_USERADDR), m.Option("name")), ".", "_", -1)) - m.Option("node", kit.Select("worker", m.Option("node"))) - - // 共享空间 - share := m.Option("share") - if m.Richs(ice.WEB_SHARE, nil, share, nil) == nil { - share = m.Cmdx(ice.WEB_SHARE, m.Option("node"), m.Option("name"), m.Option("user")) - } + name := m.Option(kit.MDB_NAME, strings.Replace(kit.Select(m.Option(ice.MSG_USERADDR), m.Option(kit.MDB_NAME)), ".", "_", -1)) // 添加节点 h := m.Rich(ice.WEB_SPACE, nil, kit.Dict( - kit.MDB_TYPE, m.Option("node"), - kit.MDB_NAME, m.Option("name"), - kit.MDB_TEXT, m.Option("user"), - "share", share, "socket", s, + kit.MDB_TYPE, ice.WEB_WORKER, + kit.MDB_NAME, m.Option(kit.MDB_NAME), + kit.MDB_TEXT, s.RemoteAddr().String(), + "socket", s, )) - m.Log(ice.LOG_CREATE, "space: %s share: %s", m.Option(kit.MDB_NAME), share) + m.Log_CREATE(SPACE, name) - m.Gos(m, func(m *ice.Message) { + task.Put(name, func(task *task.Task) error { // 监听消息 - m.Event(ice.SPACE_START, m.Option("node"), m.Option("name")) - HandleWSS(m, false, m.Target().Server().(*Frame).send, s, m.Option("name")) - m.Log(ice.LOG_CLOSE, "%s: %s", m.Option(kit.MDB_NAME), kit.Format(m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h)))) - m.Event(ice.SPACE_CLOSE, m.Option("node"), m.Option("name")) - m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h), "") + m.Event(ice.SPACE_START, ice.WEB_WORKER, name) + HandleWSS(m, false, m.Target().Server().(*Frame).send, s, name) + m.Log(ice.LOG_CLOSE, "%s: %s", name, kit.Format(m.Confv(SPACE, kit.Keys(kit.MDB_HASH, h)))) + m.Event(ice.SPACE_CLOSE, ice.WEB_WORKER, name) + m.Confv(SPACE, kit.Keys(kit.MDB_HASH, h), "") + return nil }) - - // 共享空间 - if share != m.Option("share") { - m.Cmd(ice.WEB_SPACE, m.Option("name"), ice.WEB_SPACE, "share", share) - } - m.Echo(share) } }}, }}, nil) diff --git a/base/web/spide.go b/base/web/spide.go index 3e1f1f73..26836040 100644 --- a/base/web/spide.go +++ b/base/web/spide.go @@ -84,6 +84,9 @@ func _spide_create(m *ice.Message, name, address string, arg ...string) { func SpideCreate(m *ice.Message, name, address string, arg ...string) { _spide_create(m, name, address, arg...) } + +const SPIDE = "spide" + func init() { Index.Merge(&ice.Context{ Configs: map[string]*ice.Config{ diff --git a/base/web/web.go b/base/web/web.go index 885d889a..caacbc23 100644 --- a/base/web/web.go +++ b/base/web/web.go @@ -13,12 +13,6 @@ import ( "time" ) -const ( - SPIDE = "spide" - SERVE = "serve" - SHARE = "share" -) - type Frame struct { *http.Client *http.Server @@ -264,4 +258,8 @@ var Index = &ice.Context{Name: "web", Help: "网络模块", }, } -func init() { ice.Index.Register(Index, &Frame{}, SERVE) } +func init() { + ice.Index.Register(Index, &Frame{}, + SPIDE, SERVE, SPACE, DREAM, + ) +} diff --git a/name.go b/name.go index c5c6636e..007e80ac 100644 --- a/name.go +++ b/name.go @@ -10,6 +10,7 @@ var ErrNameExists = errors.New("name already exists") func Name(name string, value interface{}) string { if _, ok := names[name]; ok { + println(name) panic(ErrNameExists) } names[name] = value