diff --git a/base/tcp/broad.go b/base/tcp/broad.go new file mode 100644 index 00000000..60ae0cb8 --- /dev/null +++ b/base/tcp/broad.go @@ -0,0 +1,54 @@ +package tcp + +import ( + "net" + + ice "shylinux.com/x/icebergs" + "shylinux.com/x/icebergs/base/mdb" + kit "shylinux.com/x/toolkits" + "shylinux.com/x/toolkits/logs" +) + +func _server_udp(m *ice.Message, arg ...string) { + l, e := net.ListenUDP(UDP4, UDPAddr(m, "0.0.0.0", m.Option(PORT))) + if e == nil { + defer l.Close() + } + mdb.HashCreate(m, arg, kit.Dict(mdb.TARGET, l), STATUS, kit.Select(ERROR, OPEN, e == nil), ERROR, kit.Format(e)) + switch cb := m.OptionCB("").(type) { + case func(*net.UDPAddr, []byte): + m.Assert(e) + buf := make([]byte, 2*ice.MOD_BUFS) + for { + if n, from, e := l.ReadFromUDP(buf[:]); !m.Warn(e) { + cb(from, buf[:n]) + } else { + break + } + } + } +} +func _client_dial_udp4(m *ice.Message, arg ...string) { + c, e := net.DialUDP(UDP4, nil, UDPAddr(m, kit.Select("255.255.255.255", m.Option(HOST)), m.Option(PORT))) + if e == nil { + defer c.Close() + } + switch cb := m.OptionCB("").(type) { + case func(*net.UDPConn): + m.Assert(e) + cb(c) + } +} + +const ( + UDP4 = "udp4" + SEND = "send" + RECV = "recv" +) + +func UDPAddr(m *ice.Message, host, port string) *net.UDPAddr { + if addr, e := net.ResolveUDPAddr(UDP4, host+ice.DF+port); !m.Warn(e, ice.ErrNotValid, host, port, logs.FileLineMeta(2)) { + return addr + } + return nil +} diff --git a/base/tcp/client.go b/base/tcp/client.go index 87d07c41..e49b7a30 100644 --- a/base/tcp/client.go +++ b/base/tcp/client.go @@ -7,16 +7,11 @@ import ( "shylinux.com/x/icebergs/base/mdb" ) -type Stat struct { - nc, nr, nw int -} - type Conn struct { + net.Conn m *ice.Message h string s *Stat - - net.Conn } func (c *Conn) Read(b []byte) (int, error) { @@ -33,15 +28,11 @@ func (c *Conn) Close() error { return c.Conn.Close() } func _client_dial(m *ice.Message, arg ...string) { c, e := net.Dial(TCP, m.Option(HOST)+ice.DF+m.Option(PORT)) - c = &Conn{m: m, s: &Stat{}, Conn: c} + c = &Conn{Conn: c, m: m, s: &Stat{}} if e == nil { defer c.Close() } switch cb := m.OptionCB("").(type) { - case func(*ice.Message, net.Conn): - if !m.Warn(e) { - cb(m, c) - } case func(net.Conn): if !m.Warn(e) { cb(c) @@ -51,15 +42,6 @@ func _client_dial(m *ice.Message, arg ...string) { } } -const ( - PROTO = "proto" - STATUS = "status" - ERROR = "error" - START = "start" - OPEN = "open" - CLOSE = "close" - STOP = "stop" -) const ( DIAL = "dial" ) @@ -69,8 +51,13 @@ func init() { Index.MergeCommands(ice.Commands{ CLIENT: {Name: "client hash auto prunes", Help: "客户端", Actions: ice.MergeActions(ice.Actions{ DIAL: {Name: "dial type name port=9010 host=", Help: "连接", Hand: func(m *ice.Message, arg ...string) { - _client_dial(m, arg...) + switch m.Option(mdb.TYPE) { + case UDP4: + _client_dial_udp4(m, arg...) + default: + _client_dial(m, arg...) + } }}, - }, mdb.StatusHashAction(mdb.FIELD, "time,hash,status,type,name,host,port,error,nread,nwrite"), mdb.ClearOnExitHashAction())}, + }, mdb.StatusHashAction(mdb.FIELD, "time,hash,status,type,name,host,port,error"), mdb.ClearOnExitHashAction())}, }) } diff --git a/base/tcp/peek.go b/base/tcp/peek.go new file mode 100644 index 00000000..3d7a1341 --- /dev/null +++ b/base/tcp/peek.go @@ -0,0 +1,54 @@ +package tcp + +import ( + "bytes" + "net" + "net/http" + "strings" + + kit "shylinux.com/x/toolkits" +) + +type Buf struct { + buf []byte +} +type PeekConn struct { + net.Conn + *Buf +} + +func (s PeekConn) Read(b []byte) (n int, err error) { + if len(s.buf) == 0 { + return s.Conn.Read(b) + } + if len(s.buf) < len(b) { + copy(b, s.buf) + s.buf = s.buf[:0] + return len(s.buf), nil + } + copy(b, s.buf) + s.buf = s.buf[len(b):] + return len(b), nil +} +func (s PeekConn) Peek(n int) (res []byte) { + b := make([]byte, n) + _n, _ := s.Conn.Read(b) + s.Buf.buf = b[:_n] + return b[:_n] +} +func (s PeekConn) IsHTTP() bool { + if bytes.Equal(s.Peek(4), []byte("GET ")) { + return true + } + return false +} +func (s PeekConn) Redirect(status int, location string) { + DF, NL := ": ", "\r\n" + s.Conn.Write([]byte(strings.Join([]string{ + kit.Format("HTTP/1.1 %d %s", status, http.StatusText(status)), kit.JoinKV(DF, NL, + "Location", location, "Content-Length", "0", + )}, NL) + NL + NL)) +} +func NewPeekConn(c net.Conn) PeekConn { + return PeekConn{Conn: c, Buf: &Buf{}} +} diff --git a/base/tcp/server.go b/base/tcp/server.go index c9455f5e..c9b92888 100644 --- a/base/tcp/server.go +++ b/base/tcp/server.go @@ -6,22 +6,15 @@ import ( ice "shylinux.com/x/icebergs" "shylinux.com/x/icebergs/base/mdb" kit "shylinux.com/x/toolkits" - "shylinux.com/x/toolkits/logs" ) -func _broad_addr(m *ice.Message, host, port string) *net.UDPAddr { - if addr, e := net.ResolveUDPAddr("udp4", kit.Format("%s:%s", host, port)); !m.Warn(e, ice.ErrNotValid, host, port, logs.FileLineMeta(2)) { - return addr - } - return nil -} +type Stat struct{ nc, nr, nw int } type Listener struct { + net.Listener m *ice.Message h string s *Stat - - net.Listener } func (l Listener) Accept() (net.Conn, error) { @@ -34,23 +27,9 @@ func (l Listener) Close() error { return l.Listener.Close() } -func _server_udp(m *ice.Message, arg ...string) { - l, e := net.ListenUDP("udp4", _broad_addr(m, "0.0.0.0", m.Option(PORT))) - defer l.Close() - mdb.HashCreate(m, arg, kit.Dict(mdb.TARGET, l), STATUS, kit.Select(ERROR, OPEN, e == nil), ERROR, kit.Format(e)) - switch cb := m.OptionCB("").(type) { - case func(*net.UDPConn): - m.Assert(e) - cb(l) - } -} func _server_listen(m *ice.Message, arg ...string) { - if m.Option("type") == "udp4" { - _server_udp(m, arg...) - return - } - l, e := net.Listen(TCP, m.Option(HOST)+":"+m.Option(PORT)) - l = &Listener{m: m, h: mdb.HashCreate(m, arg, kit.Dict(mdb.TARGET, l), STATUS, kit.Select(ERROR, OPEN, e == nil), ERROR, kit.Format(e)), s: &Stat{}, Listener: l} + l, e := net.Listen(TCP, m.Option(HOST)+ice.DF+m.Option(PORT)) + l = &Listener{Listener: l, m: m, h: mdb.HashCreate(m, arg, kit.Dict(mdb.TARGET, l), STATUS, kit.Select(ERROR, OPEN, e == nil), ERROR, kit.Format(e)), s: &Stat{}} if e == nil { defer l.Close() } @@ -60,7 +39,7 @@ func _server_listen(m *ice.Message, arg ...string) { cb(l) case func(net.Conn): for { - if c, e := l.Accept(); e == nil { + if c, e := l.Accept(); !m.Warn(e) { cb(c) } else { break @@ -77,6 +56,15 @@ const ( HOSTNAME = "hostname" NODENAME = "nodename" ) +const ( + PROTO = "proto" + STATUS = "status" + ERROR = "error" + START = "start" + OPEN = "open" + CLOSE = "close" + STOP = "stop" +) const ( LISTEN = "listen" ) @@ -85,7 +73,14 @@ const SERVER = "server" func init() { Index.MergeCommands(ice.Commands{ SERVER: {Name: "server hash auto prunes", Help: "服务器", Actions: ice.MergeActions(ice.Actions{ - LISTEN: {Name: "listen type name port=9030 host=", Hand: func(m *ice.Message, arg ...string) { _server_listen(m, arg...) }}, - }, mdb.StatusHashAction(mdb.FIELD, "time,hash,status,type,name,host,port,error,nconn"), mdb.ClearOnExitHashAction())}, + LISTEN: {Name: "listen type name port=9030 host=", Hand: func(m *ice.Message, arg ...string) { + switch m.Option(mdb.TYPE) { + case UDP4: + _server_udp(m, arg...) + default: + _server_listen(m, arg...) + } + }}, + }, mdb.StatusHashAction(mdb.FIELD, "time,hash,status,type,name,host,port,error"), mdb.ClearOnExitHashAction())}, }) } diff --git a/base/tcp/tcp.go b/base/tcp/tcp.go index 056611a2..9a909e40 100644 --- a/base/tcp/tcp.go +++ b/base/tcp/tcp.go @@ -1,65 +1,11 @@ package tcp import ( - "bytes" - "net" - "net/http" - "strings" - ice "shylinux.com/x/icebergs" - kit "shylinux.com/x/toolkits" ) -const ( - SEND = "send" - RECV = "recv" -) const TCP = "tcp" var Index = &ice.Context{Name: TCP, Help: "通信模块"} func init() { ice.Index.Register(Index, nil, HOST, PORT, CLIENT, SERVER) } - -type Buf struct { - buf []byte -} -type PeekConn struct { - net.Conn - *Buf -} - -func (s PeekConn) Read(b []byte) (n int, err error) { - if len(s.buf) == 0 { - return s.Conn.Read(b) - } - if len(s.buf) < len(b) { - copy(b, s.buf) - s.buf = s.buf[:0] - return len(s.buf), nil - } - copy(b, s.buf) - s.buf = s.buf[len(b):] - return len(b), nil -} -func (s PeekConn) Peek(n int) (res []byte) { - b := make([]byte, n) - _n, _ := s.Conn.Read(b) - s.Buf.buf = b[:_n] - return b[:_n] -} -func (s PeekConn) IsHTTP() bool { - if bytes.Equal(s.Peek(4), []byte("GET ")) { - return true - } - return false -} -func (s PeekConn) Redirect(status int, location string) { - DF, NL := ": ", "\r\n" - s.Conn.Write([]byte(strings.Join([]string{ - kit.Format("HTTP/1.1 %d %s", status, http.StatusText(status)), kit.JoinKV(DF, NL, - "Location", location, "Content-Length", "0", - )}, NL) + NL + NL)) -} -func NewPeekConn(c net.Conn) PeekConn { - return PeekConn{Conn: c, Buf: &Buf{}} -} diff --git a/base/web/broad.go b/base/web/broad.go index 08d3f852..30ccd10d 100644 --- a/base/web/broad.go +++ b/base/web/broad.go @@ -2,70 +2,36 @@ package web import ( "net" - "strings" ice "shylinux.com/x/icebergs" "shylinux.com/x/icebergs/base/aaa" "shylinux.com/x/icebergs/base/ctx" + "shylinux.com/x/icebergs/base/gdb" "shylinux.com/x/icebergs/base/mdb" "shylinux.com/x/icebergs/base/nfs" "shylinux.com/x/icebergs/base/tcp" kit "shylinux.com/x/toolkits" - "shylinux.com/x/toolkits/logs" ) -func _broad_addr(m *ice.Message, host, port string) *net.UDPAddr { - if addr, e := net.ResolveUDPAddr("udp4", kit.Format("%s:%s", host, port)); !m.Warn(e, ice.ErrNotValid, host, port, logs.FileLineMeta(2)) { - return addr - } - return nil -} -func _broad_send(m *ice.Message, host, port string, remote_host, remote_port string, arg ...string) { - if s, e := net.DialUDP("udp4", nil, _broad_addr(m, remote_host, remote_port)); !m.Warn(e, ice.ErrNotValid) { - defer s.Close() - msg := m.Spawn(kit.Dict(tcp.HOST, host, tcp.PORT, port), kit.Dict(arg)) - m.Logs(tcp.SEND, BROAD, msg.FormatMeta(), nfs.TO, remote_host+ice.DF+remote_port) - s.Write([]byte(msg.FormatMeta())) - } -} -func _broad_serve(m *ice.Message, port string) { - m.Cmd(tcp.HOST, func(value ice.Maps) { - _broad_send(m, value[aaa.IP], port, "255.255.255.255", "9020", mdb.TYPE, ice.Info.NodeType, mdb.NAME, ice.Info.NodeName) +func _broad_send(m *ice.Message, remote_host, remote_port string, host, port string, arg ...string) { + m.Cmd(tcp.CLIENT, tcp.DIAL, mdb.TYPE, tcp.UDP4, tcp.HOST, remote_host, tcp.PORT, kit.Select("9020", remote_port), func(s *net.UDPConn) { + msg := m.Spawn(kit.Dict(tcp.HOST, host, tcp.PORT, port, arg)) + msg.Logs(tcp.SEND, BROAD, msg.FormatsMeta(nil), nfs.TO, remote_host+ice.DF+remote_port).FormatsMeta(s) }) - m.Cmd(tcp.SERVER, tcp.LISTEN, mdb.TYPE, "udp4", m.OptionSimple(mdb.NAME, tcp.HOST, tcp.PORT), func(l *net.UDPConn) { - buf := make([]byte, ice.MOD_BUFS) - for { - n, from, e := l.ReadFromUDP(buf[:]) - if e != nil { - break - } - m.Logs(tcp.RECV, BROAD, string(buf[:n]), nfs.FROM, from) - msg := m.Spawn(buf[:n]) - if msg.Option(mdb.ZONE) == "echo" { - _broad_save(m, msg) - continue - } - if remote := _broad_addr(m, msg.Option(tcp.HOST), msg.Option(tcp.PORT)); remote != nil { - m.Cmd(BROAD, func(value ice.Maps) { - m.Logs(tcp.SEND, BROAD, kit.Format(value), nfs.TO, kit.Format(remote)) - l.WriteToUDP([]byte(m.Spawn(value, kit.Dict(mdb.ZONE, "echo")).FormatMeta()), remote) - }) - _broad_save(m, msg) - } +} +func _broad_serve(m *ice.Message) { + m.GoSleep("10ms", tcp.HOST, func(value ice.Maps) { + _broad_send(m, "", "", value[aaa.IP], m.Option(tcp.PORT), gdb.EVENT, tcp.LISTEN, mdb.NAME, ice.Info.NodeName, mdb.TYPE, ice.Info.NodeType) + }) + m.Cmd(tcp.SERVER, tcp.LISTEN, mdb.TYPE, tcp.UDP4, m.OptionSimple(mdb.NAME, tcp.HOST, tcp.PORT), func(from *net.UDPAddr, buf []byte) { + msg := m.Spawn(buf).Logs(tcp.RECV, BROAD, string(buf), nfs.FROM, from) + if mdb.HashCreate(m, msg.OptionSimple(kit.Simple(msg.Optionv(ice.MSG_OPTION))...)); msg.Option(gdb.EVENT) == tcp.LISTEN { + m.Cmds("", func(value ice.Maps) { + _broad_send(m, msg.Option(tcp.HOST), msg.Option(tcp.PORT), value[tcp.HOST], value[tcp.PORT], mdb.TYPE, value[mdb.TYPE], mdb.NAME, value[mdb.NAME]) + }) } }) } -func _broad_save(m, msg *ice.Message) { - save := false - m.Cmd(tcp.HOST, func(value ice.Maps) { - if strings.Split(msg.Option(tcp.HOST), ice.PT)[0] == strings.Split(value[aaa.IP], ice.PT)[0] { - save = true - } - }) - if save { - mdb.HashCreate(m, msg.OptionSimple(kit.Simple(msg.Optionv(ice.MSG_OPTION))...)) - } -} const BROAD = "broad" @@ -76,10 +42,7 @@ func init() { if arg[0] == mdb.FOREACH && arg[1] == "" { host, domain := m.Cmd(tcp.HOST).Append(aaa.IP), OptionUserWeb(m).Hostname() m.Cmds("", func(value ice.Maps) { - if value[tcp.HOST] == host { - value[tcp.HOST] = domain - } - switch value[mdb.TYPE] { + switch kit.If(value[tcp.HOST] == host, func() { value[tcp.HOST] = domain }); value[mdb.TYPE] { case "sshd": m.PushSearch(mdb.NAME, ice.Render(m, ice.RENDER_SCRIPT, kit.Format("ssh -p %s %s@%s", value[tcp.PORT], m.Option(ice.MSG_USERNAME), value[tcp.HOST])), mdb.TEXT, kit.Format("http://%s:%s", value[tcp.HOST], value[tcp.PORT]), value) @@ -89,18 +52,12 @@ func init() { }) } }}, - SERVE: {Name: "serve port=9020", Hand: func(m *ice.Message, arg ...string) { - _broad_serve(m, m.Option(tcp.PORT)) - }}, + SERVE_START: {Hand: func(m *ice.Message, arg ...string) { m.Go(func() { m.Cmd("", SERVE, m.OptionSimple(tcp.PORT)) }) }}, + SERVE: {Name: "serve port=9020", Hand: func(m *ice.Message, arg ...string) { _broad_serve(m) }}, OPEN: {Hand: func(m *ice.Message, arg ...string) { ctx.ProcessOpen(m, kit.Format("http://%s:%s", m.Option(tcp.HOST), m.Option(tcp.PORT))) }}, - tcp.SEND: {Hand: func(m *ice.Message, arg ...string) { - _broad_send(m, "", "", "255.255.255.255", "9020", arg...) - }}, - SERVE_START: {Hand: func(m *ice.Message, arg ...string) { - m.Go(func() { m.Cmd(BROAD, SERVE, m.OptionSimple(tcp.PORT)) }) - }}, + tcp.SEND: {Hand: func(m *ice.Message, arg ...string) { _broad_send(m, "", "", "", "", arg...) }}, }, mdb.HashAction(mdb.SHORT, "host,port", mdb.FIELD, "time,hash,type,name,host,port", mdb.ACTION, OPEN), mdb.ClearOnExitHashAction())}, }) } diff --git a/base/web/serve.go b/base/web/serve.go index b6817741..3818c499 100644 --- a/base/web/serve.go +++ b/base/web/serve.go @@ -171,6 +171,7 @@ func init() { SERVE_START: {Hand: func(m *ice.Message, arg ...string) { m.Go(func() { ssh.PrintQRCode(m, tcp.PublishLocalhost(m, _serve_address(m))) + return opened := false for i := 0; i < 3 && !opened; i++ { m.Sleep("1s").Cmd(SPACE, func(value ice.Maps) { kit.If(value[mdb.TYPE] == CHROME, func() { opened = true }) }) diff --git a/base/web/space.go b/base/web/space.go index 2257a5ef..690a7dc7 100644 --- a/base/web/space.go +++ b/base/web/space.go @@ -86,17 +86,17 @@ func _space_handle(m *ice.Message, safe bool, name string, c *websocket.Conn) { } else { m.Warn(!mdb.HashSelectDetail(m, next, func(value ice.Map) { switch c := value[mdb.TARGET].(type) { - case (*websocket.Conn): - _space_echo(msg, source, target, c) // 转发报文 - case ice.Handler: - c(msg) // 接收响应 + case (*websocket.Conn): // 转发报文 + _space_echo(msg, source, target, c) + case ice.Handler: // 接收响应 + c(msg) } }), ice.ErrNotFound, next) } } } func _space_domain(m *ice.Message) (link string) { - m.Options(ice.MSG_OPTION, ice.MSG_USERNAME, ice.MSG_OPTS, ice.MSG_USERNAME) + // m.Options(ice.MSG_OPTION, ice.MSG_USERNAME, ice.MSG_OPTS, ice.MSG_USERNAME) return kit.GetValid( func() string { return ice.Info.Domain }, func() string { @@ -197,7 +197,7 @@ func init() { ice.PS: {Hand: func(m *ice.Message, arg ...string) { _space_fork(m) }}, }, mdb.HashAction(mdb.SHORT, mdb.NAME, mdb.FIELD, "time,type,name,text", ctx.ACTION, OPEN, REDIAL, kit.Dict("a", 3000, "b", 1000, "c", 1000), - ), mdb.ClearOnExitHashAction(), aaa.WhiteAction()), Hand: func(m *ice.Message, arg ...string) { + ), mdb.ClearOnExitHashAction()), Hand: func(m *ice.Message, arg ...string) { if len(arg) < 2 { mdb.HashSelect(m, arg...).Sort("").Table(func(value ice.Maps) { m.PushButton(kit.Select(OPEN, LOGIN, value[mdb.TYPE] == LOGIN), mdb.REMOVE) diff --git a/base/web/web.go b/base/web/web.go index 74173052..5e468102 100644 --- a/base/web/web.go +++ b/base/web/web.go @@ -54,7 +54,7 @@ func (f *Frame) Start(m *ice.Message, arg ...string) { case func(http.Handler): cb(f) default: - m.Cmd(tcp.SERVER, tcp.LISTEN, mdb.TYPE, WEB, m.OptionSimple(mdb.NAME, tcp.HOST, tcp.PORT), func(l net.Listener) { + m.Cmd(tcp.SERVER, tcp.LISTEN, mdb.TYPE, HTTP, m.OptionSimple(mdb.NAME, tcp.HOST, tcp.PORT), func(l net.Listener) { defer mdb.HashCreateDeferRemove(m, m.OptionSimple(mdb.NAME, tcp.PROTO), arg, cli.STATUS, tcp.START)() gdb.EventDeferEvent(m, SERVE_START, arg) m.Warn(f.Server.Serve(l)) diff --git a/exec.go b/exec.go index 48b6bc40..7d20f1d9 100644 --- a/exec.go +++ b/exec.go @@ -54,6 +54,7 @@ func (m *Message) Sleep(d Any, arg ...Any) *Message { func (m *Message) Sleep300ms(arg ...Any) *Message { return m.Sleep("300ms", arg...) } func (m *Message) Sleep30ms(arg ...Any) *Message { return m.Sleep("30ms", arg...) } func (m *Message) Sleep3s(arg ...Any) *Message { return m.Sleep("3s", arg...) } +func (m *Message) GoSleep(t string, arg ...Any) { m.Go(func() { m.Sleep(t).Cmd(arg...) }) } func (m *Message) Go(cb func(), arg ...Any) { kit.If(len(arg) == 0, func() { arg = append(arg, logs.FileLine(cb)) }) task.Put(arg[0], func(task *task.Task) { m.TryCatch(m, true, func(m *Message) { cb() }) }) diff --git a/logs.go b/logs.go index 23c098e6..b4dc8194 100644 --- a/logs.go +++ b/logs.go @@ -161,7 +161,7 @@ func (m *Message) FormatCost() string { return kit.FmtDuration(time.Since(m.time func (m *Message) FormatMeta() string { return kit.Format(m.meta) } func (m *Message) FormatsMeta(w io.Writer, arg ...string) (res string) { if w == nil { - buf := bytes.NewBuffer(make([]byte, MOD_BUFS)) + buf := bytes.NewBuffer(make([]byte, 0, MOD_BUFS)) defer func() { res = buf.String() }() w = buf } diff --git a/type.go b/type.go index 16b1a865..4e80b906 100644 --- a/type.go +++ b/type.go @@ -249,7 +249,7 @@ func (m *Message) Spawn(arg ...Any) *Message { for _, val := range arg { switch val := val.(type) { case []byte: - json.Unmarshal(val, &msg.meta) + m.Warn(json.Unmarshal(val, &msg.meta)) case Option: msg.Option(val.Name, val.Value) case Maps: