diff --git a/base.go b/base.go index 609e48d4..d74ec2a0 100644 --- a/base.go +++ b/base.go @@ -38,7 +38,9 @@ func (f *Frame) Start(m *Message, arg ...string) bool { return true } func (f *Frame) Close(m *Message, arg ...string) bool { - m.target.wg.Wait() + m.TryCatch(m, true, func(m *Message) { + m.target.wg.Wait() + }) list := map[*Context]*Message{m.target: m} m.Travel(func(p *Context, s *Context) { if msg, ok := list[p]; ok && msg != nil { diff --git a/base/nfs/nfs.go b/base/nfs/nfs.go index e50bffd6..3e73c983 100644 --- a/base/nfs/nfs.go +++ b/base/nfs/nfs.go @@ -75,7 +75,7 @@ func dir(m *ice.Message, root string, name string, level int, deep bool, dir_typ m.Push("tree", strings.Repeat("| ", level-1)+"|-"+f.Name()) } case "size": - m.Push("size", f.Size()) + m.Push("size", kit.FmtSize(f.Size())) case "line": if f.IsDir() { if d, e := ioutil.ReadDir(p); m.Assert(e) { diff --git a/base/web/web.go b/base/web/web.go index 2c7ff6a0..8a7185ea 100644 --- a/base/web/web.go +++ b/base/web/web.go @@ -44,15 +44,15 @@ func IsLocalIP(ip string) bool { return ip == "::1" || ip == "127.0.0.1" } func (web *Frame) Login(msg *ice.Message, w http.ResponseWriter, r *http.Request) bool { - if strings.HasPrefix(msg.Option(ice.MSG_USERURL), "/space/") { - return true - } if msg.Options(ice.WEB_SESS) { // 会话认证 sub := msg.Cmd(ice.AAA_SESS, "check", msg.Option(ice.WEB_SESS)) msg.Info("role: %s user: %s", msg.Option(ice.MSG_USERROLE, sub.Append("userrole")), msg.Option(ice.MSG_USERNAME, sub.Append("username"))) } + if strings.HasPrefix(msg.Option(ice.MSG_USERURL), "/space/") { + return true + } if (!msg.Options(ice.MSG_SESSID) || !msg.Options(ice.MSG_USERNAME)) && IsLocalIP(msg.Option(ice.MSG_USERIP)) { // 自动认证 @@ -172,6 +172,7 @@ func (web *Frame) HandleCmd(m *ice.Message, key string, cmd *ice.Command) { msg.Option(ice.MSG_USERUA, r.Header.Get("User-Agent")) msg.Option(ice.MSG_USERIP, r.Header.Get(ice.MSG_USERIP)) msg.Option(ice.MSG_USERURL, r.URL.Path) + msg.Option(ice.MSG_USERNAME, "") msg.Option(ice.WEB_SESS, "") msg.R, msg.W = r, w @@ -350,6 +351,7 @@ var Index = &ice.Context{Name: "web", Help: "网页模块", ice.WEB_SPACE: {Name: "space", Help: "空间站", Value: kit.Data(kit.MDB_SHORT, "name", "redial.a", 3000, "redial.b", 1000, "redial.c", 10, "buffer.r", 4096, "buffer.w", 4096, + "timeout.c", "30s", )}, ice.WEB_DREAM: {Name: "dream", Help: "梦想家", Value: kit.Data("path", "usr/local/work", "cmd", []interface{}{ice.CLI_SYSTEM, "sh", "ice.sh", "start", ice.WEB_SPACE, "connect"}, @@ -387,7 +389,7 @@ var Index = &ice.Context{Name: "web", Help: "网页模块", f.WriteString(kit.Formats(value)) } }) - m.Conf(ice.WEB_CACHE, "hash", kit.Dict()) + // m.Conf(ice.WEB_CACHE, "hash", kit.Dict()) m.Cmd(ice.CTX_CONFIG, "save", "web.json", ice.WEB_SPIDE, ice.WEB_FAVOR, ice.WEB_CACHE, ice.WEB_STORY, ice.WEB_SHARE) }}, @@ -660,7 +662,7 @@ var Index = &ice.Context{Name: "web", Help: "网页模块", // 连接成功 m.Rich(ice.WEB_SPACE, nil, kit.Dict(kit.MDB_TYPE, ice.WEB_MASTER, kit.MDB_NAME, dev)) - m.Info("%d conn %s success %s", i, dev, u) + m.Log(ice.LOG_CMDS, "%d conn %s success %s", i, dev, u) if i = 0; web.HandleWSS(m, true, s, dev) { break } @@ -707,9 +709,14 @@ var Index = &ice.Context{Name: "web", Help: "网页模块", // 下发命令 m.Target().Server().(*Frame).send[id] = m socket.WriteMessage(MSG_MAPS, []byte(m.Format("meta"))) + t := time.AfterFunc(kit.Duration(m.Conf(ice.WEB_SPACE, "meta.timeout.c")), func() { + m.Log(ice.LOG_WARN, "timeout") + m.Back(nil) + }) m.Call(true, func(msg *ice.Message) *ice.Message { // 返回结果 m.Copy(msg).Log("cost", "%s: %s %v", m.Format("cost"), arg[0], arg[1:]) + t.Stop() return nil }) } @@ -1001,46 +1008,181 @@ var Index = &ice.Context{Name: "web", Help: "网页模块", // head list data time text file switch arg[0] { + case "pull": + + // 起止节点 + begin, end := arg[2], "" + m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, val map[string]interface{}) { + end = kit.Format(kit.Value(val, kit.Keys("remote", arg[1], "list"))) + }) + + prev, pull := "", end + var first map[string]interface{} + for begin != end { + msg := m.Cmd(ice.WEB_SPIDE, arg[1], "/space/pull/"+begin+"/"+end).Table(func(index int, value map[string]string, head []string) { + if m.Confm(ice.WEB_CACHE, kit.Keys("hash", value["data"])) == nil { + // 导入缓存 + m.Log(ice.LOG_IMPORT, "%v: %v", value["data"], value["save"]) + if node := kit.UnMarshal(value["save"]); kit.Format(kit.Value(node, "file")) != "" { + m.Cmd(ice.WEB_SPIDE, arg[1], "cache", "/space/download/"+kit.Format(kit.Value(value, "data"))) + } else { + m.Conf(ice.WEB_CACHE, kit.Keys("hash", value["data"]), node) + } + } + + node := kit.UnMarshal(value["node"]).(map[string]interface{}) + if m.Confm(ice.WEB_STORY, kit.Keys("hash", value["list"])) == nil { + // 导入节点 + m.Log(ice.LOG_IMPORT, "%v: %v", value["list"], value["node"]) + m.Conf(ice.WEB_STORY, kit.Keys("hash", value["list"]), node) + } + + if first == nil { + if m.Richs(ice.WEB_STORY, "head", node["story"], nil) == nil { + // 自动创建 + h := m.Rich(ice.WEB_STORY, "head", kit.Dict( + "count", node["count"], "scene", node["scene"], "story", node["story"], "list", value["list"], + )) + m.Log(ice.LOG_CREATE, "%v: %v", h, node["story"]) + } + + pull, first = kit.Format(value["list"]), node + m.Richs(ice.WEB_STORY, "head", node["story"], func(key string, val map[string]interface{}) { + // 更新分支 + prev = kit.Format(val["list"]) + m.Log(ice.LOG_IMPORT, "%v: %v", pull, arg[1]) + kit.Value(val, kit.Keys("remote", arg[1]), kit.Dict( + "list", pull, "time", node["time"], "count", node["count"], + )) + }) + } + + if prev == kit.Format(node["prev"]) || prev == kit.Format(node["pull"]) { + // 快速合并 + m.Log(ice.LOG_IMPORT, "%v: %v", pull, arg[2]) + m.Richs(ice.WEB_STORY, "head", node["story"], func(key string, val map[string]interface{}) { + val["count"] = first["count"] + val["time"] = first["time"] + val["list"] = pull + }) + prev = pull + } + + begin = kit.Format(node["prev"]) + }) + if msg.Append("list") == "" { + break + } + } + if prev != pull { + m.Log(ice.LOG_WARN, "unmerge %s", pull) + m.Echo("unmerge %s", pull) + } + + case "push": + m.Cmdx(ice.WEB_STORY, "pull", arg[1:]) + + // 查询索引 + push, list := "", m.Cmd(ice.WEB_STORY, "index", arg[2]).Append("list") + m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, val map[string]interface{}) { + push = kit.Format(kit.Value(val, kit.Keys("remote", arg[1], "list"))) + }) + + // 查询节点 + nodes := []string{} + for list != "" && list != push { + m.Confm(ice.WEB_STORY, kit.Keys("hash", list), func(value map[string]interface{}) { + nodes, list = append(nodes, list), kit.Format(value["prev"]) + }) + } + + for _, v := range kit.Revert(nodes) { + node := m.Confm(ice.WEB_STORY, kit.Keys("hash", v)) + save := m.Confm(ice.WEB_CACHE, kit.Keys("hash", node["data"])) + + // 推送节点 + m.Log(ice.LOG_EXPORT, "%s: %s", v, node) + m.Cmd(ice.WEB_SPIDE, arg[1], "/space/push", + "list", v, "node", kit.Format(node), + "data", node["data"], "save", kit.Format(save), + ) + if kit.Format(save["file"]) != "" { + // 推送缓存 + m.Cmd(ice.WEB_SPIDE, arg[1], "/space/upload", + "part", "upload", "@"+kit.Format(save["file"]), + ) + } + } + + m.Cmd(ice.WEB_STORY, "pull", arg[1:]) + + case "watch": + msg := m.Cmd(ice.WEB_STORY, "index", arg[1]) + name := kit.Select(arg[1], arg, 2) + os.Rename(name, kit.Keys(name, "bak")) + os.Link(msg.Append("file"), name) + m.Log(ice.LOG_EXPORT, "%s: %s", msg.Append("link"), name) + case "catch": if last := m.Richs(ice.WEB_STORY, "head", arg[2], nil); last != nil { if t, e := time.ParseInLocation(ice.ICE_TIME, kit.Format(last["time"]), time.Local); e == nil { if s, e := os.Stat(arg[2]); e == nil && s.ModTime().Before(t) { m.Info("%s last: %s", arg[2], kit.Format(t)) + m.Echo("%s", last["list"]) break } } } fallthrough - case "add", "upload": + case "add", "upload", "merge": // 保存数据 - if m.Richs(ice.WEB_CACHE, nil, kit.Select("", arg, 3), nil) == nil { + if arg[0] != "merge" && m.Richs(ice.WEB_CACHE, nil, kit.Select("", arg, 3), nil) == nil { m.Cmdy(ice.WEB_CACHE, arg) arg = []string{arg[0], m.Append("type"), m.Append("name"), m.Append("data")} } // 查询索引 - head, prev, count := "", "", 0 - m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, value map[string]interface{}) { - head, prev, count = key, kit.Format(value["list"]), kit.Int(value["count"]) + head, prev, value, count := "", "", map[string]interface{}{}, 0 + m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, val map[string]interface{}) { + head, prev, value, count = key, kit.Format(val["list"]), val, kit.Int(val["count"]) m.Log("info", "head: %v prev: %v count: %v", head, prev, count) }) - if last := m.Richs(ice.WEB_STORY, nil, prev, nil); prev != "" && last != nil && last["data"] == arg[3] { + pull := "" + if arg[0] == "merge" { + m.Richs(ice.WEB_STORY, "head", arg[2], func(key string, value map[string]interface{}) { + pull = kit.Format(kit.Value(value, kit.Keys("remote", arg[3], "list"))) + }) + if pull != "" { + m.Conf(ice.WEB_STORY, kit.Keys("head", prev, "data")) + arg[3] = m.Conf(ice.WEB_STORY, kit.Keys("hash", prev, "data")) + m.Log("merge %s <- %s", arg[3], pull) + } else { + break + } + } else if last := m.Richs(ice.WEB_STORY, nil, prev, nil); prev != "" && last != nil && last["data"] == arg[3] { // 重复提交 break } // 添加节点 list := m.Rich(ice.WEB_STORY, nil, kit.Dict( - "count", count+1, "scene", arg[1], "story", arg[2], "data", arg[3], "prev", prev, + "count", count+1, "scene", arg[1], "story", arg[2], "data", arg[3], "prev", prev, "pull", pull, )) m.Log(ice.LOG_CREATE, "story: %s %s: %s", list, arg[1], arg[2]) m.Push("list", list) - // 添加索引 - m.Rich(ice.WEB_STORY, "head", kit.Dict( - "count", count+1, "scene", arg[1], "story", arg[2], "list", list, - )) + if head == "" { + // 添加索引 + m.Rich(ice.WEB_STORY, "head", kit.Dict( + "count", count+1, "scene", arg[1], "story", arg[2], "list", list, + )) + } else { + // 更新索引 + value["count"] = count + 1 + value["time"] = m.Time() + value["list"] = list + } m.Echo(list) @@ -1243,9 +1385,54 @@ var Index = &ice.Context{Name: "web", Help: "网页模块", list := strings.Split(cmd, "/") switch list[2] { + case "login": + m.Option(ice.MSG_SESSID, Cookie(m, m.Cmdx(ice.AAA_USER, "login", m.Option("username"), m.Option("password")))) + return case "share": m.Cmdy(ice.WEB_SHARE, list[3:]) return + case "pull": + node := m.Cmd(ice.WEB_STORY, "index", list[3]).Append("list") + for i := 0; i < 10 && node != "" && node != list[4]; i++ { + m.Confm(ice.WEB_STORY, kit.Keys("hash", node), func(value map[string]interface{}) { + m.Push("list", node) + m.Push("node", kit.Format(value)) + m.Push("data", value["data"]) + m.Push("save", kit.Format(m.Confm(ice.WEB_CACHE, kit.Keys("hash", value["data"])))) + node = kit.Format(value["prev"]) + }) + } + return + case "push": + if m.Confm(ice.WEB_CACHE, kit.Keys("hash", m.Option("data"))) == nil { + // 导入缓存 + m.Log(ice.LOG_IMPORT, "%v: %v", m.Option("data"), m.Option("save")) + node := kit.UnMarshal(m.Option("save")) + m.Conf(ice.WEB_CACHE, kit.Keys("hash", m.Option("data")), node) + } + + node := kit.UnMarshal(m.Option("node")).(map[string]interface{}) + if m.Confm(ice.WEB_STORY, kit.Keys("hash", m.Option("list"))) == nil { + // 导入节点 + m.Log(ice.LOG_IMPORT, "%v: %v", m.Option("list"), m.Option("node")) + m.Conf(ice.WEB_STORY, kit.Keys("hash", m.Option("list")), node) + } + + if head := m.Richs(ice.WEB_STORY, "head", node["story"], nil); head == nil { + // 自动创建 + h := m.Rich(ice.WEB_STORY, "head", kit.Dict( + "count", node["count"], "scene", node["scene"], "story", node["story"], "list", m.Option("list"), + )) + m.Log(ice.LOG_CREATE, "%v: %v", h, node["story"]) + } else if head["list"] == kit.Format(node["prev"]) || head["list"] == kit.Format(node["pull"]) { + // 更新索引 + head["list"] = m.Option("list") + head["count"] = node["count"] + head["time"] = node["time"] + } else { + // 推送失败 + } + return case "upload": m.Cmdy(ice.WEB_CACHE, "upload") return diff --git a/core/code/code.go b/core/code/code.go index 0620123a..a5e917df 100644 --- a/core/code/code.go +++ b/core/code/code.go @@ -57,7 +57,7 @@ var Index = &ice.Context{Name: "code", Help: "编程模块", "compile": {Name: "compile", Help: "编译", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { if len(arg) == 0 { // 目录列表 - m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path")) + m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path"), "time size path") return } @@ -80,7 +80,7 @@ var Index = &ice.Context{Name: "code", Help: "编程模块", "publish": {Name: "publish", Help: "发布", Hand: func(m *ice.Message, c *ice.Context, cmd string, arg ...string) { if len(arg) == 0 { // 目录列表 - m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path")) + m.Cmdy("nfs.dir", "", m.Conf("publish", "meta.path"), "time size path") return } diff --git a/miss/miss.md b/miss/miss.md index 4d172f0c..19e44891 100644 --- a/miss/miss.md +++ b/miss/miss.md @@ -14,10 +14,18 @@ cli模块用于与系统进行交互。 - 系统信息 ice.CLI_RUNTIME - 系统命令 ice.CLI_SYSTEM +## 文件模块 base/nfs + +nfs模块用于管理文件的读写。 + ## 终端模块 base/ssh ssh模块用于与终端交互。 +## 数据模块 base/mdb + +mdb模块用于管理数据的读写。 + ## 日志模块 base/log log模块负责输出日志。 diff --git a/type.go b/type.go index df068ec5..c75b5502 100644 --- a/type.go +++ b/type.go @@ -339,6 +339,9 @@ func (m *Message) Set(key string, arg ...string) *Message { return m.Add(key, arg...) } func (m *Message) Copy(msg *Message) *Message { + if msg == nil { + return m + } for _, k := range msg.meta[MSG_APPEND] { if kit.IndexOf(m.meta[MSG_APPEND], k) == -1 { m.meta[MSG_APPEND] = append(m.meta[MSG_APPEND], k)