From 9f81fa985d10a986c3b766c4634a2edac0951a46 Mon Sep 17 00:00:00 2001 From: shaoying Date: Sat, 20 Jul 2019 16:15:45 +0800 Subject: [PATCH] opt tcp.go --- src/contexts/tcp/tcp.go | 184 ++++++++++++++++++++-------------------- 1 file changed, 92 insertions(+), 92 deletions(-) diff --git a/src/contexts/tcp/tcp.go b/src/contexts/tcp/tcp.go index cb0d09b1..6514d236 100644 --- a/src/contexts/tcp/tcp.go +++ b/src/contexts/tcp/tcp.go @@ -8,6 +8,7 @@ import ( "net" "strconv" "strings" + "sync/atomic" "time" "toolkit" ) @@ -18,54 +19,69 @@ type TCP struct { *ctx.Context } -func (tcp *TCP) Fuck(address []string, action func(address string) (net.Conn, error)) { - m := tcp.Message() +func (tcp *TCP) parse(m *ctx.Message, arg ...string) ([]string, []string) { + address := []string{} + if arg[1] == "dev" { + m.Cmd("web.get", arg[1], arg[2], "temp", "ports", "format", "object").Table(func(line map[string]string) { + address = append(address, line["value"]) + }) + m.Assert(len(address) > 0, "dial failure") + + for i := 2; i < len(arg)-1; i++ { + arg[i] = arg[i+1] + } + if len(arg) > 2 { + arg = arg[:len(arg)-1] + } + } else { + address = append(address, m.Cap("address", m.Confx("address", arg, 1))) + } + return address, arg +} +func (tcp *TCP) retry(m *ctx.Message, address []string, action func(address string) (net.Conn, error)) net.Conn { + var count int32 + cs := make(chan net.Conn) - fuck := make(chan bool, 3) for i := 0; i < m.Confi("retry", "counts"); i++ { for _, p := range address { - m.Cap("address", p) - m.Gos(m, func(m *ctx.Message) { - p := m.Cap("address") - if c, e := action(p); e == nil { - tcp.Conn = c - fuck <- true + m.Gos(m.Spawn().Add("option", "address", p), func(msg *ctx.Message) { + m.Log("info", "dial: %v", msg.Option("address")) + if count >= 1 { + msg.Log("info", "skip: %v", msg.Option("address")) + } else if c, e := action(msg.Option("address")); e != nil { + msg.Log("info", "%s", e) + } else if atomic.AddInt32(&count, 1) > 1 { + msg.Log("info", "close: %s", c.LocalAddr()) + c.Close() } else { - m.Log("info", "dial %s:%s %s", m.Cap("protocol"), p, e) - fuck <- false + cs <- c } }) + } - select { - case ok := <-fuck: - if ok { - return - } - case <-time.After(kit.Duration(m.Conf("retry", "interval"))): - m.Log("info", "dial %s:%s timeout", m.Cap("protocol"), p) - } - time.Sleep(kit.Duration(m.Conf("retry", "interval"))) + select { + case c := <-cs: + return c + + case <-time.After(kit.Duration(m.Conf("retry", "interval"))): + m.Log("info", "dial %s:%v timeout", m.Cap("protocol"), address) } } + return nil } - func (tcp *TCP) Read(b []byte) (n int, e error) { - m := tcp.Context.Message() - m.Assert(tcp.Conn != nil) - n, e = tcp.Conn.Read(b) - m.Capi("nrecv", n) - if e != io.EOF { - m.Assert(e) + if m := tcp.Context.Message(); m.Assert(tcp.Conn != nil) { + if n, e = tcp.Conn.Read(b); e == io.EOF || m.Assert(e) { + m.Capi("nrecv", n) + } } return } func (tcp *TCP) Write(b []byte) (n int, e error) { - m := tcp.Context.Message() - m.Assert(tcp.Conn != nil) - n, e = tcp.Conn.Write(b) - m.Capi("nsend", n) - if e != io.EOF { - m.Assert(e) + if m := tcp.Context.Message(); m.Assert(tcp.Conn != nil) { + if n, e = tcp.Conn.Write(b); e == io.EOF || m.Assert(e) { + m.Capi("nsend", n) + } } return } @@ -78,35 +94,13 @@ func (tcp *TCP) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server "nrecv": &ctx.Cache{Name: "nrecv", Value: "0", Help: "接收字节数"}, "nsend": &ctx.Cache{Name: "nsend", Value: "0", Help: "发送字节数"}, } - c.Configs = map[string]*ctx.Config{} - - s := new(TCP) - s.Context = c - return s + return &TCP{Context: c} } func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server { return tcp } func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { - address := []string{} - if arg[1] == "dev" { - m.Cmd("web.get", arg[1], arg[2], "temp", "ports", "format", "object").Table(func(line map[string]string) { - address = append(address, line["value"]) - }) - if len(address) == 0 { - m.Log("warn", "dial failure %v", arg) - return true - } - for i := 2; i < len(arg)-1; i++ { - arg[i] = arg[i+1] - } - if len(arg) > 2 { - arg = arg[:len(arg)-1] - } - } else { - address = append(address, m.Cap("address", m.Confx("address", arg, 1))) - } - + address, arg := tcp.parse(m, arg...) m.Cap("security", m.Confx("security", arg, 2)) m.Cap("protocol", m.Confx("protocol", arg, 3)) @@ -117,17 +111,17 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { m.Assert(e) conf := &tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true} - tcp.Fuck(address, func(p string) (net.Conn, error) { + tcp.Conn = tcp.retry(m, address, func(p string) (net.Conn, error) { return tls.Dial(m.Cap("protocol"), p, conf) }) } else { - tcp.Fuck(address, func(p string) (net.Conn, error) { - return net.Dial(m.Cap("protocol"), p) + tcp.Conn = tcp.retry(m, address, func(p string) (net.Conn, error) { + return net.DialTimeout(m.Cap("protocol"), p, kit.Duration(m.Conf("retry", "timeout"))) }) } - m.Log("info", "%s dial %s", m.Cap("nclient"), - m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr()))) + m.Log("info", "%s connect %s", m.Cap("nclient"), + m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), m.Cap("address", tcp.RemoteAddr().String())))) m.Sess("tcp", m) m.Option("ms_source", tcp.Context.Name) @@ -163,16 +157,13 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { tcp.Listener = l } - m.Log("info", "%d listen %v", m.Capi("nlisten"), - m.Cap("stream", fmt.Sprintf("%s", tcp.Addr()))) + m.Log("info", "%d listen %v", m.Capi("nlisten"), m.Cap("stream", fmt.Sprintf("%s", tcp.Addr()))) addr := strings.Split(tcp.Addr().String(), ":") - ports := []interface{}{} - m.Cmd("tcp.ifconfig").Table(func(line map[string]string) { + if m.Cmd("tcp.ifconfig").Table(func(line map[string]string) { ports = append(ports, fmt.Sprintf("%s:%s", line["ip"], addr[len(addr)-1])) - }) - if len(ports) == 0 { + }); len(ports) == 0 { ports = append(ports, fmt.Sprintf("%s:%s", "127.0.0.1", addr[len(addr)-1])) } m.Back(m.Spawn(m.Source()).Put("option", "node.port", ports)) @@ -181,11 +172,11 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool { } for { - c, e := tcp.Accept() - m.Assert(e) - m.Spawn(Index).Put("option", "io", c).Call(func(sub *ctx.Message) *ctx.Message { - return sub.Spawn(m.Source()) - }, "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol")) + if c, e := tcp.Accept(); m.Assert(e) { + m.Spawn(Index).Put("option", "io", c).Call(func(sub *ctx.Message) *ctx.Message { + return sub.Spawn(m.Source()) + }, "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol")) + } } return true @@ -214,13 +205,12 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心", "nclient": &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"}, }, Configs: map[string]*ctx.Config{ - "": &ctx.Config{Name: "address", Value: ":9090", Help: "网络地址"}, "address": &ctx.Config{Name: "address", Value: ":9090", Help: "网络地址"}, "security": &ctx.Config{Name: "security(true/false)", Value: "false", Help: "加密通信"}, "protocol": &ctx.Config{Name: "protocol(tcp/tcp4/tcp6)", Value: "tcp4", Help: "网络协议"}, "retry": &ctx.Config{Name: "retry", Value: map[string]interface{}{ - "interval": "3s", "counts": 3, - }, Help: "网络协议"}, + "interval": "3s", "counts": 3, "timeout": "10s", + }, Help: "网络重试"}, }, Commands: map[string]*ctx.Command{ "listen": &ctx.Command{Name: "listen address [security [protocol]]", Help: "网络监听", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { @@ -228,30 +218,31 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心", return }}, "accept": &ctx.Command{Name: "accept address [security [protocol]]", Help: "网络连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { - m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...) + m.Start(fmt.Sprintf("sub%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...) return }}, "dial": &ctx.Command{Name: "dial address [security [protocol]]", Help: "网络连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...) return }}, + "send": &ctx.Command{Name: "send message", Help: "发送消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if tcp, ok := m.Target().Server.(*TCP); m.Assert(ok) { - tcp.Write([]byte(arg[0])) + fmt.Fprint(tcp, arg[0]) } return }}, "recv": &ctx.Command{Name: "recv size", Help: "接收消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if tcp, ok := m.Target().Server.(*TCP); m.Assert(ok) { - n, e := strconv.Atoi(arg[0]) - m.Assert(e) - buf := make([]byte, n) - - n, _ = tcp.Read(buf) - m.Echo(string(buf[:n])) + if n, e := strconv.Atoi(arg[0]); m.Assert(e) { + buf := make([]byte, n) + n, _ = tcp.Read(buf) + m.Echo(string(buf[:n])) + } } return }}, + "ifconfig": &ctx.Command{Name: "ifconfig [name]", Help: "网络配置", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { if ifs, e := net.Interfaces(); m.Assert(e) { for _, v := range ifs { @@ -268,11 +259,11 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心", continue } - m.Add("append", "index", v.Index) - m.Add("append", "name", v.Name) - m.Add("append", "ip", ip[0]) - m.Add("append", "mask", ip[1]) - m.Add("append", "hard", v.HardwareAddr.String()) + m.Push("index", v.Index) + m.Push("name", v.Name) + m.Push("ip", ip[0]) + m.Push("mask", ip[1]) + m.Push("hard", v.HardwareAddr.String()) } } } @@ -281,6 +272,17 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心", return }}, "probe": &ctx.Command{Name: "probe [port]", Help: "端口检测", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) { + if len(arg) == 0 { + for i := 0; i < 1024; i++ { + m.Show("port: %v", i) + if t, e := net.DialTimeout("tcp", fmt.Sprintf(":%d", i), 3*time.Second); e == nil { + m.Push("port", i) + t.Close() + } + } + m.Table() + return + } if t, e := net.DialTimeout("tcp", arg[0], 10*time.Second); e == nil { m.Echo("active") t.Close() @@ -291,7 +293,5 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心", } func init() { - tcp := &TCP{} - tcp.Context = Index - ctx.Index.Register(Index, tcp) + ctx.Index.Register(Index, &TCP{Context: Index}) }