1
0
mirror of https://shylinux.com/x/ContextOS synced 2025-04-26 09:14:06 +08:00

mac add nfs.send

This commit is contained in:
shaoying 2018-08-02 00:19:39 +08:00
parent 2306318835
commit 8059414c74
3 changed files with 98 additions and 246 deletions

View File

@ -280,6 +280,8 @@ type Message struct {
messages []*Message
message *Message
root *Message
Remote chan bool
}
func (m *Message) Code() int { // {{{
@ -654,20 +656,14 @@ func (m *Message) CallBack(sync bool, cb func(msg *Message) (sub *Message), arg
return m.Call(cb, arg...)
}
wait := make(chan bool)
wait := make(chan *Message)
go m.Call(func(sub *Message) *Message {
msg := cb(sub)
m.Log("lock", "before done %v", arg)
wait <- true
m.Log("lock", "after done %v", arg)
wait <- m
return msg
}, arg...)
m.Log("lock", "before wait %v", arg)
<-wait
m.Log("lock", "after wait %v", arg)
return m
return <-wait
}
// }}}
@ -2887,10 +2883,12 @@ func Start(args ...string) {
m.target.Begin(m)
}
Pulse.Sess("tcp", "tcp")
Pulse.Sess("nfs", "nfs")
Pulse.Sess("lex", "lex")
Pulse.Sess("yac", "yac")
Pulse.Sess("cli", "cli")
Pulse.Sess("aaa", "aaa")
Pulse.Sess("log", "log")

View File

