1
0
mirror of https://shylinux.com/x/icebergs synced 2025-04-25 17:18:05 +08:00

add push&pull

This commit is contained in:
shaoying 2019-12-31 22:37:55 +08:00
parent 032bbe81d4
commit a0586fce14
6 changed files with 220 additions and 20 deletions

View File

@ -38,7 +38,9 @@ func (f *Frame) Start(m *Message, arg ...string) bool {
return true
}
func (f *Frame) Close(m *Message, arg ...string) bool {
m.TryCatch(m, true, func(m *Message) {
m.target.wg.Wait()
})
list := map[*Context]*Message{m.target: m}
m.Travel(func(p *Context, s *Context) {
if msg, ok := list[p]; ok && msg != nil {

View File

@ -75,7 +75,7 @@ func dir(m *ice.Message, root string, name string, level int, deep bool, dir_typ
m.Push("tree", strings.Repeat("| ", level-1)+"|-"+f.Name())
}
case "size":
m.Push("size", f.Size())
m.Push("size", kit.FmtSize(f.Size()))
case "line":
if f.IsDir() {
if d, e := ioutil.ReadDir(p); m.Assert(e) {

View File

@ -44,15 +44,15 @@ func IsLocalIP(ip string) bool {
return ip == "::1" || ip == "127.0.0.1"
}
func (web *Frame) Login(msg *ice.Message, w http.ResponseWriter, r *http.Request) bool {
if strings.HasPrefix(msg.Option(ice.MSG_USERURL), "/space/") {
return true
}
if msg.Options(ice.WEB_SESS) {
// 会话认证
sub := msg.Cmd(ice.AAA_SESS, "check", msg.Option(ice.WEB_SESS))
msg.Info("role: %s user: %s", msg.Option(ice.MSG_USERROLE, sub.Append("userrole")),
msg.Option(ice.MSG_USERNAME, sub.Append("username")))
}
if strings.HasPrefix(msg.Option(ice.MSG_USERURL), "/space/") {
return true
}
if (!msg.Options(ice.MSG_SESSID) || !msg.Options(ice.MSG_USERNAME)) && IsLocalIP(msg.Option(ice.MSG_USERIP)) {
// 自动认证
@ -172,6 +172,7 @@ func (web *Frame) HandleCmd(m *ice.Message, key string, cmd *ice.Command) {
msg.Option(ice.MSG_USERUA, r.Header.Get("User-Agent"))
msg.Option(ice.MSG_USERIP, r.Header.Get(ice.MSG_USERIP))
msg.Option(ice.MSG_USERURL, r.URL.Path)
msg.Option(ice.MSG_USERNAME, "")
msg.Option(ice.WEB_SESS, "")
msg.R, msg.W = r, w
@ -350,6 +351,7 @@ var Index = &ice.Context{Name: "web", Help: "网页模块",
ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, "name",
"redial.a", 3000, "redial.b", 1000, "redial.c", 10,
"buffer.r", 4096, "buffer.w", 4096,
"timeout.c", "30s",
)},
ice.WEB_DREAM: {Name: "dream", Help: "梦想家", Value: kit.Data("path", "usr/local/work",
"cmd", []interface{}{ice.CLI_SYSTEM, "sh", "ice.sh", "start", ice.WEB_SPACE, "connect"},
@ -387,7 +389,7 @@ var Index = &ice.Context{Name: "web", Help: "网页模块",
f.WriteString(kit.Formats(value))
}
})
m.Conf(ice.WEB_CACHE, "hash", kit.Dict())
// m.Conf(ice.WEB_CACHE, "hash", kit.Dict())
m.Cmd(ice.CTX_CONFIG, "save", "web.json", ice.WEB_SPIDE, ice.WEB_FAVOR, ice.WEB_CACHE, ice.WEB_STORY, ice.WEB_SHARE)
}},
@ -660,7 +662,7 @@ var Index = &ice.Context{Name: "web", Help: "网页模块",
// 连接成功
m.Rich(ice.WEB_SPACE, nil, kit.Dict(kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev))
m.Info("%d conn %s success %s", i, dev, u)
m.Log(ice.LOG_CMDS, "%d conn %s success %s", i, dev, u)
if i = 0; web.HandleWSS(m, true, s, dev) {
break
}
@ -707,9 +709,14 @@ var Index = &ice.Context{Name: "web", Help: "网页模块",
// 下发命令
m.Target().Server().(*Frame).send[id] = m
socket.WriteMessage(MSG_MAPS, []byte(m.Format("meta")))
t := time.AfterFunc(kit.Duration(m.Conf(ice.WEB_SPACE, "meta.timeout.c")), func() {
m.Log(ice.LOG_WARN, "timeout")
m.Back(nil)
})
m.Call(true, func(msg *ice.Message) *ice.Message {
// 返回结果
m.Copy(msg).Log("cost", "%s: %s %v", m.Format("cost"), arg[0], arg[1:])
t.Stop()
return nil
})
}
@ -1001,46 +1008,181 @@ var Index = &ice.Context{Name: "web", Help: "网页模块",
// head list data time text file
switch arg[0] {
case "pull":
// 起止节点
begin, end := arg[2], ""
m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, val map[string]interface{}) {
end = kit.Format(kit.Value(val, kit.Keys("remote", arg[1], "list")))
})
prev, pull := "", end
var first map[string]interface{}
for begin != end {
msg := m.Cmd(ice.WEB_SPIDE, arg[1], "/space/pull/"+begin+"/"+end).Table(func(index int, value map[string]string, head []string) {
if m.Confm(ice.WEB_CACHE, kit.Keys("hash", value["data"])) == nil {
// 导入缓存
m.Log(ice.LOG_IMPORT, "%v: %v", value["data"], value["save"])
if node := kit.UnMarshal(value["save"]); kit.Format(kit.Value(node, "file")) != "" {
m.Cmd(ice.WEB_SPIDE, arg[1], "cache", "/space/download/"+kit.Format(kit.Value(value, "data")))
} else {
m.Conf(ice.WEB_CACHE, kit.Keys("hash", value["data"]), node)
}
}
node := kit.UnMarshal(value["node"]).(map[string]interface{})
if m.Confm(ice.WEB_STORY, kit.Keys("hash", value["list"])) == nil {
// 导入节点
m.Log(ice.LOG_IMPORT, "%v: %v", value["list"], value["node"])
m.Conf(ice.WEB_STORY, kit.Keys("hash", value["list"]), node)
}
if first == nil {
if m.Richs(ice.WEB_STORY, "head", node["story"], nil) == nil {
// 自动创建
h := m.Rich(ice.WEB_STORY, "head", kit.Dict(
"count", node["count"], "scene", node["scene"], "story", node["story"], "list", value["list"],
))
m.Log(ice.LOG_CREATE, "%v: %v", h, node["story"])
}
pull, first = kit.Format(value["list"]), node
m.Richs(ice.WEB_STORY, "head", node["story"], func(key string, val map[string]interface{}) {
// 更新分支
prev = kit.Format(val["list"])
m.Log(ice.LOG_IMPORT, "%v: %v", pull, arg[1])
kit.Value(val, kit.Keys("remote", arg[1]), kit.Dict(
"list", pull, "time", node["time"], "count", node["count"],
))
})
}
if prev == kit.Format(node["prev"]) || prev == kit.Format(node["pull"]) {
// 快速合并
m.Log(ice.LOG_IMPORT, "%v: %v", pull, arg[2])
m.Richs(ice.WEB_STORY, "head", node["story"], func(key string, val map[string]interface{}) {
val["count"] = first["count"]
val["time"] = first["time"]
val["list"] = pull
})
prev = pull
}
begin = kit.Format(node["prev"])
})
if msg.Append("list") == "" {
break
}
}
if prev != pull {
m.Log(ice.LOG_WARN, "unmerge %s", pull)
m.Echo("unmerge %s", pull)
}
case "push":
m.Cmdx(ice.WEB_STORY, "pull", arg[1:])
// 查询索引
push, list := "", m.Cmd(ice.WEB_STORY, "index", arg[2]).Append("list")
m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, val map[string]interface{}) {
push = kit.Format(kit.Value(val, kit.Keys("remote", arg[1], "list")))
})
// 查询节点
nodes := []string{}
for list != "" && list != push {
m.Confm(ice.WEB_STORY, kit.Keys("hash", list), func(value map[string]interface{}) {
nodes, list = append(nodes, list), kit.Format(value["prev"])
})
}
for _, v := range kit.Revert(nodes) {
node := m.Confm(ice.WEB_STORY, kit.Keys("hash", v))
save := m.Confm(ice.WEB_CACHE, kit.Keys("hash", node["data"]))
// 推送节点
m.Log(ice.LOG_EXPORT, "%s: %s", v, node)
m.Cmd(ice.WEB_SPIDE, arg[1], "/space/push",
"list", v, "node", kit.Format(node),
"data", node["data"], "save", kit.Format(save),
)
if kit.Format(save["file"]) != "" {
// 推送缓存
m.Cmd(ice.WEB_SPIDE, arg[1], "/space/upload",
"part", "upload", "@"+kit.Format(save["file"]),
)
}
}
m.Cmd(ice.WEB_STORY, "pull", arg[1:])
case "watch":
msg := m.Cmd(ice.WEB_STORY, "index", arg[1])
name := kit.Select(arg[1], arg, 2)
os.Rename(name, kit.Keys(name, "bak"))
os.Link(msg.Append("file"), name)
m.Log(ice.LOG_EXPORT, "%s: %s", msg.Append("link"), name)
case "catch":
if last := m.Richs(ice.WEB_STORY, "head", arg[2], nil); last != nil {
if t, e := time.ParseInLocation(ice.ICE_TIME, kit.Format(last["time"]), time.Local); e == nil {
if s, e := os.Stat(arg[2]); e == nil && s.ModTime().Before(t) {
m.Info("%s last: %s", arg[2], kit.Format(t))
m.Echo("%s", last["list"])
break
}
}
}
fallthrough
case "add", "upload":
case "add", "upload", "merge":
// 保存数据
if m.Richs(ice.WEB_CACHE, nil, kit.Select("", arg, 3), nil) == nil {
if arg[0] != "merge" && m.Richs(ice.WEB_CACHE, nil, kit.Select("", arg, 3), nil) == nil {
m.Cmdy(ice.WEB_CACHE, arg)
arg = []string{arg[0], m.Append("type"), m.Append("name"), m.Append("data")}
}
// 查询索引
head, prev, count := "", "", 0
m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, value map[string]interface{}) {
head, prev, count = key, kit.Format(value["list"]), kit.Int(value["count"])
head, prev, value, count := "", "", map[string]interface{}{}, 0
m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, val map[string]interface{}) {
head, prev, value, count = key, kit.Format(val["list"]), val, kit.Int(val["count"])
m.Log("info", "head: %v prev: %v count: %v", head, prev, count)
})
if last := m.Richs(ice.WEB_STORY, nil, prev, nil); prev != "" && last != nil && last["data"] == arg[3] {
pull := ""
if arg[0] == "merge" {
m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, value map[string]interface{}) {
pull = kit.Format(kit.Value(value, kit.Keys("remote", arg[3], "list")))
})
if pull != "" {
m.Conf(ice.WEB_STORY, kit.Keys("head", prev, "data"))
arg[3] = m.Conf(ice.WEB_STORY, kit.Keys("hash", prev, "data"))
m.Log("merge %s <- %s", arg[3], pull)
} else {
break
}
} else if last := m.Richs(ice.WEB_STORY, nil, prev, nil); prev != "" && last != nil && last["data"] == arg[3] {
// 重复提交
break
}
// 添加节点
list := m.Rich(ice.WEB_STORY, nil, kit.Dict(
"count", count+1, "scene", arg[1], "story", arg[2], "data", arg[3], "prev", prev,
"count", count+1, "scene", arg[1], "story", arg[2], "data", arg[3], "prev", prev, "pull", pull,
))
m.Log(ice.LOG_CREATE, "story: %s %s: %s", list, arg[1], arg[2])
m.Push("list", list)
if head == "" {
// 添加索引
m.Rich(ice.WEB_STORY, "head", kit.Dict(
"count", count+1, "scene", arg[1], "story", arg[2], "list", list,
))
} else {
// 更新索引
value["count"] = count + 1
value["time"] = m.Time()
value["list"] = list
}
m.Echo(list)
@ -1243,9 +1385,54 @@ var Index = &ice.Context{Name: "web", Help: "网页模块",
list := strings.Split(cmd, "/")
switch list[2] {
case "login":
m.Option(ice.MSG_SESSID, Cookie(m, m.Cmdx(ice.AAA_USER, "login", m.Option("username"), m.Option("password"))))
return
case "share":
m.Cmdy(ice.WEB_SHARE, list[3:])
return
case "pull":
node := m.Cmd(ice.WEB_STORY, "index", list[3]).Append("list")
for i := 0; i < 10 && node != "" && node != list[4]; i++ {
m.Confm(ice.WEB_STORY, kit.Keys("hash", node), func(value map[string]interface{}) {
m.Push("list", node)
m.Push("node", kit.Format(value))
m.Push("data", value["data"])
m.Push("save", kit.Format(m.Confm(ice.WEB_CACHE, kit.Keys("hash", value["data"]))))
node = kit.Format(value["prev"])
})
}
return
case "push":
if m.Confm(ice.WEB_CACHE, kit.Keys("hash", m.Option("data"))) == nil {
// 导入缓存
m.Log(ice.LOG_IMPORT, "%v: %v", m.Option("data"), m.Option("save"))
node := kit.UnMarshal(m.Option("save"))
m.Conf(ice.WEB_CACHE, kit.Keys("hash", m.Option("data")), node)
}
node := kit.UnMarshal(m.Option("node")).(map[string]interface{})
if m.Confm(ice.WEB_STORY, kit.Keys("hash", m.Option("list"))) == nil {
// 导入节点
m.Log(ice.LOG_IMPORT, "%v: %v", m.Option("list"), m.Option("node"))
m.Conf(ice.WEB_STORY, kit.Keys("hash", m.Option("list")), node)
}
if head := m.Richs(ice.WEB_STORY, "head", node["story"], nil); head == nil {
// 自动创建
h := m.Rich(ice.WEB_STORY, "head", kit.Dict(
"count", node["count"], "scene", node["scene"], "story", node["story"], "list", m.Option("list"),
))
m.Log(ice.LOG_CREATE, "%v: %v", h, node["story"])
} else if head["list"] == kit.Format(node["prev"]) || head["list"] == kit.Format(node["pull"]) {
// 更新索引
head["list"] = m.Option("list")
head["count"] = node["count"]
head["time"] = node["time"]
} else {
// 推送失败
}
return
case "upload":
m.Cmdy(ice.WEB_CACHE, "upload")
return

View File

@ -57,7 +57,7 @@ var Index = &ice.Context{Name: "code", Help: "编程模块",
"compile": {Name: "compile", Help: "编译", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) {
if len(arg) == 0 {
// 目录列表
m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path"))
m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path"), "time size path")
return
}
@ -80,7 +80,7 @@ var Index = &ice.Context{Name: "code", Help: "编程模块",
"publish": {Name: "publish", Help: "发布", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) {
if len(arg) == 0 {
// 目录列表
m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path"))
m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path"), "time size path")
return
}

View File

@ -14,10 +14,18 @@ cli模块用于与系统进行交互。
- 系统信息 ice.CLI_RUNTIME
- 系统命令 ice.CLI_SYSTEM
## 文件模块 base/nfs
nfs模块用于管理文件的读写。
## 终端模块 base/ssh
ssh模块用于与终端交互。
## 数据模块 base/mdb
mdb模块用于管理数据的读写。
## 日志模块 base/log
log模块负责输出日志。

View File

@ -339,6 +339,9 @@ func (m *Message) Set(key string, arg ...string) *Message {
return m.Add(key, arg...)
}
func (m *Message) Copy(msg *Message) *Message {
if msg == nil {
return m
}
for _, k := range msg.meta[MSG_APPEND] {
if kit.IndexOf(m.meta[MSG_APPEND], k) == -1 {
m.meta[MSG_APPEND] = append(m.meta[MSG_APPEND], k)