1
0
forked from x/ContextOS

vps add ssh 重新定义集群模块

This commit is contained in:
shaoying 2018-04-17 09:02:20 +08:00
parent 0c069d3e97
commit a27b443df3
3 changed files with 50 additions and 18 deletions

View File

@ -821,7 +821,7 @@ func (m *Message) Echo(str string, arg ...interface{}) *Message { // {{{
func (m *Message) Copy(msg *Message, meta string, arg ...string) *Message { // {{{ func (m *Message) Copy(msg *Message, meta string, arg ...string) *Message { // {{{
switch meta { switch meta {
case "detail", "result": case "detail", "result":
m.Meta[meta] = append(m.Meta[meta][:0], msg.Meta[meta]...) m.Set(meta, msg.Meta[meta]...)
case "option", "append": case "option", "append":
if len(arg) == 0 { if len(arg) == 0 {
arg = msg.Meta[meta] arg = msg.Meta[meta]
@ -1145,6 +1145,7 @@ func (m *Message) Back(ok bool) *Message { // {{{
return m return m
} }
m.Log("info", nil, "back %v %v", m.Meta["result"], m.Meta["append"])
done, up := m.callback(ok) done, up := m.callback(ok)
if done { if done {
m.callback = nil m.callback = nil

View File

@ -308,29 +308,30 @@ var Index = &ctx.Context{Name: "nfs", Help: "存储中心",
}}, }},
"listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { "listen": &ctx.Command{Name: "listen args...", Help: "启动文件服务, args: 参考tcp模块, listen命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
msg := m.Sess("tcp", "tcp") // {{{ msg := m.Sess("pub", "tcp") // {{{
msg.Call(func(ok bool) (done bool, up bool) { msg.Call(func(ok bool) (done bool, up bool) {
if ok { if ok {
sub := msg.Spawn(m.Target()) sub := msg.Spawn(m.Target())
sub.Put("option", "io", msg.Data["io"]) sub.Put("option", "io", msg.Data["io"])
sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...) sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...)
sub.Cap("stream", msg.Append("stream")) sub.Cap("stream", msg.Target().Name)
sub.Echo(sub.Target().Name) sub.Echo(sub.Target().Name)
m.Target(sub.Target())
} }
return false, true return false, true
}, false) }, false).Cmd(m.Meta["detail"])
m.Cap("stream", msg.Cmd(m.Meta["detail"]).Cap("address"))
// }}} // }}}
}}, }},
"dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { "dial": &ctx.Command{Name: "dial args...", Help: "连接文件服务, args: 参考tcp模块, dial命令的参数", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
msg := m.Sess("tcp", "tcp") // {{{ msg := m.Sess("com", "tcp") // {{{
msg.Call(func(ok bool) (done bool, up bool) { msg.Call(func(ok bool) (done bool, up bool) {
if ok { if ok {
sub := msg.Spawn(m.Target()) sub := msg.Spawn(m.Target())
sub.Put("option", "io", msg.Data["io"]) sub.Put("option", "io", msg.Data["io"])
sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...) sub.Start(fmt.Sprintf("file%d", Pulse.Capi("nfile", 1)), "打开文件", sub.Meta["detail"]...)
sub.Cap("stream", msg.Append("stream")) sub.Cap("stream", msg.Target().Name)
sub.Echo(sub.Target().Name) sub.Echo(sub.Target().Name)
m.Target(sub.Target())
return true, true return true, true
} }
return false, false return false, false

View File

@ -1,6 +1,6 @@
package ssh package ssh // {{{
// }}}
import ( import ( // {{{
"bufio" "bufio"
"contexts" "contexts"
"fmt" "fmt"
@ -9,6 +9,8 @@ import (
"strings" "strings"
) )
// }}}
type SSH struct { type SSH struct {
send map[string]*ctx.Message send map[string]*ctx.Message
*bufio.Writer *bufio.Writer
@ -18,7 +20,7 @@ type SSH struct {
*ctx.Context *ctx.Context
} }
func (ssh *SSH) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server { func (ssh *SSH) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server { // {{{
c.Caches = map[string]*ctx.Cache{ c.Caches = map[string]*ctx.Cache{
"nsend": &ctx.Cache{Name: "消息发送数量", Value: "0", Help: "消息发送数量"}, "nsend": &ctx.Cache{Name: "消息发送数量", Value: "0", Help: "消息发送数量"},
"nrecv": &ctx.Cache{Name: "消息接收数量", Value: "0", Help: "消息接收数量"}, "nrecv": &ctx.Cache{Name: "消息接收数量", Value: "0", Help: "消息接收数量"},
@ -33,14 +35,18 @@ func (ssh *SSH) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server
return s return s
} }
func (ssh *SSH) Begin(m *ctx.Message, arg ...string) ctx.Server { // }}}
func (ssh *SSH) Begin(m *ctx.Message, arg ...string) ctx.Server { // {{{
if ssh.Context == Index { if ssh.Context == Index {
Pulse = m Pulse = m
} }
return ssh return ssh
} }
func (ssh *SSH) Start(m *ctx.Message, arg ...string) bool { // }}}
func (ssh *SSH) Start(m *ctx.Message, arg ...string) bool { // {{{
return false
ssh.Group = "" ssh.Group = ""
ssh.Owner = nil ssh.Owner = nil
ssh.Conn = m.Data["io"].(net.Conn) ssh.Conn = m.Data["io"].(net.Conn)
@ -97,7 +103,8 @@ func (ssh *SSH) Start(m *ctx.Message, arg ...string) bool {
return false return false
} }
func (ssh *SSH) Close(m *ctx.Message, arg ...string) bool { // }}}
func (ssh *SSH) Close(m *ctx.Message, arg ...string) bool { // {{{
switch ssh.Context { switch ssh.Context {
case m.Target(): case m.Target():
case m.Source(): case m.Source():
@ -105,6 +112,8 @@ func (ssh *SSH) Close(m *ctx.Message, arg ...string) bool {
return true return true
} }
// }}}
var Pulse *ctx.Message var Pulse *ctx.Message
var Index = &ctx.Context{Name: "ssh", Help: "集群中心", var Index = &ctx.Context{Name: "ssh", Help: "集群中心",
Caches: map[string]*ctx.Cache{ Caches: map[string]*ctx.Cache{
@ -113,13 +122,34 @@ var Index = &ctx.Context{Name: "ssh", Help: "集群中心",
Configs: map[string]*ctx.Config{}, Configs: map[string]*ctx.Config{},
Commands: map[string]*ctx.Command{ Commands: map[string]*ctx.Command{
"listen": &ctx.Command{Name: "listen address protocol", Help: "监听连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { "listen": &ctx.Command{Name: "listen address protocol", Help: "监听连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
m.Find("tcp").Cmd(m.Meta["detail"]) msg := m.Sess("file", "nfs")
msg.Call(func(ok bool) (done bool, up bool) {
if ok {
sub := msg.Spawn(m.Target())
sub.Start(fmt.Sprintf("host%d", Pulse.Capi("nhost", 1)), "打开文件", sub.Meta["detail"]...)
sub.Cap("stream", msg.Target().Name)
sub.Target().Sessions["file"] = msg
sub.Echo(sub.Target().Name)
}
return false, true
}, false).Cmd(m.Meta["detail"])
}}, }},
"dial": &ctx.Command{Name: "dial address protocol", Help: "建立连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { "dial": &ctx.Command{Name: "dial address protocol", Help: "建立连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
m.Find("tcp").Cmd(m.Meta["detail"]) msg := m.Sess("file", "nfs")
msg.Call(func(ok bool) (done bool, up bool) {
if ok {
m.Cap("stream", msg.Target().Name)
return true, true
}
return false, false
}, false).Cmd(m.Meta["detail"])
}}, }},
"open": &ctx.Command{Name: "open", Help: "打开连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { "send": &ctx.Command{Name: "send arg...", Help: "打开连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
m.Start(fmt.Sprintf("host%d", Pulse.Capi("nhost", 1)), "主机连接") msg := m.Sess("file")
msg.Copy(m, "detail").Cmd()
m.Copy(msg, "result")
}}, }},
"remote": &ctx.Command{Name: "remote detail...", Help: "远程执行", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) { "remote": &ctx.Command{Name: "remote detail...", Help: "远程执行", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) {
ssh, ok := m.Target().Server.(*SSH) ssh, ok := m.Target().Server.(*SSH)