@ -37,9 +37,12 @@ type NFS struct {
paths []string
io net.Conn
send chan *ctx.Message
recv chan *ctx.Message
hand map[int]*ctx.Message
*bufio.Reader
*bufio.Writer
send map[int]*ctx.Message
target *ctx.Context
cli *ctx.Message
@ -546,12 +549,10 @@ func (nfs *NFS) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server
c.Configs = map[string]*ctx.Config{}
} else {
c.Caches = map[string]*ctx.Cache{
"nbytes": &ctx.Cache{Name: "消息发送字节", Value: "0", Help: "消息发送字节"},
"nsend": &ctx.Cache{Name: "消息发送数量", Value: "0", Help: "消息发送数量"},
"nrecv": &ctx.Cache{Name: "消息接收数量", Value: "0", Help: "消息接收数量"},
"target": &ctx.Cache{Name: "消息接收模块", Value: "ssh", Help: "消息接收模块"},
"result": &ctx.Cache{Name: "前一条指令执行结果", Value: "", Help: "前一条指令执行结果"},
"sessid": &ctx.Cache{Name: "会话令牌", Value: "", Help: "会话令牌"},
"nread": &ctx.Cache{Name: "nread", Value: "0", Help: "nread"},
"nwrite": &ctx.Cache{Name: "nwrite", Value: "0", Help: "nwrite"},
}
c.Configs = map[string]*ctx.Config{}
}
@ -673,143 +674,97 @@ func (nfs *NFS) Start(m *ctx.Message, arg ...string) bool { // {{{
return false
}
m.Cap("stream", m.Option("stream"))
nfs.io = m.Optionv("io").(net.Conn)
bio := bufio.NewScanner(nfs.io)
for bio.Scan() {
m.Log("info", "recv: %s", bio.Text())
}
return false
m.Sess("nfs", m)
nfs.Message = m
if _, ok := m.Data["io"]; ok {
m.Cap("stream", m.Source().Name)
// m.Sess("aaa", "aaa").Cmd("login", "demo", "demo")
m.Options("stdio", false)
// nfs.io = socket.(io.ReadWriteCloser)
nfs.Reader = bufio.NewReader(nfs.io)
nfs.Writer = bufio.NewWriter(nfs.io)
nfs.send = make(map[int]*ctx.Message)
nfs.target = m.Target()
if target, ok := m.Data["target"]; ok {
nfs.target = target.(*ctx.Context)
}
var msg *ctx.Message
nfs.Caches["target"] = &ctx.Cache{Name: "target", Value: "", Help: "文件名"}
nsend := ""
nfs.hand = map[int]*ctx.Message{}
nfs.send = make(chan *ctx.Message, 10)
nfs.recv = make(chan *ctx.Message, 10)
go func() {
for {
line, e := nfs.Reader.ReadString('\n')
if msg == nil {
msg = m.Sess("ssh")
m.Cap("target", msg.Target().Name)
}
if e == io.EOF {
msg.Cmd("close")
}
m.Assert(e)
if line = strings.TrimSpace(line); len(line) > 0 {
ls := strings.SplitN(line, ":", 2)
ls[0] = strings.TrimSpace(ls[0])
ls[1], e = url.QueryUnescape(strings.TrimSpace(ls[1]))
m.Assert(e)
switch ls[0] {
case "detail":
msg.Add("detail", ls[1])
case "result":
msg.Add("result", ls[1])
case "nsend":
nsend = ls[1]
default:
msg.Add("option", ls[0], ls[1])
select {
case msg := <-nfs.send:
head, body := "detail", "option"
if msg.Hand {
head, body = "result", "append"
send_code := msg.Option("send_code")
msg.Append("send_code", send_code)
m.Log("info", "%s recv: %v %v", msg.Option("recv_code"), msg.Meta[head], msg.Meta[body])
} else {
m.Log("info", "%d send: %v %v", m.Capi("nsend", 1), msg.Meta[head], msg.Meta[body])
msg.Meta["detail"] = msg.Meta["detail"][1:]
nfs.hand[m.Capi("nsend")] = msg
msg.Option("send_code", m.Capi("nsend"))
}
continue
}
if msg.Has("detail") {
msg.Log("info", "%d recv", m.Capi("nrecv", 1))
msg.Log("info", "detail: %v", msg.Meta["detail"])
msg.Log("info", "option: %v", msg.Meta["option"])
msg.Options("stdio", false)
msg.Option("nsend", nsend)
func() {
cmd := msg
nsends := nsend
cmd.Call(func(sub *ctx.Message) *ctx.Message {
for _, v := range sub.Meta["result"] {
_, e := fmt.Fprintf(nfs.Writer, "result: %s\n", url.QueryEscape(v))
sub.Assert(e)
}
sub.Append("nsend", nsends)
for _, k := range sub.Meta["append"] {
for _, v := range sub.Meta[k] {
_, e := fmt.Fprintf(nfs.Writer, "%s: %s\n", k, url.QueryEscape(v))
sub.Assert(e)
}
}
sub.Log("info", "%d recv", sub.Optioni("nsend"))
sub.Log("info", "result: %v", sub.Meta["result"])
sub.Log("info", "append: %v", sub.Meta["append"])
_, e := fmt.Fprintf(nfs.Writer, "\n")
sub.Assert(e)
e = nfs.Writer.Flush()
sub.Assert(e)
if sub.Has("io") {
if f, ok := sub.Data["io"].(io.ReadCloser); ok {
io.Copy(nfs.Writer, f)
nfs.Writer.Flush()
f.Close()
}
}
return sub
})
}()
} else {
msg.Meta["append"] = msg.Meta["option"]
delete(msg.Meta, "option")
msg.Log("info", "%s send", nsend)
msg.Log("info", "result: %v", msg.Meta["result"])
msg.Log("info", "append: %v", msg.Meta["append"])
n, e := strconv.Atoi(nsend)
m.Assert(e)
send := nfs.send[n]
send.Copy(msg, "result")
send.Copy(msg, "append")
if send.Has("io") {
if f, ok := send.Data["io"].(io.WriteCloser); ok {
io.CopyN(f, nfs.Reader, int64(send.Appendi("size")))
f.Close()
for _, v := range msg.Meta[head] {
n, e := fmt.Fprintf(nfs.io, "%s: %s\n", head, url.QueryEscape(v))
m.Assert(e)
m.Capi("nwrite", n)
}
for _, k := range msg.Meta[body] {
for _, v := range msg.Meta[k] {
n, e := fmt.Fprintf(nfs.io, "%s: %s\n", url.QueryEscape(k), url.QueryEscape(v))
m.Assert(e)
m.Capi("nwrite", n)
}
}
send.Back(send)
n, e := fmt.Fprintf(nfs.io, "\n")
m.Assert(e)
m.Capi("nwrite", n)
}
}
}()
go func() {
bio := bufio.NewScanner(nfs.io)
var e error
for msg, head, body := m.Spawn(), "", ""; bio.Scan(); {
line := bio.Text()
m.Capi("nread", len(line)+1)
if len(line) == 0 {
if head == "detail" {
m.Log("info", "%d recv: %v %v", m.Capi("nrecv", 1), msg.Meta[head], msg.Meta[body])
msg.Option("recv_code", m.Cap("nrecv"))
nfs.recv <- msg
} else {
m.Log("info", "%d send: %v %v", msg.Appendi("send_code"), msg.Meta[head], msg.Meta[body])
h := nfs.hand[msg.Appendi("send_code")]
h.Copy(msg, "result").Copy(msg, "append")
h.Remote <- true
}
msg = m.Spawn()
continue
}
nsend = ""
msg = nil
word := strings.Split(line, ": ")
word[0], e = url.QueryUnescape(word[0])
m.Assert(e)
word[1], e = url.QueryUnescape(word[1])
m.Assert(e)
switch word[0] {
case "detail":
head, body = "detail", "option"
msg.Add(word[0], word[1])
case "result":
head, body = "result", "append"
msg.Add(word[0], word[1])
default:
msg.Add(body, word[0], word[1])
}
}
}()
for {
select {
case msg := <-nfs.recv:
nfs.send <- msg.Cmd()
}
return true
}
return false
return true
}
// }}}
@ -1319,7 +1274,7 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心",
"listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
if _, ok := m.Target().Server.(*NFS); m.Assert(ok) { //{{{
m.Find("tcp").Call(func(com *ctx.Message) *ctx.Message {
m.Sess("tcp").Call(func(com *ctx.Message) *ctx.Message {
sub := com.Spawn(c)
sub.Start(fmt.Sprintf("file%d", m.Capi("nfile", 1)), "远程文件")
return sub
@ -1329,7 +1284,7 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心",
}},
"dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
if _, ok := m.Target().Server.(*NFS); m.Assert(ok) { //{{{
m.Find("tcp").Call(func(com *ctx.Message) *ctx.Message {
m.Sess("tcp").Call(func(com *ctx.Message) *ctx.Message {
sub := com.Spawn(c)
sub.Start(fmt.Sprintf("file%d", m.Capi("nfile", 1)), "远程文件")
return sub
@ -1338,111 +1293,10 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心",
// }}}
}},
"send": &ctx.Command{Name: "send [file] args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { // {{{
m.Log("fuck", "%v %v", arg, nfs.io)
nfs.io.Write([]byte(arg[0]))
return
if m.Has("nrecv") {
if len(arg) > 1 && arg[0] == "file" {
info, e := os.Stat(arg[1])
m.Assert(e)
m.Append("name", info.Name())
m.Append("size", info.Size())
m.Append("time", info.ModTime())
m.Append("mode", info.Mode())
f, e := os.Open(arg[1])
m.Assert(e)
m.Put("append", "io", f)
}
} else {
nfs.send[m.Optioni("nsend", m.Capi("nsend", 1))] = m
if len(arg) > 1 && arg[0] == "file" {
info, e := os.Stat(arg[1])
m.Assert(e)
m.Option("name", info.Name())
m.Option("size", info.Size())
m.Option("time", info.ModTime())
m.Option("mode", info.Mode())
n, e := fmt.Fprintf(nfs.Writer, "detail: recv\n")
m.Capi("nbytes", n)
m.Assert(e)
}
for _, v := range arg {
n, e := fmt.Fprintf(nfs.Writer, "detail: %v\n", v)
m.Capi("nbytes", n)
m.Assert(e)
}
for _, k := range m.Meta["option"] {
if k == "args" {
continue
}
for _, v := range m.Meta[k] {
n, e := fmt.Fprintf(nfs.Writer, "%s: %s\n", k, v)
m.Capi("nbytes", n)
m.Assert(e)
}
}
m.Log("info", "%d send", m.Optioni("nsend"))
m.Log("info", "detail: %v", m.Meta["detail"])
m.Log("info", "option: %v", m.Meta["option"])
n, e := fmt.Fprintf(nfs.Writer, "\n")
m.Capi("nbytes", n)
m.Assert(e)
nfs.Writer.Flush()
if len(arg) > 1 && arg[0] == "file" {
f, e := os.Open(arg[1])
m.Assert(e)
defer f.Close()
_, e = io.Copy(nfs.Writer, f)
}
}
} // }}}
}},
"recv": &ctx.Command{Name: "recv [file] args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) { // {{{
if m.Has("nrecv") {
if len(arg) > 1 && arg[0] == "file" {
f, e := os.Create(arg[1])
m.Assert(e)
defer f.Close()
io.CopyN(f, nfs.Reader, int64(m.Optioni("size")))
}
return
}
nfs.send[m.Optioni("nrecv", m.Capi("nsend", 1))] = m
if len(arg) > 1 && arg[0] == "file" {
f, e := os.Create(arg[1])
m.Assert(e)
m.Put("option", "io", f)
fmt.Fprintf(nfs.Writer, "detail: send\n")
}
for _, v := range arg {
fmt.Fprintf(nfs.Writer, "detail: %v\n", v)
}
for _, k := range m.Meta["option"] {
if k == "args" {
continue
}
for _, v := range m.Meta[k] {
fmt.Fprintf(nfs.Writer, "%s: %s\n", k, v)
}
}
fmt.Fprintf(nfs.Writer, "\n")
nfs.Writer.Flush()
if nfs, ok := m.Target().Server.(*NFS); m.Assert(ok) && nfs.io != nil { // {{{
m.Remote = make(chan bool, 1)
nfs.send <- m
<-m.Remote
} // }}}
}},
},

View File

@ -70,7 +70,7 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { // {{{
tcp.Conn = c
m.Log("info", "%s accept %s", m.Cap("nclient"),
m.Append("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr()))))
m.Option("stream", m.Cap("stream", fmt.Sprintf("%s<-%s", tcp.LocalAddr(), tcp.RemoteAddr()))))
m.Put("option", "io", tcp.Conn).Back(m)
return false
default: