// 1
// 0
// forked from x/icebergs
// icebergs/base/web/space.go
// shylinux@163.com a5a6aa53a8 opt some
// 2022-09-15 21:04:41 +08:00
//
// 410 lines
// 13 KiB
// Go

package web
import (
"math/rand"
"net"
"strconv"
"strings"
"time"
ice "shylinux.com/x/icebergs"
"shylinux.com/x/icebergs/base/aaa"
"shylinux.com/x/icebergs/base/cli"
"shylinux.com/x/icebergs/base/gdb"
"shylinux.com/x/icebergs/base/mdb"
"shylinux.com/x/icebergs/base/tcp"
kit "shylinux.com/x/toolkits"
"shylinux.com/x/websocket"
)
// _space_domain resolves the base URL under which this node can be reached.
//
// Candidates are tried in order and the first non-empty one wins:
// ice.Info.Domain, the link column returned by `space <ops|dev|shy> pwd`,
// the ice.MSG_USERWEB option, and finally an http://localhost URL whose port
// is chosen by kit.Select from the port option and the serve command's port.
// The winner is passed through tcp.ReplaceLocalhost before it is returned.
func _space_domain(m *ice.Message) (link string) {
	link = ice.Info.Domain
	// Ask each upstream in turn, but only while nothing has been found yet.
	for _, dev := range []string{ice.OPS, ice.DEV, ice.SHY} {
		if link != "" {
			break
		}
		link = m.Cmd(SPACE, dev, cli.PWD).Append(mdb.LINK)
	}
	if link == "" {
		link = m.Option(ice.MSG_USERWEB)
	}
	if link == "" {
		port := kit.Select(m.Option(tcp.PORT), m.Cmd(SERVE).Append(tcp.PORT))
		link = kit.Format("http://localhost:%s", port)
	}
	return tcp.ReplaceLocalhost(m, link)
}
// _space_dial connects this node to an upstream space server over websocket
// and keeps redialing after the connection drops.
//
// dev is either the name of a spide client or a URL starting with ice.HTTP;
// a URL is first registered as the spide client ice.DEV. name is the node
// name announced to the upstream in the connection URL, and arg carries extra
// query pairs for that URL. The dial loop runs in a goroutine started with
// m.Go, so this function returns without waiting for the connection.
//
// The redial config supplies the retry policy: each retry sleeps
// rand.Intn(a*(i+1))+b milliseconds, at most c attempts are made, and r/w are
// handed to websocket.NewClient.
func _space_dial(m *ice.Message, dev, name string, arg ...string) {
	if strings.HasPrefix(dev, ice.HTTP) {
		m.Cmd(SPIDE, mdb.CREATE, ice.DEV, dev)
		dev = ice.DEV
	}
	msg := m.Cmd(SPIDE, dev)
	host := msg.Append(kit.Keys(tcp.CLIENT, tcp.HOSTNAME))
	proto := strings.Replace(msg.Append(kit.Keys(tcp.CLIENT, tcp.PROTOCOL)), ice.HTTP, "ws", 1)
	uri := kit.MergeURL(proto+"://"+host+PP(SPACE), mdb.TYPE, ice.Info.NodeType, mdb.NAME, name, SHARE, ice.Info.CtxShare, RIVER, ice.Info.CtxRiver, arg)
	u := kit.ParseURL(uri)
	m.Go(func() {
		frame := m.Target().Server().(*Frame)
		ls := strings.Split(host, ice.DF)
		args := kit.SimpleKV("type,name,host,port", proto, dev, ls[0], kit.Select("443", ls, 1))
		redial, _ := m.Configv(REDIAL).(ice.Map)
		a, b, c := kit.Int(redial["a"]), kit.Int(redial["b"]), kit.Int(redial["c"])
		for i := 0; i >= 0 && i < c; i++ {
			msg := m.Spawn()
			msg.Cmd(tcp.CLIENT, tcp.DIAL, args, func(s net.Conn) {
				if s, _, e := websocket.NewClient(s, u, nil, kit.Int(redial["r"]), kit.Int(redial["w"])); !msg.Warn(e) {
					msg.Logs(mdb.CREATE, SPACE, dev, "retry", i, "uri", uri)
					mdb.HashCreate(msg, kit.SimpleKV("", MASTER, dev, host), kit.Dict(mdb.TARGET, s))
					// NOTE(review): the entry above is created under dev but removed
					// under name; confirm which key is intended before changing it,
					// because the loop below stops when the table is empty.
					defer mdb.HashRemove(msg, mdb.NAME, name)
					// A successful connection resets the retry counter.
					if i = 0; _space_handle(msg, true, frame, s, dev) {
						i = -2 // close the connection for good
					}
				}
			})
			// Reconnect after the connection is lost.
			if i < 0 {
				// The handler asked to stop; the negative counter must not reach
				// rand.Intn, which panics on a non-positive argument.
				break
			}
			jitter := 0
			if span := a * (i + 1); span > 0 {
				jitter = rand.Intn(span)
			}
			sleep := time.Duration(jitter+b) * time.Millisecond
			msg.Cost("order", i, "sleep", sleep, "reconnect", dev)
			if time.Sleep(sleep); mdb.HashSelect(msg).Length() == 0 {
				break
			}
		}
	})
}
// _space_handle runs the read loop of one websocket connection and routes
// every message it receives until a read fails.
//
// safe is true for connections this node dialed (_space_dial) and false for
// connections it accepted (_space_fork); only safe connections have their
// untargeted messages executed as commands. frame holds the pending requests
// used to match responses, c is the connection being read, and name is the
// peer's name, which is added to the source path of each message.
//
// Routing per message, by its ice.MSG_TARGET path:
//   - empty: execute it (safe) or answer with this node's link (not safe);
//   - first hop found in the space table: forward it to that connection;
//   - exactly one hop that matches a pending request: deliver the response.
//
// The function always returns false once the loop ends.
func _space_handle(m *ice.Message, safe bool, frame *Frame, c *websocket.Conn, name string) bool {
	for {
		_, b, e := c.ReadMessage()
		if m.Warn(e, SPACE, name) {
			break
		}
		socket, msg := c, m.Spawn(b)
		target := kit.Simple(msg.Optionv(ice.MSG_TARGET))
		source := kit.Simple(msg.Optionv(ice.MSG_SOURCE), name)
		msg.Log("recv", "%v<-%v %s %v", target, source, msg.Detailv(), msg.FormatMeta())
		if len(target) == 0 { // execute the command
			if msg.Optionv(ice.MSG_HANDLE, ice.TRUE); safe { // downstream command
				msg.Option(ice.MSG_USERROLE, kit.Select(msg.Option(ice.MSG_USERROLE), msg.Cmd(aaa.USER, msg.Option(ice.MSG_USERNAME)).Append(aaa.USERROLE)))
				if msg.Option(ice.MSG_USERROLE) == aaa.VOID && ice.Info.UserName == aaa.TECH {
					msg.Option(ice.MSG_USERROLE, aaa.TECH) // demo space
				}
				msg.Auth(aaa.USERROLE, msg.Option(ice.MSG_USERROLE), aaa.USERNAME, msg.Option(ice.MSG_USERNAME))
				msg.Go(func() { _space_exec(msg, source, target, c, name) })
				continue
			}
			// upstream request
			msg.Push(mdb.LINK, kit.MergePOD(_space_domain(msg), name))
			_space_echo(msg, []string{}, kit.Revert(source)[1:], c, name)
			continue
		}
		if mdb.HashSelectDetail(msg, target[0], func(value ice.Map) { // forward the command
			if s, ok := value[mdb.TARGET].(*websocket.Conn); ok {
				// Capture the hop this socket belongs to before dropping it from
				// the path: _space_echo uses that name to remove the table entry
				// when the write fails and to log the full route.
				next := target[0]
				socket, target = s, target[1:]
				_space_echo(msg, source, target, socket, next)
				return // message forwarded
			}
			if msg.Warn(msg.Option(ice.MSG_HANDLE) == ice.TRUE, ice.ErrNotValid, "already handled") {
				// reply failed
			} else { // dispatch failed
				msg.Warn(true, ice.ErrNotFound, target)
				// NOTE(review): these values are not read again in this iteration.
				source, target = []string{}, kit.Revert(source)[1:]
			}
		}) {
			continue
		}
		if res, ok := frame.getSend(msg.Option(ice.MSG_TARGET)); len(target) != 1 || !ok {
			if msg.Warn(msg.Option(ice.MSG_HANDLE) == ice.TRUE, ice.ErrNotValid, target) {
				// reply failed
			} else { // dispatch failed
				msg.Warn(true, ice.ErrNotFound, target)
				// NOTE(review): these values are not read again in this iteration.
				source, target = []string{}, kit.Revert(source)[1:]
			}
			continue
		} else { // receive the response
			m.Sleep30ms()
			back(res, msg)
		}
	}
	return false
}
// _space_exec runs the command carried by msg when aaa.Right allows it, then
// echoes the message back over c along the reversed source path (minus its
// first element) and records the cost.
func _space_exec(msg *ice.Message, source, target []string, c *websocket.Conn, name string) {
	if allowed := aaa.Right(msg, msg.Detailv()); allowed { // execute the command
		msg = msg.Cmd()
	}
	msg.Set(ice.MSG_OPTS)
	reply := kit.Revert(source)[1:]
	_space_echo(msg, []string{}, reply, c, name)
	summary := kit.Format("%v->%v %v %v", source, target, msg.Detailv(), msg.FormatSize())
	msg.Cost(summary)
}
// _space_echo stamps msg with the given source and target paths and writes its
// serialized meta to c as a websocket message of type 1.
//
// On success the send is logged with name prepended to target. On a write
// error the space entry called name is removed and c is closed.
func _space_echo(msg *ice.Message, source, target []string, c *websocket.Conn, name string) {
	msg.Optionv(ice.MSG_SOURCE, source)
	msg.Optionv(ice.MSG_TARGET, target)
	payload := []byte(msg.FormatMeta())
	if e := c.WriteMessage(1, payload); !msg.Warn(e) {
		route := append([]string{name}, target...)
		msg.Log("send", "%v->%v %v %v", source, route, msg.Detailv(), msg.FormatMeta())
		return
	}
	// reply failed
	mdb.HashRemove(msg, mdb.NAME, name)
	c.Close()
}
// _space_send runs the command arg on the node addressed by space.
//
// An empty space, MYSELF, or this node's own name runs the command locally.
// Otherwise space is split on ice.PT into a hop path, the message is sent to
// the connection registered under the first hop, and the reply is awaited via
// call; when the "_async" option is set, call does not block.
func _space_send(m *ice.Message, space string, arg ...string) {
	switch space {
	case "", MYSELF, ice.Info.NodeName:
		m.Cmdy(arg) // local command
		return
	}
	// build the arguments
	for _, k := range kit.Simple(m.Optionv(ice.MSG_OPTS)) {
		if k == ice.MSG_DETAIL || k == ice.MSG_CMDS || k == ice.MSG_SESSID {
			continue
		}
		m.Optionv(k, m.Optionv(k))
	}
	m.Optionv(ice.MSG_OPTS, m.Optionv(ice.MSG_OPTS))
	m.Optionv(ice.MSG_OPTION, m.Optionv(ice.MSG_OPTS))
	m.Set(ice.MSG_DETAIL, arg...)
	// send the command
	frame := m.Target().Server().(*Frame)
	target, id := kit.Split(space, ice.PT, ice.PT), ""
	found := mdb.HashSelectDetail(m, target[0], func(value ice.Map) {
		socket, ok := value[mdb.TARGET].(*websocket.Conn)
		if m.Warn(!ok, ice.ErrNotFound, mdb.TARGET) {
			return
		}
		// Register the pending request first so the reply can be matched by id.
		id = frame.addSend(kit.Format(m.Target().ID()), m)
		_space_echo(m, []string{id}, target[1:], socket, target[0])
	})
	if m.Warn(!found, ice.ErrNotFound, space) {
		return
	}
	// return the result
	m.Option(TIMEOUT, m.Config(kit.Keys(TIMEOUT, "c")))
	sync := m.Option("_async") == ""
	call(m, sync, func(res *ice.Message) {
		m.Cost(kit.Format("[%v]->%v %v %v", id, target, arg, m.Copy(res).FormatSize()))
		frame.delSend(id)
	})
}
// _space_fork upgrades the current HTTP request to a websocket, registers the
// peer in the space table, and serves it with _space_handle in a goroutine
// started by m.Go. Start/stop events depend on the peer's type option.
func _space_fork(m *ice.Message) {
// The buffer config supplies the two size arguments of websocket.Upgrade.
buffer, _ := m.Configv(BUFFER).(ice.Map)
if s, e := websocket.Upgrade(m.W, m.R, nil, kit.Int(buffer["r"]), kit.Int(buffer["w"])); m.Assert(e) {
// text is chosen by kit.Select from the socket's remote address and the
// ice.MSG_USERADDR option.
text := kit.Select(s.RemoteAddr().String(), m.Option(ice.MSG_USERADDR))
// name is lowercased with "." and ":" replaced by "_"; the replaced form is
// also written back to the name option.
name := strings.ToLower(m.Option(mdb.NAME, kit.ReplaceAll(kit.Select(text, m.Option(mdb.NAME)), ".", "_", ":", "_")))
// kind is chosen by kit.Select from WORKER and the type option.
kind := kit.Select(WORKER, m.Option(mdb.TYPE))
args := append([]string{mdb.TYPE, kind, mdb.NAME, name}, m.OptionSimple(SHARE, RIVER)...)
m.Go(func() {
mdb.HashCreate(m, mdb.TEXT, kit.Select(text, m.Option(mdb.TEXT)), args, kit.Dict(mdb.TARGET, s))
// The defers below run when this goroutine's function returns, i.e. after
// _space_handle's read loop has ended.
switch kind {
case CHROME: // interactive node
gdb.Event(m, SPACE_OPEN, args)
defer gdb.Event(m, SPACE_CLOSE, args)
defer mdb.HashRemove(m, mdb.NAME, name)
// In a separate goroutine, send a pwd command to the new peer; for the
// cmd option values pwd and sso it also carries a link and its QR code.
m.Go(func(msg *ice.Message) {
switch m.Option(ice.CMD) {
case cli.PWD:
link := kit.MergeURL(_space_domain(m), aaa.GRANT, name)
msg.Sleep300ms(SPACE, name, cli.PWD, name, link, msg.Cmdx(cli.QRCODE, link))
case SSO:
link := _space_domain(m)
// The fourth "/"-separated element of the domain path is passed as the
// space parameter of the sso link.
ls := strings.Split(kit.ParseURL(link).Path, ice.PS)
link = kit.MergeURL2(_space_domain(m), "/chat/sso", SPACE, kit.Select("", ls, 3), "back", m.Option("back"))
msg.Sleep300ms(SPACE, name, cli.PWD, name, link, msg.Cmdx(cli.QRCODE, link))
default:
msg.Sleep300ms(SPACE, name, cli.PWD, name)
}
})
case WORKER: // worker node
gdb.Event(m, DREAM_START, args)
defer gdb.Event(m, DREAM_STOP, args)
// NOTE(review): unlike the other two cases, no mdb.HashRemove is deferred
// here; confirm the entry is cleaned up elsewhere.
if m.Option("daemon") == "ops" {
defer m.Cmd(DREAM, DREAM_STOP, args)
}
default: // server node
gdb.Event(m, SPACE_START, args)
defer gdb.Event(m, SPACE_STOP, args)
defer mdb.HashRemove(m, mdb.NAME, name)
}
// safe=false: this connection was accepted, not dialed.
_space_handle(m, false, m.Target().Server().(*Frame), s, name)
})
}
}
// _space_search pushes search results for the spaces whose name contains name.
//
// CHROME spaces are skipped, MASTER spaces are listed with the spide client's
// URL and all other spaces with their pod URL. When name is empty, one MYSELF
// entry per (serve row, host row) pair is added as well. kind, text and arg
// are not used by this function.
func _space_search(m *ice.Message, kind, name, text string, arg ...string) {
	m.Cmd(SPACE, ice.OptionFields(""), func(value ice.Maps) {
		space, tp := value[mdb.NAME], value[mdb.TYPE]
		if !strings.Contains(space, name) || tp == CHROME {
			return
		}
		if tp == MASTER {
			m.PushSearch(mdb.TEXT, m.CmdAppend(SPIDE, space, CLIENT_URL), value)
		} else {
			m.PushSearch(mdb.TEXT, MergePod(m, space), value)
		}
	})
	if name == "" {
		m.Cmd(SERVE, ice.OptionFields(""), func(val ice.Maps) {
			m.Cmd(tcp.HOST, ice.OptionFields(""), func(value ice.Maps) {
				link := kit.Format("http://%s:%s", value[aaa.IP], val[tcp.PORT])
				m.PushSearch(kit.SimpleKV("", MYSELF, value[mdb.NAME], link))
			})
		})
	}
}
// Values of a space entry's type field.
const (
// CHROME is handled as an interactive node by _space_fork.
CHROME = "chrome"
// FRIEND is not referenced elsewhere in this file.
FRIEND = "friend"
// MASTER is the type recorded by _space_dial for an upstream it dialed.
MASTER = "master"
// MYSELF addresses the local node in _space_send and labels local results
// in _space_search.
MYSELF = "myself"
// SERVER is the type the hostinfo action selects.
SERVER = "server"
// WORKER is the type _space_fork falls back to when no type option is set.
WORKER = "worker"
)
// Config keys of the space command and the event names it emits.
const (
// BUFFER is the config key holding the "r"/"w" sizes passed to websocket.Upgrade.
BUFFER = "buffer"
// REDIAL is the config key holding the retry settings read by _space_dial.
REDIAL = "redial"
// TIMEOUT is both the config key and the option read by call for the wait limit.
TIMEOUT = "timeout"
// SPACE_OPEN and SPACE_CLOSE are emitted by _space_fork around a CHROME connection.
SPACE_OPEN = "space.open"
SPACE_CLOSE = "space.close"
// SPACE_START and SPACE_STOP are emitted by _space_fork around connections
// that are neither CHROME nor WORKER.
SPACE_START = "space.start"
SPACE_STOP = "space.stop"
)
// SPACE is the name of the space command and of its config entry.
const SPACE = "space"
// init merges the space config and the SPACE and PP(SPACE) commands into the
// package Index.
func init() {
Index.Merge(&ice.Context{Configs: ice.Configs{
SPACE: {Name: SPACE, Help: "空间站", Value: kit.Data(
mdb.SHORT, mdb.NAME, mdb.FIELD, "time,type,name,text",
// buffer r/w: read by _space_fork and passed to websocket.Upgrade.
BUFFER, kit.Dict("r", ice.MOD_BUFS, "w", ice.MOD_BUFS),
// redial a/b/c: read by _space_dial, which sleeps rand.Intn(a*(i+1))+b
// milliseconds between attempts and makes at most c attempts.
// timeout c: copied into the TIMEOUT option by _space_send.
REDIAL, kit.Dict("a", 3000, "b", 1000, "c", 1000), TIMEOUT, kit.Dict("c", "180s"),
)},
}, Commands: ice.Commands{
SPACE: {Name: "space name cmd auto invite", Help: "空间站", Actions: ice.MergeActions(ice.Actions{
// Reset the stored hash on init, and again on exit after closing the entries.
ice.CTX_INIT: {Hand: func(m *ice.Message, arg ...string) { m.Conf("", mdb.HASH, "") }},
ice.CTX_EXIT: {Hand: func(m *ice.Message, arg ...string) {
mdb.HashSelectClose(m)
m.Conf("", mdb.HASH, "")
}},
// remove: mark the entry stopped, send exit to that space, then drop the entry.
mdb.REMOVE: {Name: "remove", Help: "删除", Hand: func(m *ice.Message, arg ...string) {
mdb.HashModify(m, m.OptionSimple(mdb.NAME), mdb.STATUS, cli.STOP)
defer mdb.HashRemove(m, m.OptionSimple(mdb.NAME))
m.Cmd(SPACE, m.Option(mdb.NAME), ice.EXIT)
}},
// search: arg[0] and arg[1] are indexed directly, so at least two arguments
// are expected; the third is optional.
mdb.SEARCH: {Name: "search type name text", Help: "搜索", Hand: func(m *ice.Message, arg ...string) {
_space_search(m, arg[0], arg[1], kit.Select("", arg, 2))
}},
// invite: output the publish contexts followed by the user web address as a
// script, an anchor and a QR code.
aaa.INVITE: {Name: "invite", Help: "添加", Hand: func(m *ice.Message, arg ...string) {
for _, k := range []string{ice.MISC, ice.CORE, ice.BASE} {
m.Cmdy("web.code.publish", ice.CONTEXTS, k)
}
m.EchoScript("shell", "# 共享环境", m.Option(ice.MSG_USERWEB))
m.EchoAnchor(m.Option(ice.MSG_USERWEB)).Echo(ice.NL)
m.EchoQRCode(m.Option(ice.MSG_USERWEB))
}},
// dial: connect to the dev option; the announced name is chosen by kit.Select
// from this node's name and the name option.
tcp.DIAL: {Name: "dial dev=ops name", Help: "连接", Hand: func(m *ice.Message, arg ...string) {
_space_dial(m, m.Option(ice.DEV), kit.Select(ice.Info.NodeName, m.Option(mdb.NAME)), arg...)
}},
DOMAIN: {Name: "domain", Help: "域名", Hand: func(m *ice.Message, arg ...string) {
m.Echo(_space_domain(m))
}},
// hostinfo: query runtime hostinfo on every SERVER space and total nCPU and MemTotal.
"hostinfo": {Name: "hostinfo", Help: "域名", Hand: func(m *ice.Message, arg ...string) {
ncpu := 0
nmem := 0.0
m.Cmd("").Tables(func(value ice.Maps) {
if value[mdb.TYPE] == SERVER {
msg := m.Cmd("", value[mdb.NAME], cli.RUNTIME, cli.HOSTINFO)
if msg.Append("nCPU") != "" {
ncpu += kit.Int(msg.Append("nCPU"))
m.Push("nCPU", msg.Append("nCPU"))
m.Push("MemTotal", msg.Append("MemTotal"))
m.Push("name", value[mdb.NAME])
// An "M" suffix is counted as-is and a "G" suffix is multiplied by 1000;
// the total is divided by 1000 below and printed with a "G" suffix.
// Without either suffix raw stays empty, so ParseFloat fails and the
// value adds 0 to the total.
base, raw := 1.0, ""
if strings.HasSuffix(msg.Append("MemTotal"), "M") {
raw = strings.TrimSuffix(msg.Append("MemTotal"), "M")
} else if strings.HasSuffix(msg.Append("MemTotal"), "G") {
base, raw = 1000, strings.TrimSuffix(msg.Append("MemTotal"), "G")
}
// NOTE(review): the parse error is only written to the debug log.
n, e := strconv.ParseFloat(raw, 32)
m.Debug("what %v", e)
nmem += n * base
}
}
})
m.StatusTimeCount("nCPU", ncpu, "nmem", kit.Format("%.2fG", nmem/1000.0))
m.Debug("what %v", m.FormatMeta())
}},
}, mdb.HashCloseAction()), Hand: func(m *ice.Message, arg ...string) {
if len(arg) < 2 { // list nodes
if mdb.HashSelect(m, arg...); len(arg) == 0 {
// With no argument, add an anchor per row and sort the table.
m.Tables(func(value ice.Maps) {
switch value[mdb.TYPE] {
case MASTER:
m.PushAnchor(value[mdb.NAME], m.CmdAppend(SPIDE, value[mdb.NAME], CLIENT_URL))
default:
m.PushAnchor(value[mdb.NAME], MergePod(m, value[mdb.NAME]))
}
})
m.Sort("type,name,text")
}
return
}
// dispatch the command to the space named by the lowercased arg[0]
_space_send(m, strings.ToLower(arg[0]), arg[1:]...)
}},
// The websocket endpoint: every request is handed to _space_fork.
PP(SPACE): {Name: "/space/ type name share river", Help: "空间站", Hand: func(m *ice.Message, arg ...string) {
_space_fork(m)
}},
}})
}
// Space returns the command prefix {SPACE, name} for addressing the node named
// by arg. It returns nil when arg is nil, is the empty string, or formats to
// this node's own name. m is not used.
func Space(m *ice.Message, arg ice.Any) []string {
	switch {
	case arg == nil, arg == "":
		return nil
	}
	if name := kit.Format(arg); name != ice.Info.NodeName {
		return []string{SPACE, name}
	}
	return nil
}
// call installs cb as the "_cb" option of m so that back can deliver a reply
// to it, and arms a timer for the TIMEOUT option ("10s" when unset).
//
// With sync true the function blocks until the callback runs or the timer
// fires; on timeout a warning is logged and the callback is invoked with a nil
// message. With sync false the timer is stopped and the function returns at
// once.
func call(m *ice.Message, sync bool, cb func(*ice.Message)) {
	done := make(chan bool, 2)
	limit := kit.Duration(kit.Select("10s", m.Option(TIMEOUT)))
	timer := time.AfterFunc(limit, func() {
		m.Warn(true, ice.ErrNotValid, m.Detailv())
		back(m, nil)
		done <- false
	})
	m.Optionv("_cb", func(res *ice.Message) {
		cb(res)
		if !sync {
			return
		}
		done <- true
		timer.Stop()
	})
	if !sync {
		timer.Stop()
		return
	}
	<-done
}
// back invokes the callback stored in m's "_cb" option with res. It does
// nothing when the option does not hold a func(*ice.Message).
func back(m *ice.Message, res *ice.Message) {
	if cb, ok := m.Optionv("_cb").(func(*ice.Message)); ok {
		cb(res)
	}
}