1
0
forked from x/icebergs

opt space

This commit is contained in:
shaoying 2020-06-18 19:23:39 +08:00
parent 87b0a27a88
commit 906a3cc6e2
8 changed files with 117 additions and 132 deletions

View File

@ -44,6 +44,10 @@ func (f *Frame) Start(m *Message, arg ...string) bool {
return true return true
} }
func (f *Frame) Close(m *Message, arg ...string) bool { func (f *Frame) Close(m *Message, arg ...string) bool {
m.TryCatch(m, true, func(m *Message) {
m.target.wg.Wait()
})
m.Log(LOG_CLOSE, "ice") m.Log(LOG_CLOSE, "ice")
defer m.Cost("close ice") defer m.Cost("close ice")
@ -54,10 +58,6 @@ func (f *Frame) Close(m *Message, arg ...string) bool {
s.Close(list[s], arg...) s.Close(list[s], arg...)
} }
}) })
m.TryCatch(m, true, func(m *Message) {
m.target.wg.Wait()
})
return true return true
} }

View File

@ -11,6 +11,8 @@ import (
"time" "time"
) )
const DREAM = "dream"
func init() { func init() {
Index.Merge(&ice.Context{ Index.Merge(&ice.Context{
Configs: map[string]*ice.Config{ Configs: map[string]*ice.Config{

View File

@ -191,6 +191,9 @@ func (web *Frame) ServeHTTP(w http.ResponseWriter, r *http.Request) {
web.ServeMux.ServeHTTP(w, r) web.ServeMux.ServeHTTP(w, r)
} }
} }
const SERVE = "serve"
func init() { func init() {
Index.Merge(&ice.Context{ Index.Merge(&ice.Context{
Configs: map[string]*ice.Config{ Configs: map[string]*ice.Config{

View File

@ -225,6 +225,8 @@ func ShareCreate(m *ice.Message, kind, name, text string, arg ...string) string
return _share_create(m, kind, name, text, arg...) return _share_create(m, kind, name, text, arg...)
} }
const SHARE = "share"
func init() { func init() {
Index.Merge(&ice.Context{ Index.Merge(&ice.Context{
Configs: map[string]*ice.Config{ Configs: map[string]*ice.Config{

View File

@ -3,7 +3,10 @@ package web
import ( import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
ice "github.com/shylinux/icebergs" ice "github.com/shylinux/icebergs"
aaa "github.com/shylinux/icebergs/base/aaa"
"github.com/shylinux/icebergs/base/cli"
kit "github.com/shylinux/toolkits" kit "github.com/shylinux/toolkits"
"github.com/shylinux/toolkits/task"
"fmt" "fmt"
"math/rand" "math/rand"
@ -13,70 +16,70 @@ import (
"time" "time"
) )
var SPACE = ice.Name("space", Index) func _link(m *ice.Message, pod interface{}) string {
return fmt.Sprintf(`<a target="_blank" href="%s?pod=%s">%s</a>`,
kit.Select(m.Conf(SHARE, "meta.domain"), m.Option(ice.MSG_USERWEB)), pod, pod)
}
func _space_list(m *ice.Message, space string) { func _space_list(m *ice.Message, space string) {
if space == "" { if space == "" {
m.Richs(SPACE, nil, "*", func(key string, value map[string]interface{}) { m.Richs(SPACE, nil, kit.MDB_FOREACH, func(key string, value map[string]interface{}) {
m.Push(key, value, []string{"time", "type", "name", "text"}) m.Push(key, value, []string{kit.MDB_TIME, kit.MDB_TYPE, kit.MDB_NAME, kit.MDB_TEXT})
if m.Option(ice.MSG_USERUA) != "" { m.Push(kit.MDB_LINK, _link(m, value[kit.MDB_NAME]))
m.Push("link", fmt.Sprintf(`<a target="_blank" href="%s?pod=%s">%s</a>`,
kit.Select(m.Conf(ice.WEB_SHARE, "meta.domain"), m.Option(ice.MSG_USERWEB)),
kit.Keys(m.Option(ice.MSG_USERPOD), value["name"]), value["name"]))
}
}) })
m.Sort("name") m.Sort(kit.MDB_NAME)
return return
} }
m.Richs(ice.WEB_SPACE, nil, space, func(key string, value map[string]interface{}) { m.Richs(SPACE, nil, space, func(key string, value map[string]interface{}) {
m.Push("detail", value) m.Push("detail", value)
m.Push("key", "link") m.Push(kit.MDB_KEY, kit.MDB_LINK)
m.Push("value", fmt.Sprintf(`<a target="_blank" href="%s?pod=%s">%s</a>`, m.Conf(ice.WEB_SHARE, "meta.domain"), value["name"], value["name"])) m.Push(kit.MDB_VALUE, _link(m, value[kit.MDB_NAME]))
}) })
} }
func _space_dial(m *ice.Message, dev, name string, arg ...string) { func _space_dial(m *ice.Message, dev, name string, arg ...string) {
// 基本信息 m.Richs(SPIDE, nil, dev, func(key string, value map[string]interface{}) {
node := m.Conf(ice.CLI_RUNTIME, "node.type") client := kit.Value(value, "client").(map[string]interface{})
user := m.Conf(ice.CLI_RUNTIME, "boot.username") redial := m.Confm(SPACE, "meta.redial")
web := m.Target().Server().(*Frame)
web := m.Target().Server().(*Frame) host := kit.Format(client["hostname"])
m.Hold(1).Gos(m, func(msg *ice.Message) { proto := kit.Select("ws", "wss", client["protocol"] == "https")
msg.Richs(ice.WEB_SPIDE, nil, dev, func(key string, value map[string]interface{}) { uri := kit.MergeURL(proto+"://"+host+"/space/", "name", name)
proto := kit.Select("ws", "wss", kit.Format(kit.Value(value, "client.protocol")) == "https") if u, e := url.Parse(uri); m.Assert(e) {
host := kit.Format(kit.Value(value, "client.hostname"))
for i := 0; i < kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.c")); i++ { task.Put(dev, func(task *task.Task) error {
if u, e := url.Parse(kit.MergeURL(proto+"://"+host+"/space/", "node", node, "name", name, "user", user, "share", value["share"])); msg.Assert(e) { for i := 0; i < kit.Int(redial["c"]); i++ {
if s, e := net.Dial("tcp", host); !msg.Warn(e != nil, "%s", e) { if s, e := net.Dial("tcp", host); !m.Warn(e != nil, "%s", e) {
if s, _, e := websocket.NewClient(s, u, nil, kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.r")), kit.Int(msg.Conf(ice.WEB_SPACE, "meta.buffer.w"))); !msg.Warn(e != nil, "%s", e) { if s, _, e := websocket.NewClient(s, u, nil, kit.Int(redial["r"]), kit.Int(redial["w"])); !m.Warn(e != nil, "%s", e) {
msg = m.Spawns()
// 连接成功 // 连接成功
msg.Rich(ice.WEB_SPACE, nil, kit.Dict( m.Rich(SPACE, nil, kit.Dict("socket", s,
kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev, kit.MDB_TEXT, kit.Value(value, "client.hostname"), kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev, kit.MDB_TEXT, host,
"socket", s,
)) ))
msg.Log(ice.LOG_CMDS, "%d conn %s success %s", i, dev, u) m.Log_CREATE("space", dev, "retry", i, "uri", uri)
if i = 0; HandleWSS(msg, true, web.send, s, dev) {
m = m.Spawns()
if i = 0; HandleWSS(m, true, web.send, s, dev) {
// 连接关闭
break break
} }
} }
} }
// 断线重连 // 断线重连
sleep := time.Duration(rand.Intn(kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.a"))*i+2)+kit.Int(msg.Conf(ice.WEB_SPACE, "meta.redial.b"))) * time.Millisecond sleep := time.Duration(rand.Intn(kit.Int(redial["a"])*i+2)+kit.Int(redial["b"])) * time.Millisecond
msg.Cost("order: %d sleep: %s reconnect: %s", i, sleep, u) m.Cost("order: %d sleep: %s reconnect: %s", i, sleep, u)
time.Sleep(sleep) time.Sleep(sleep)
} }
} return nil
}) })
m.Done() }
}) })
} }
func _space_send(m *ice.Message, space string, arg ...string) { func _space_send(m *ice.Message, space string, arg ...string) {
target := strings.Split(space, ".") target := strings.Split(space, ".")
m.Warn(m.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) { frame := m.Target().Server().(*Frame)
m.Warn(m.Richs(SPACE, nil, target[0], func(key string, value map[string]interface{}) {
if socket, ok := value["socket"].(*websocket.Conn); !m.Warn(!ok, "socket err") { if socket, ok := value["socket"].(*websocket.Conn); !m.Warn(!ok, "socket err") {
// 复制选项 // 复制选项
for _, k := range kit.Simple(m.Optionv("_option")) { for _, k := range kit.Simple(m.Optionv("_option")) {
@ -91,20 +94,17 @@ func _space_send(m *ice.Message, space string, arg ...string) {
// 构造路由 // 构造路由
id := kit.Format(m.Target().ID()) id := kit.Format(m.Target().ID())
m.Set(ice.MSG_DETAIL, arg...) frame.send[id] = m
m.Optionv(ice.MSG_TARGET, target[1:])
m.Optionv(ice.MSG_SOURCE, []string{id})
m.Info("send [%s]->%v %v %s", id, target, m.Detailv(), m.Format("meta"))
// 下发命令 // 下发命令
m.Target().Server().(*Frame).send[id] = m m.Set(ice.MSG_DETAIL, arg...)
socket.WriteMessage(1, []byte(m.Format("meta"))) _space_echo(m, []string{id}, target[1:], socket, target[0])
m.Option("timeout", m.Conf(SPACE, "meta.timeout.c")) m.Option("timeout", m.Conf(SPACE, "meta.timeout.c"))
m.Call(m.Option("_async") == "", func(res *ice.Message) *ice.Message { m.Call(m.Option("_async") == "", func(res *ice.Message) *ice.Message {
if res != nil && m != nil { if delete(frame.send, id); res != nil && m != nil {
// 返回结果 // 返回结果
return m.Copy(res) return m.Cost("[%v]->%v %v %v", id, target, arg, m.Copy(res).Format("append"))
} }
return nil return nil
}) })
@ -112,6 +112,19 @@ func _space_send(m *ice.Message, space string, arg ...string) {
}) == nil, "not found %s", space) }) == nil, "not found %s", space)
} }
func _space_echo(msg *ice.Message, source, target []string, c *websocket.Conn, name string) {
msg.Optionv(ice.MSG_SOURCE, source)
msg.Optionv(ice.MSG_TARGET, target)
c.WriteMessage(1, []byte(msg.Format("meta")))
target = append([]string{name}, target...)
msg.Info("send %v %v->%v %v %v", 1, source, target, msg.Detailv(), msg.Format("meta"))
}
func _space_exec(msg *ice.Message, source, target []string, c *websocket.Conn, name string) {
msg = msg.Cmd()
msg.Set("_option")
_space_echo(msg, []string{}, kit.Revert(source)[1:], c, name)
msg.Cost("%v->%v %v %v", source, target, msg.Detailv(), msg.Format("append"))
}
func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *websocket.Conn, name string) bool { func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *websocket.Conn, name string) bool {
for running := true; running; { for running := true; running; {
if t, b, e := c.ReadMessage(); m.Warn(e != nil, "space recv %d msg %v", t, e) { if t, b, e := c.ReadMessage(); m.Warn(e != nil, "space recv %d msg %v", t, e) {
@ -124,18 +137,16 @@ func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *webso
msg.Info("recv %v<-%v %s %v", target, source, msg.Detailv(), msg.Format("meta")) msg.Info("recv %v<-%v %s %v", target, source, msg.Detailv(), msg.Format("meta"))
if len(target) == 0 { if len(target) == 0 {
msg.Option(ice.MSG_USERROLE, msg.Cmdx(ice.AAA_ROLE, "check", msg.Option(ice.MSG_USERNAME))) msg.Option(ice.MSG_USERROLE, aaa.UserRole(msg, msg.Option(ice.MSG_USERNAME)))
msg.Logs(ice.LOG_AUTH, "role", msg.Option(ice.MSG_USERROLE), "user", msg.Option(ice.MSG_USERNAME)) msg.Log_AUTH(aaa.USERNAME, msg.Option(ice.MSG_USERNAME), aaa.USERROLE, msg.Option(ice.MSG_USERROLE))
if msg.Optionv(ice.MSG_HANDLE, "true"); !msg.Warn(!safe, "no right") { if msg.Optionv(ice.MSG_HANDLE, "true"); !msg.Warn(!safe, "no right") {
// 本地执行 // 本地执行
m.Option("_dev", name) msg.Option("_dev", name)
msg = msg.Cmd() task.Put(nil, func(task *task.Task) error {
msg.Set("_option") _space_exec(msg, source, target, c, name)
msg.Set("_option") return nil
} })
if source, target = []string{}, kit.Revert(source)[1:]; msg.Detail() == "exit" { continue
// 重启进程
return true
} }
} else if msg.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) { } else if msg.Richs(ice.WEB_SPACE, nil, target[0], func(key string, value map[string]interface{}) {
// 查询节点 // 查询节点
@ -149,8 +160,6 @@ func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *webso
} else if res, ok := send[msg.Option(ice.MSG_TARGET)]; len(target) == 1 && ok { } else if res, ok := send[msg.Option(ice.MSG_TARGET)]; len(target) == 1 && ok {
// 接收响应 // 接收响应
delete(send, msg.Option(ice.MSG_TARGET))
res.Cost("%v->%v %v %v", target, source, res.Detailv(), msg.Format("append"))
res.Back(msg) res.Back(msg)
continue continue
@ -164,97 +173,64 @@ func HandleWSS(m *ice.Message, safe bool, send map[string]*ice.Message, c *webso
source, target = []string{}, kit.Revert(source)[1:] source, target = []string{}, kit.Revert(source)[1:]
} }
// 发送报文 _space_echo(msg, source, target, socket, name)
msg.Optionv(ice.MSG_SOURCE, source)
msg.Optionv(ice.MSG_TARGET, target)
socket.WriteMessage(t, []byte(msg.Format("meta")))
target = append([]string{name}, target...)
msg.Info("send %v %v->%v %v %v", t, source, target, msg.Detailv(), msg.Format("meta"))
msg.Cost("%v->%v %v %v", source, target, msg.Detailv(), msg.Format("append"))
} }
} }
return false return false
} }
const SPACE = "space"
func init() { func init() {
Index.Merge(&ice.Context{ Index.Merge(&ice.Context{
Configs: map[string]*ice.Config{ Configs: map[string]*ice.Config{
ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, kit.MDB_NAME, ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, kit.MDB_NAME,
"redial.a", 3000, "redial.b", 1000, "redial.c", 1000, "redial", kit.Dict("a", 3000, "b", 1000, "c", 1000, "r", 4096, "w", 4096),
"buffer.r", 4096, "buffer.w", 4096, "timeout", kit.Dict("c", "180s"),
"timeout.c", "180s",
)}, )},
}, },
Commands: map[string]*ice.Command{ Commands: map[string]*ice.Command{
ice.WEB_SPACE: {Name: "space name auto", Help: "空间站", Meta: kit.Dict( ice.WEB_SPACE: {Name: "space [name [cmd...]] auto", Help: "空间站", Action: map[string]*ice.Action{
"exports", []string{"pod", "name"}, "connect": {Name: "connect [dev [name]]", Help: "连接", Hand: func(m *ice.Message, arg ...string) {
), Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { _space_dial(m, kit.Select("dev", arg, 0), kit.Select(cli.NodeName, arg, 2))
if len(arg) == 0 { }},
_space_list(m, "") }, Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) {
if len(arg) < 2 {
_space_list(m, kit.Select("", arg, 0))
return return
} }
switch arg[0] { if arg[0] == "" {
case "share": // 本地命令
m.Richs(ice.WEB_SPIDE, nil, m.Option("_dev"), func(key string, value map[string]interface{}) { m.Cmdy(arg[1:])
m.Log(ice.LOG_CREATE, "dev: %s share: %s", m.Option("_dev"), arg[1]) return
value["share"] = arg[1]
})
case "connect":
_space_dial(m, kit.Select("dev", arg, 1), kit.Select(m.Conf(ice.CLI_RUNTIME, "node.name"), arg, 2))
default:
if len(arg) == 1 {
// 空间详情
_space_list(m, arg[0])
break
}
if arg[0] == "" {
// 本地命令
m.Cmdy(arg[1:])
break
}
_space_send(m, arg[0], arg[1:]...)
} }
_space_send(m, arg[0], arg[1:]...)
}}, }},
"/space/": {Name: "/space/", Help: "空间站", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { "/space/": {Name: "/space/", Help: "空间站", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) {
if s, e := websocket.Upgrade(m.W, m.R, nil, m.Confi(ice.WEB_SPACE, "meta.buffer.r"), m.Confi(ice.WEB_SPACE, "meta.buffer.w")); m.Assert(e) { if s, e := websocket.Upgrade(m.W, m.R, nil, m.Confi(ice.WEB_SPACE, "meta.buffer.r"), m.Confi(ice.WEB_SPACE, "meta.buffer.w")); m.Assert(e) {
m.Option("name", strings.Replace(kit.Select(m.Option(ice.MSG_USERADDR), m.Option("name")), ".", "_", -1)) name := m.Option(kit.MDB_NAME, strings.Replace(kit.Select(m.Option(ice.MSG_USERADDR), m.Option(kit.MDB_NAME)), ".", "_", -1))
m.Option("node", kit.Select("worker", m.Option("node")))
// 共享空间
share := m.Option("share")
if m.Richs(ice.WEB_SHARE, nil, share, nil) == nil {
share = m.Cmdx(ice.WEB_SHARE, m.Option("node"), m.Option("name"), m.Option("user"))
}
// 添加节点 // 添加节点
h := m.Rich(ice.WEB_SPACE, nil, kit.Dict( h := m.Rich(ice.WEB_SPACE, nil, kit.Dict(
kit.MDB_TYPE, m.Option("node"), kit.MDB_TYPE, ice.WEB_WORKER,
kit.MDB_NAME, m.Option("name"), kit.MDB_NAME, m.Option(kit.MDB_NAME),
kit.MDB_TEXT, m.Option("user"), kit.MDB_TEXT, s.RemoteAddr().String(),
"share", share, "socket", s, "socket", s,
)) ))
m.Log(ice.LOG_CREATE, "space: %s share: %s", m.Option(kit.MDB_NAME), share) m.Log_CREATE(SPACE, name)
m.Gos(m, func(m *ice.Message) { task.Put(name, func(task *task.Task) error {
// 监听消息 // 监听消息
m.Event(ice.SPACE_START, m.Option("node"), m.Option("name")) m.Event(ice.SPACE_START, ice.WEB_WORKER, name)
HandleWSS(m, false, m.Target().Server().(*Frame).send, s, m.Option("name")) HandleWSS(m, false, m.Target().Server().(*Frame).send, s, name)
m.Log(ice.LOG_CLOSE, "%s: %s", m.Option(kit.MDB_NAME), kit.Format(m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h)))) m.Log(ice.LOG_CLOSE, "%s: %s", name, kit.Format(m.Confv(SPACE, kit.Keys(kit.MDB_HASH, h))))
m.Event(ice.SPACE_CLOSE, m.Option("node"), m.Option("name")) m.Event(ice.SPACE_CLOSE, ice.WEB_WORKER, name)
m.Confv(ice.WEB_SPACE, kit.Keys(kit.MDB_HASH, h), "") m.Confv(SPACE, kit.Keys(kit.MDB_HASH, h), "")
return nil
}) })
// 共享空间
if share != m.Option("share") {
m.Cmd(ice.WEB_SPACE, m.Option("name"), ice.WEB_SPACE, "share", share)
}
m.Echo(share)
} }
}}, }},
}}, nil) }}, nil)

View File

@ -84,6 +84,9 @@ func _spide_create(m *ice.Message, name, address string, arg ...string) {
func SpideCreate(m *ice.Message, name, address string, arg ...string) { func SpideCreate(m *ice.Message, name, address string, arg ...string) {
_spide_create(m, name, address, arg...) _spide_create(m, name, address, arg...)
} }
const SPIDE = "spide"
func init() { func init() {
Index.Merge(&ice.Context{ Index.Merge(&ice.Context{
Configs: map[string]*ice.Config{ Configs: map[string]*ice.Config{

View File

@ -13,12 +13,6 @@ import (
"time" "time"
) )
const (
SPIDE = "spide"
SERVE = "serve"
SHARE = "share"
)
type Frame struct { type Frame struct {
*http.Client *http.Client
*http.Server *http.Server
@ -264,4 +258,8 @@ var Index = &ice.Context{Name: "web", Help: "网络模块",
}, },
} }
func init() { ice.Index.Register(Index, &Frame{}, SERVE) } func init() {
ice.Index.Register(Index, &Frame{},
SPIDE, SERVE, SPACE, DREAM,
)
}

View File

@ -10,6 +10,7 @@ var ErrNameExists = errors.New("name already exists")
func Name(name string, value interface{}) string { func Name(name string, value interface{}) string {
if _, ok := names[name]; ok { if _, ok := names[name]; ok {
println(name)
panic(ErrNameExists) panic(ErrNameExists)
} }
names[name] = value names[name] = value