mirror of
https://shylinux.com/x/ContextOS
synced 2025-06-26 18:07:30 +08:00
opt tcp.go
This commit is contained in:
parent
26c006a88a
commit
9f81fa985d
@ -8,6 +8,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
"toolkit"
|
"toolkit"
|
||||||
)
|
)
|
||||||
@ -18,54 +19,69 @@ type TCP struct {
|
|||||||
*ctx.Context
|
*ctx.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tcp *TCP) Fuck(address []string, action func(address string) (net.Conn, error)) {
|
func (tcp *TCP) parse(m *ctx.Message, arg ...string) ([]string, []string) {
|
||||||
m := tcp.Message()
|
address := []string{}
|
||||||
|
if arg[1] == "dev" {
|
||||||
|
m.Cmd("web.get", arg[1], arg[2], "temp", "ports", "format", "object").Table(func(line map[string]string) {
|
||||||
|
address = append(address, line["value"])
|
||||||
|
})
|
||||||
|
m.Assert(len(address) > 0, "dial failure")
|
||||||
|
|
||||||
|
for i := 2; i < len(arg)-1; i++ {
|
||||||
|
arg[i] = arg[i+1]
|
||||||
|
}
|
||||||
|
if len(arg) > 2 {
|
||||||
|
arg = arg[:len(arg)-1]
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
address = append(address, m.Cap("address", m.Confx("address", arg, 1)))
|
||||||
|
}
|
||||||
|
return address, arg
|
||||||
|
}
|
||||||
|
func (tcp *TCP) retry(m *ctx.Message, address []string, action func(address string) (net.Conn, error)) net.Conn {
|
||||||
|
var count int32
|
||||||
|
cs := make(chan net.Conn)
|
||||||
|
|
||||||
fuck := make(chan bool, 3)
|
|
||||||
for i := 0; i < m.Confi("retry", "counts"); i++ {
|
for i := 0; i < m.Confi("retry", "counts"); i++ {
|
||||||
for _, p := range address {
|
for _, p := range address {
|
||||||
m.Cap("address", p)
|
m.Gos(m.Spawn().Add("option", "address", p), func(msg *ctx.Message) {
|
||||||
m.Gos(m, func(m *ctx.Message) {
|
m.Log("info", "dial: %v", msg.Option("address"))
|
||||||
p := m.Cap("address")
|
if count >= 1 {
|
||||||
if c, e := action(p); e == nil {
|
msg.Log("info", "skip: %v", msg.Option("address"))
|
||||||
tcp.Conn = c
|
} else if c, e := action(msg.Option("address")); e != nil {
|
||||||
fuck <- true
|
msg.Log("info", "%s", e)
|
||||||
|
} else if atomic.AddInt32(&count, 1) > 1 {
|
||||||
|
msg.Log("info", "close: %s", c.LocalAddr())
|
||||||
|
c.Close()
|
||||||
} else {
|
} else {
|
||||||
m.Log("info", "dial %s:%s %s", m.Cap("protocol"), p, e)
|
cs <- c
|
||||||
fuck <- false
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ok := <-fuck:
|
case c := <-cs:
|
||||||
if ok {
|
return c
|
||||||
return
|
|
||||||
}
|
case <-time.After(kit.Duration(m.Conf("retry", "interval"))):
|
||||||
case <-time.After(kit.Duration(m.Conf("retry", "interval"))):
|
m.Log("info", "dial %s:%v timeout", m.Cap("protocol"), address)
|
||||||
m.Log("info", "dial %s:%s timeout", m.Cap("protocol"), p)
|
|
||||||
}
|
|
||||||
time.Sleep(kit.Duration(m.Conf("retry", "interval")))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tcp *TCP) Read(b []byte) (n int, e error) {
|
func (tcp *TCP) Read(b []byte) (n int, e error) {
|
||||||
m := tcp.Context.Message()
|
if m := tcp.Context.Message(); m.Assert(tcp.Conn != nil) {
|
||||||
m.Assert(tcp.Conn != nil)
|
if n, e = tcp.Conn.Read(b); e == io.EOF || m.Assert(e) {
|
||||||
n, e = tcp.Conn.Read(b)
|
m.Capi("nrecv", n)
|
||||||
m.Capi("nrecv", n)
|
}
|
||||||
if e != io.EOF {
|
|
||||||
m.Assert(e)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (tcp *TCP) Write(b []byte) (n int, e error) {
|
func (tcp *TCP) Write(b []byte) (n int, e error) {
|
||||||
m := tcp.Context.Message()
|
if m := tcp.Context.Message(); m.Assert(tcp.Conn != nil) {
|
||||||
m.Assert(tcp.Conn != nil)
|
if n, e = tcp.Conn.Write(b); e == io.EOF || m.Assert(e) {
|
||||||
n, e = tcp.Conn.Write(b)
|
m.Capi("nsend", n)
|
||||||
m.Capi("nsend", n)
|
}
|
||||||
if e != io.EOF {
|
|
||||||
m.Assert(e)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -78,35 +94,13 @@ func (tcp *TCP) Spawn(m *ctx.Message, c *ctx.Context, arg ...string) ctx.Server
|
|||||||
"nrecv": &ctx.Cache{Name: "nrecv", Value: "0", Help: "接收字节数"},
|
"nrecv": &ctx.Cache{Name: "nrecv", Value: "0", Help: "接收字节数"},
|
||||||
"nsend": &ctx.Cache{Name: "nsend", Value: "0", Help: "发送字节数"},
|
"nsend": &ctx.Cache{Name: "nsend", Value: "0", Help: "发送字节数"},
|
||||||
}
|
}
|
||||||
c.Configs = map[string]*ctx.Config{}
|
return &TCP{Context: c}
|
||||||
|
|
||||||
s := new(TCP)
|
|
||||||
s.Context = c
|
|
||||||
return s
|
|
||||||
}
|
}
|
||||||
func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server {
|
func (tcp *TCP) Begin(m *ctx.Message, arg ...string) ctx.Server {
|
||||||
return tcp
|
return tcp
|
||||||
}
|
}
|
||||||
func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool {
|
func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool {
|
||||||
address := []string{}
|
address, arg := tcp.parse(m, arg...)
|
||||||
if arg[1] == "dev" {
|
|
||||||
m.Cmd("web.get", arg[1], arg[2], "temp", "ports", "format", "object").Table(func(line map[string]string) {
|
|
||||||
address = append(address, line["value"])
|
|
||||||
})
|
|
||||||
if len(address) == 0 {
|
|
||||||
m.Log("warn", "dial failure %v", arg)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for i := 2; i < len(arg)-1; i++ {
|
|
||||||
arg[i] = arg[i+1]
|
|
||||||
}
|
|
||||||
if len(arg) > 2 {
|
|
||||||
arg = arg[:len(arg)-1]
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
address = append(address, m.Cap("address", m.Confx("address", arg, 1)))
|
|
||||||
}
|
|
||||||
|
|
||||||
m.Cap("security", m.Confx("security", arg, 2))
|
m.Cap("security", m.Confx("security", arg, 2))
|
||||||
m.Cap("protocol", m.Confx("protocol", arg, 3))
|
m.Cap("protocol", m.Confx("protocol", arg, 3))
|
||||||
|
|
||||||
@ -117,17 +111,17 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool {
|
|||||||
m.Assert(e)
|
m.Assert(e)
|
||||||
conf := &tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true}
|
conf := &tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true}
|
||||||
|
|
||||||
tcp.Fuck(address, func(p string) (net.Conn, error) {
|
tcp.Conn = tcp.retry(m, address, func(p string) (net.Conn, error) {
|
||||||
return tls.Dial(m.Cap("protocol"), p, conf)
|
return tls.Dial(m.Cap("protocol"), p, conf)
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
tcp.Fuck(address, func(p string) (net.Conn, error) {
|
tcp.Conn = tcp.retry(m, address, func(p string) (net.Conn, error) {
|
||||||
return net.Dial(m.Cap("protocol"), p)
|
return net.DialTimeout(m.Cap("protocol"), p, kit.Duration(m.Conf("retry", "timeout")))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Log("info", "%s dial %s", m.Cap("nclient"),
|
m.Log("info", "%s connect %s", m.Cap("nclient"),
|
||||||
m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), tcp.RemoteAddr())))
|
m.Cap("stream", fmt.Sprintf("%s->%s", tcp.LocalAddr(), m.Cap("address", tcp.RemoteAddr().String()))))
|
||||||
|
|
||||||
m.Sess("tcp", m)
|
m.Sess("tcp", m)
|
||||||
m.Option("ms_source", tcp.Context.Name)
|
m.Option("ms_source", tcp.Context.Name)
|
||||||
@ -163,16 +157,13 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool {
|
|||||||
tcp.Listener = l
|
tcp.Listener = l
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Log("info", "%d listen %v", m.Capi("nlisten"),
|
m.Log("info", "%d listen %v", m.Capi("nlisten"), m.Cap("stream", fmt.Sprintf("%s", tcp.Addr())))
|
||||||
m.Cap("stream", fmt.Sprintf("%s", tcp.Addr())))
|
|
||||||
|
|
||||||
addr := strings.Split(tcp.Addr().String(), ":")
|
addr := strings.Split(tcp.Addr().String(), ":")
|
||||||
|
|
||||||
ports := []interface{}{}
|
ports := []interface{}{}
|
||||||
m.Cmd("tcp.ifconfig").Table(func(line map[string]string) {
|
if m.Cmd("tcp.ifconfig").Table(func(line map[string]string) {
|
||||||
ports = append(ports, fmt.Sprintf("%s:%s", line["ip"], addr[len(addr)-1]))
|
ports = append(ports, fmt.Sprintf("%s:%s", line["ip"], addr[len(addr)-1]))
|
||||||
})
|
}); len(ports) == 0 {
|
||||||
if len(ports) == 0 {
|
|
||||||
ports = append(ports, fmt.Sprintf("%s:%s", "127.0.0.1", addr[len(addr)-1]))
|
ports = append(ports, fmt.Sprintf("%s:%s", "127.0.0.1", addr[len(addr)-1]))
|
||||||
}
|
}
|
||||||
m.Back(m.Spawn(m.Source()).Put("option", "node.port", ports))
|
m.Back(m.Spawn(m.Source()).Put("option", "node.port", ports))
|
||||||
@ -181,11 +172,11 @@ func (tcp *TCP) Start(m *ctx.Message, arg ...string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
c, e := tcp.Accept()
|
if c, e := tcp.Accept(); m.Assert(e) {
|
||||||
m.Assert(e)
|
m.Spawn(Index).Put("option", "io", c).Call(func(sub *ctx.Message) *ctx.Message {
|
||||||
m.Spawn(Index).Put("option", "io", c).Call(func(sub *ctx.Message) *ctx.Message {
|
return sub.Spawn(m.Source())
|
||||||
return sub.Spawn(m.Source())
|
}, "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol"))
|
||||||
}, "accept", c.RemoteAddr().String(), m.Cap("security"), m.Cap("protocol"))
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
@ -214,13 +205,12 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心",
|
|||||||
"nclient": &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"},
|
"nclient": &ctx.Cache{Name: "nclient", Value: "0", Help: "连接数量"},
|
||||||
},
|
},
|
||||||
Configs: map[string]*ctx.Config{
|
Configs: map[string]*ctx.Config{
|
||||||
"": &ctx.Config{Name: "address", Value: ":9090", Help: "网络地址"},
|
|
||||||
"address": &ctx.Config{Name: "address", Value: ":9090", Help: "网络地址"},
|
"address": &ctx.Config{Name: "address", Value: ":9090", Help: "网络地址"},
|
||||||
"security": &ctx.Config{Name: "security(true/false)", Value: "false", Help: "加密通信"},
|
"security": &ctx.Config{Name: "security(true/false)", Value: "false", Help: "加密通信"},
|
||||||
"protocol": &ctx.Config{Name: "protocol(tcp/tcp4/tcp6)", Value: "tcp4", Help: "网络协议"},
|
"protocol": &ctx.Config{Name: "protocol(tcp/tcp4/tcp6)", Value: "tcp4", Help: "网络协议"},
|
||||||
"retry": &ctx.Config{Name: "retry", Value: map[string]interface{}{
|
"retry": &ctx.Config{Name: "retry", Value: map[string]interface{}{
|
||||||
"interval": "3s", "counts": 3,
|
"interval": "3s", "counts": 3, "timeout": "10s",
|
||||||
}, Help: "网络协议"},
|
}, Help: "网络重试"},
|
||||||
},
|
},
|
||||||
Commands: map[string]*ctx.Command{
|
Commands: map[string]*ctx.Command{
|
||||||
"listen": &ctx.Command{Name: "listen address [security [protocol]]", Help: "网络监听", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"listen": &ctx.Command{Name: "listen address [security [protocol]]", Help: "网络监听", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
@ -228,30 +218,31 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心",
|
|||||||
return
|
return
|
||||||
}},
|
}},
|
||||||
"accept": &ctx.Command{Name: "accept address [security [protocol]]", Help: "网络连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"accept": &ctx.Command{Name: "accept address [security [protocol]]", Help: "网络连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...)
|
m.Start(fmt.Sprintf("sub%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...)
|
||||||
return
|
return
|
||||||
}},
|
}},
|
||||||
"dial": &ctx.Command{Name: "dial address [security [protocol]]", Help: "网络连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"dial": &ctx.Command{Name: "dial address [security [protocol]]", Help: "网络连接", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...)
|
m.Start(fmt.Sprintf("com%d", m.Capi("nclient", 1)), "网络连接", m.Meta["detail"]...)
|
||||||
return
|
return
|
||||||
}},
|
}},
|
||||||
|
|
||||||
"send": &ctx.Command{Name: "send message", Help: "发送消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"send": &ctx.Command{Name: "send message", Help: "发送消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
if tcp, ok := m.Target().Server.(*TCP); m.Assert(ok) {
|
if tcp, ok := m.Target().Server.(*TCP); m.Assert(ok) {
|
||||||
tcp.Write([]byte(arg[0]))
|
fmt.Fprint(tcp, arg[0])
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}},
|
}},
|
||||||
"recv": &ctx.Command{Name: "recv size", Help: "接收消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"recv": &ctx.Command{Name: "recv size", Help: "接收消息", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
if tcp, ok := m.Target().Server.(*TCP); m.Assert(ok) {
|
if tcp, ok := m.Target().Server.(*TCP); m.Assert(ok) {
|
||||||
n, e := strconv.Atoi(arg[0])
|
if n, e := strconv.Atoi(arg[0]); m.Assert(e) {
|
||||||
m.Assert(e)
|
buf := make([]byte, n)
|
||||||
buf := make([]byte, n)
|
n, _ = tcp.Read(buf)
|
||||||
|
m.Echo(string(buf[:n]))
|
||||||
n, _ = tcp.Read(buf)
|
}
|
||||||
m.Echo(string(buf[:n]))
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}},
|
}},
|
||||||
|
|
||||||
"ifconfig": &ctx.Command{Name: "ifconfig [name]", Help: "网络配置", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"ifconfig": &ctx.Command{Name: "ifconfig [name]", Help: "网络配置", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
if ifs, e := net.Interfaces(); m.Assert(e) {
|
if ifs, e := net.Interfaces(); m.Assert(e) {
|
||||||
for _, v := range ifs {
|
for _, v := range ifs {
|
||||||
@ -268,11 +259,11 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心",
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Add("append", "index", v.Index)
|
m.Push("index", v.Index)
|
||||||
m.Add("append", "name", v.Name)
|
m.Push("name", v.Name)
|
||||||
m.Add("append", "ip", ip[0])
|
m.Push("ip", ip[0])
|
||||||
m.Add("append", "mask", ip[1])
|
m.Push("mask", ip[1])
|
||||||
m.Add("append", "hard", v.HardwareAddr.String())
|
m.Push("hard", v.HardwareAddr.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -281,6 +272,17 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心",
|
|||||||
return
|
return
|
||||||
}},
|
}},
|
||||||
"probe": &ctx.Command{Name: "probe [port]", Help: "端口检测", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
"probe": &ctx.Command{Name: "probe [port]", Help: "端口检测", Hand: func(m *ctx.Message, c *ctx.Context, key string, arg ...string) (e error) {
|
||||||
|
if len(arg) == 0 {
|
||||||
|
for i := 0; i < 1024; i++ {
|
||||||
|
m.Show("port: %v", i)
|
||||||
|
if t, e := net.DialTimeout("tcp", fmt.Sprintf(":%d", i), 3*time.Second); e == nil {
|
||||||
|
m.Push("port", i)
|
||||||
|
t.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.Table()
|
||||||
|
return
|
||||||
|
}
|
||||||
if t, e := net.DialTimeout("tcp", arg[0], 10*time.Second); e == nil {
|
if t, e := net.DialTimeout("tcp", arg[0], 10*time.Second); e == nil {
|
||||||
m.Echo("active")
|
m.Echo("active")
|
||||||
t.Close()
|
t.Close()
|
||||||
@ -291,7 +293,5 @@ var Index = &ctx.Context{Name: "tcp", Help: "网络中心",
|
|||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
tcp := &TCP{}
|
ctx.Index.Register(Index, &TCP{Context: Index})
|
||||||
tcp.Context = Index
|
|
||||||
ctx.Index.Register(Index, tcp)
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user