From 4971e6989f5a44e37ae22accd10ef8fed30e9c49 Mon Sep 17 00:00:00 2001 From: shaoying Date: Wed, 3 Jun 2020 18:39:17 +0800 Subject: [PATCH] add space --- base/web/space.go | 199 ++++++++++++++++++++++++++++++++++++++++++++++ base/web/web.go | 168 +------------------------------------- 2 files changed, 201 insertions(+), 166 deletions(-) create mode 100644 base/web/space.go diff --git a/base/web/space.go b/base/web/space.go new file mode 100644 index 00000000..3a0c2f9a --- /dev/null +++ b/base/web/space.go @@ -0,0 +1,199 @@ +package web + +import ( + "github.com/gorilla/websocket" + ice "github.com/shylinux/icebergs" + kit "github.com/shylinux/toolkits" + + "fmt" + "math/rand" + "net" + "net/url" + "strings" + "time" +) + +var SPACE = ice.Name("space", Index) + +func _space_list(m *ice.Message, space string) { + if space == "" { + m.Richs(SPACE, nil, "*", func(key string, value map[string]interface{}) { + m.Push(key, value, []string{"time", "type", "name", "text"}) + if m.Option(ice.MSG_USERUA) != "" { + m.Push("link", fmt.Sprintf(`%s`, + kit.Select(m.Conf(ice.WEB_SHARE, "meta.domain"), m.Option(ice.MSG_USERWEB)), + kit.Keys(m.Option(ice.MSG_USERPOD), value["name"]), value["name"])) + } + }) + m.Sort("name") + return + } + + m.Richs(ice.WEB_SPACE, nil, space, func(key string, value map[string]interface{}) { + m.Push("detail", value) + m.Push("key", "link") + m.Push("value", fmt.Sprintf(`%s`, m.Conf(ice.WEB_SHARE, "meta.domain"), value["name"], value["name"])) + }) +} +func _space_dial(m *ice.Message, dev, name string, arg ...string) { + // 基本信息 + node := m.Conf(ice.CLI_RUNTIME, "node.type") + user := m.Conf(ice.CLI_RUNTIME, "boot.username") + + web := m.Target().Server().(*Frame) + m.Hold(1).Gos(m, func(msg *ice.Message) { + msg.Richs(ice.WEB_SPIDE, nil, dev, func(key string, value map[string]interface{}) { + proto := kit.Select("ws", "wss", kit.Format(kit.Value(value, "client.protocol")) == "https") + host := kit.Format(kit.Value(value, "client.hostname")) + + for i := 0; i < kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.c")); i++ { + if u, e := url.Parse(kit.MergeURL(proto+"://"+host+"/space/", "node", node, "name", name, "user", user, "share", value["share"])); msg.Assert(e) { + if s, e := net.Dial("tcp", host); !msg.Warn(e != nil, "%s", e) { + if s, _, e := websocket.NewClient(s, u, nil, kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.r")), kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.w"))); !msg.Warn(e != nil, "%s", e) { + msg = m.Spawns() + + // 连接成功 + msg.Rich(ice.WEB_SPACE, nil, kit.Dict( + kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev, kit.MDB_TEXT, kit.Value(value, "client.hostname"), + "socket", s, + )) + msg.Log(ice.LOG_CMDS, "%d conn %s success %s", i, dev, u) + if i = 0; web.HandleWSS(msg, true, s, dev) { + break + } + } + } + + // 断线重连 + sleep := time.Duration(rand.Intn(kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.a"))*i+2)+kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.b"))) * time.Millisecond + msg.Cost("order: %d sleep: %s reconnect: %s", i, sleep, u) + time.Sleep(sleep) + } + } + }) + m.Done() + }) +} +func _space_send(m *ice.Message, space string, arg ...string) { + target := strings.Split(space, ".") + m.Warn(m.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) { + if socket, ok := value["socket"].(*websocket.Conn); !m.Warn(!ok, "socket err") { + // 复制选项 + for _, k := range kit.Simple(m.Optionv("_option")) { + switch k { + case "detail", "cmds", ice.MSG_SESSID: + default: + m.Optionv(k, m.Optionv(k)) + } + } + m.Optionv("_option", m.Optionv("_option")) + m.Optionv("option", nil) + + // 构造路由 + id := kit.Format(m.Target().ID()) + m.Set(ice.MSG_DETAIL, arg...) + m.Optionv(ice.MSG_TARGET, target[1:]) + m.Optionv(ice.MSG_SOURCE, []string{id}) + m.Info("send [%s]->%v %v %s", id, target, m.Detailv(), m.Format("meta")) + + // 下发命令 + m.Target().Server().(*Frame).send[id] = m + socket.WriteMessage(1, []byte(m.Format("meta"))) + + m.Call(m.Option("_async") == "", func(res *ice.Message) *ice.Message { + if res != nil && m != nil { + // 返回结果 + return m.Copy(res) + } + return nil + }) + } + }) == nil, "not found %s", space) +} + +func init() { + Index.Merge(&ice.Context{ + Configs: map[string]*ice.Config{ + ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, kit.MDB_NAME, + "redial.a", 3000, "redial.b", 1000, "redial.c", 1000, + "buffer.r", 4096, "buffer.w", 4096, + "timeout.c", "30s", + )}, + }, + Commands: map[string]*ice.Command{ + ice.WEB_SPACE: {Name: "space name auto", Help: "空间站", Meta: kit.Dict( + "exports", []string{"pod", "name"}, + ), Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { + if len(arg) == 0 { + _space_list(m, "") + return + } + + switch arg[0] { + case "share": + m.Richs(ice.WEB_SPIDE, nil, m.Option("_dev"), func(key string, value map[string]interface{}) { + m.Log(ice.LOG_CREATE, "dev: %s share: %s", m.Option("_dev"), arg[1]) + value["share"] = arg[1] + }) + + case "connect": + dev := kit.Select("dev", arg, 1) + name := kit.Select(m.Conf(ice.CLI_RUNTIME, "node.name"), arg, 2) + _space_dial(m, dev, name) + + default: + if len(arg) == 1 { + // 空间详情 + _space_list(m, arg[0]) + break + } + + if arg[0] == "" { + // 本地命令 + m.Cmdy(arg[1:]) + break + } + + _space_send(m, arg[0], arg[1:]...) + } + }}, + + "/space/": {Name: "/space/", Help: "空间站", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { + if s, e := websocket.Upgrade(m.W, m.R, nil, m.Confi(ice.WEB_SPACE, "meta.buffer.r"), m.Confi(ice.WEB_SPACE, "meta.buffer.w")); m.Assert(e) { + m.Option("name", strings.Replace(kit.Select(m.Option(ice.MSG_USERADDR), m.Option("name")), ".", "_", -1)) + m.Option("node", kit.Select("worker", m.Option("node"))) + + // 共享空间 + share := m.Option("share") + if m.Richs(ice.WEB_SHARE, nil, share, nil) == nil { + share = m.Cmdx(ice.WEB_SHARE, "add", m.Option("node"), m.Option("name"), m.Option("user")) + } + // m.Cmd(ice.WEB_GROUP, m.Option("group"), "add", m.Option("name")) + + // 添加节点 + h := m.Rich(ice.WEB_SPACE, nil, kit.Dict( + kit.MDB_TYPE, m.Option("node"), + kit.MDB_NAME, m.Option("name"), + kit.MDB_TEXT, m.Option("user"), + "share", share, "socket", s, + )) + m.Log(ice.LOG_CREATE, "space: %s share: %s", m.Option(kit.MDB_NAME), share) + + m.Gos(m, func(m *ice.Message) { + // 监听消息 + m.Event(ice.SPACE_START, m.Option("node"), m.Option("name")) + m.Target().Server().(*Frame).HandleWSS(m, false, s, m.Option("name")) + m.Log(ice.LOG_CLOSE, "%s: %s", m.Option(kit.MDB_NAME), kit.Format(m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h)))) + m.Event(ice.SPACE_CLOSE, m.Option("node"), m.Option("name")) + m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h), "") + }) + + // 共享空间 + if share != m.Option("share") { + m.Cmd(ice.WEB_SPACE, m.Option("name"), ice.WEB_SPACE, "share", share) + } + m.Echo(share) + } + }}, + }}, nil) +} diff --git a/base/web/web.go b/base/web/web.go index 38eb1401..78d92892 100644 --- a/base/web/web.go +++ b/base/web/web.go @@ -11,9 +11,7 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "mime/multipart" - "net" "net/http" "net/url" "os" @@ -168,7 +166,7 @@ func (web *Frame) HandleWSS(m *ice.Message, safe bool, c *websocket.Conn, name s socket, msg := c, m.Spawns(b) target := kit.Simple(msg.Optionv(ice.MSG_TARGET)) source := kit.Simple(msg.Optionv(ice.MSG_SOURCE), name) - msg.Info("recv %v<-%v %s %v", target, source, msg.Detailv(), msg.Formats("meta")) + msg.Info("recv %v<-%v %s %v", target, source, msg.Detailv(), msg.Format("meta")) if len(target) == 0 { msg.Option(ice.MSG_USERROLE, msg.Cmdx(ice.AAA_ROLE, "check", msg.Option(ice.MSG_USERNAME))) @@ -214,7 +212,7 @@ func (web *Frame) HandleWSS(m *ice.Message, safe bool, c *websocket.Conn, name s msg.Optionv(ice.MSG_TARGET, target) socket.WriteMessage(t, []byte(msg.Format("meta"))) target = append([]string{name}, target...) - msg.Info("send %v %v->%v %v %v", t, source, target, msg.Detailv(), msg.Formats("meta")) + msg.Info("send %v %v->%v %v %v", t, source, target, msg.Detailv(), msg.Format("meta")) msg.Cost("%v->%v %v %v", source, target, msg.Detailv(), msg.Format("append")) } } @@ -481,11 +479,6 @@ var Index = &ice.Context{Name: "web", Help: "网络模块", "logheaders", "false", "init", "false", "black", kit.Dict(), )}, - ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, kit.MDB_NAME, - "redial.a", 3000, "redial.b", 1000, "redial.c", 1000, - "buffer.r", 4096, "buffer.w", 4096, - "timeout.c", "30s", - )}, ice.WEB_DREAM: {Name: "dream", Help: "梦想家", Value: kit.Data("path", "usr/local/work", // "cmd", []interface{}{ice.CLI_SYSTEM, "ice.sh", "start", ice.WEB_SPACE, "connect"}, "cmd", []interface{}{ice.CLI_SYSTEM, "ice.bin", ice.WEB_SPACE, "connect"}, @@ -856,125 +849,6 @@ var Index = &ice.Context{Name: "web", Help: "网络模块", m.Conf(ice.WEB_FAVOR, "meta.template", favor_template) m.Conf(ice.WEB_SHARE, "meta.template", share_template) }}, - ice.WEB_SPACE: {Name: "space name auto", Help: "空间站", Meta: kit.Dict( - "exports", []string{"pod", "name"}, - ), Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { - if len(arg) == 0 { - // 空间列表 - m.Richs(ice.WEB_SPACE, nil, "*", func(key string, value map[string]interface{}) { - m.Push(key, value, []string{"time", "type", "name", "text"}) - if m.Option(ice.MSG_USERUA) != "" { - m.Push("link", fmt.Sprintf(`%s`, - kit.Select(m.Conf(ice.WEB_SHARE, "meta.domain"), m.Option(ice.MSG_USERWEB)), - kit.Keys(m.Option(ice.MSG_USERPOD), value["name"]), value["name"])) - } - }) - m.Sort("name") - return - } - - web := m.Target().Server().(*Frame) - switch arg[0] { - case "share": - m.Richs(ice.WEB_SPIDE, nil, m.Option("_dev"), func(key string, value map[string]interface{}) { - m.Log(ice.LOG_CREATE, "dev: %s share: %s", m.Option("_dev"), arg[1]) - value["share"] = arg[1] - }) - - case "connect": - // 基本信息 - dev := kit.Select("dev", arg, 1) - node := m.Conf(ice.CLI_RUNTIME, "node.type") - name := kit.Select(m.Conf(ice.CLI_RUNTIME, "node.name"), arg, 2) - user := m.Conf(ice.CLI_RUNTIME, "boot.username") - - m.Hold(1).Gos(m, func(msg *ice.Message) { - msg.Richs(ice.WEB_SPIDE, nil, dev, func(key string, value map[string]interface{}) { - proto := kit.Select("ws", "wss", kit.Format(kit.Value(value, "client.protocol")) == "https") - host := kit.Format(kit.Value(value, "client.hostname")) - - for i := 0; i < kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.c")); i++ { - if u, e := url.Parse(kit.MergeURL(proto+"://"+host+"/space/", "node", node, "name", name, "user", user, - "proxy", kit.Select("master", arg, 3), "group", kit.Select("worker", arg, 4), "share", value["share"])); msg.Assert(e) { - if s, e := net.Dial("tcp", host); !msg.Warn(e != nil, "%s", e) { - if s, _, e := websocket.NewClient(s, u, nil, kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.r")), kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.w"))); !msg.Warn(e != nil, "%s", e) { - msg = m.Spawns() - - // 连接成功 - msg.Rich(ice.WEB_SPACE, nil, kit.Dict( - kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev, kit.MDB_TEXT, kit.Value(value, "client.hostname"), - "socket", s, - )) - msg.Log(ice.LOG_CMDS, "%d conn %s success %s", i, dev, u) - if i = 0; web.HandleWSS(msg, true, s, dev) { - break - } - } - } - - // 断线重连 - sleep := time.Duration(rand.Intn(kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.a"))*i+2)+kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.b"))) * time.Millisecond - msg.Cost("order: %d sleep: %s reconnect: %s", i, sleep, u) - time.Sleep(sleep) - } - } - }) - m.Done() - }) - - default: - if len(arg) == 1 { - // 空间详情 - m.Richs(ice.WEB_SPACE, nil, arg[0], func(key string, value map[string]interface{}) { - m.Push("detail", value) - m.Push("key", "link") - m.Push("value", fmt.Sprintf(`%s`, m.Conf(ice.WEB_SHARE, "meta.domain"), value["name"], value["name"])) - }) - break - } - - if arg[0] == "" { - // 本地命令 - m.Cmdy(arg[1:]) - break - } - - target := strings.Split(arg[0], ".") - m.Warn(m.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) { - if socket, ok := value["socket"].(*websocket.Conn); !m.Warn(!ok, "socket err") { - // 复制选项 - for _, k := range kit.Simple(m.Optionv("_option")) { - switch k { - case "detail", "cmds", ice.MSG_SESSID: - default: - m.Optionv(k, m.Optionv(k)) - } - } - m.Optionv("_option", m.Optionv("_option")) - m.Optionv("option", nil) - - // 构造路由 - id := kit.Format(c.ID()) - m.Set(ice.MSG_DETAIL, arg[1:]...) - m.Optionv(ice.MSG_TARGET, target[1:]) - m.Optionv(ice.MSG_SOURCE, []string{id}) - m.Info("send [%s]->%v %v %s", id, target, m.Detailv(), m.Formats("meta")) - - // 下发命令 - m.Target().Server().(*Frame).send[id] = m - socket.WriteMessage(1, []byte(m.Format("meta"))) - - m.Call(m.Option("_async") == "", func(res *ice.Message) *ice.Message { - if res != nil && m != nil { - // 返回结果 - return m.Copy(res) - } - return nil - }) - } - }) == nil, "not found %s", arg[0]) - } - }}, ice.WEB_DREAM: {Name: "dream name auto", Help: "梦想家", Meta: kit.Dict( "exports", []string{"you", "name"}, "detail", []interface{}{"启动", "停止"}, ), Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { @@ -1037,44 +911,6 @@ var Index = &ice.Context{Name: "web", Help: "网络模块", } m.Cmdy("nfs.dir", p) }}, - - "/space/": {Name: "/space/", Help: "空间站", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { - if s, e := websocket.Upgrade(m.W, m.R, nil, m.Confi(ice.WEB_SPACE, "meta.buffer.r"), m.Confi(ice.WEB_SPACE, "meta.buffer.w")); m.Assert(e) { - m.Option("name", strings.Replace(kit.Select(m.Option(ice.MSG_USERADDR), m.Option("name")), ".", "_", -1)) - m.Option("node", kit.Select("worker", m.Option("node"))) - - // 共享空间 - share := m.Option("share") - if m.Richs(ice.WEB_SHARE, nil, share, nil) == nil { - share = m.Cmdx(ice.WEB_SHARE, "add", m.Option("node"), m.Option("name"), m.Option("user")) - } - // m.Cmd(ice.WEB_GROUP, m.Option("group"), "add", m.Option("name")) - - // 添加节点 - h := m.Rich(ice.WEB_SPACE, nil, kit.Dict( - kit.MDB_TYPE, m.Option("node"), - kit.MDB_NAME, m.Option("name"), - kit.MDB_TEXT, m.Option("user"), - "share", share, "socket", s, - )) - m.Log(ice.LOG_CREATE, "space: %s share: %s", m.Option(kit.MDB_NAME), share) - - m.Gos(m, func(m *ice.Message) { - // 监听消息 - m.Event(ice.SPACE_START, m.Option("node"), m.Option("name")) - m.Target().Server().(*Frame).HandleWSS(m, false, s, m.Option("name")) - m.Log(ice.LOG_CLOSE, "%s: %s", m.Option(kit.MDB_NAME), kit.Format(m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h)))) - m.Event(ice.SPACE_CLOSE, m.Option("node"), m.Option("name")) - m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h), "") - }) - - // 共享空间 - if share != m.Option("share") { - m.Cmd(ice.WEB_SPACE, m.Option("name"), ice.WEB_SPACE, "share", share) - } - m.Echo(share) - } - }}, }, }