From 7dc0108151f114ad5efd7c2944e6229451ca5dc9 Mon Sep 17 00:00:00 2001 From: shaoying Date: Wed, 3 Jun 2020 19:12:20 +0800 Subject: [PATCH] add exec --- exec.go | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 exec.go diff --git a/exec.go b/exec.go new file mode 100644 index 00000000..de898ceb --- /dev/null +++ b/exec.go @@ -0,0 +1,115 @@ +package ice + +import ( + kit "contexts/src/toolkit" + "errors" + "fmt" + "io" + "runtime" + "strings" + "time" +) + +func (m *Message) TryCatch(msg *Message, safe bool, hand ...func(msg *Message)) *Message { + defer func() { + switch e := recover(); e { + case io.EOF: + case nil: + default: + _, file, line, _ := runtime.Caller(3) + if list := strings.Split(file, "/"); len(list) > 2 { + file = strings.Join(list[len(list)-2:], "/") + } + m.Log(LOG_WARN, "catch: %s %s:%d", e, file, line) + m.Log(LOG_INFO, "chain: %s", msg.Format("chain")) + m.Log(LOG_WARN, "catch: %s %s:%d", e, file, line) + m.Log(LOG_INFO, "stack: %s", msg.Format("stack")) + if m.Log(LOG_WARN, "catch: %s %s:%d", e, file, line); len(hand) > 1 { + // 捕获异常 + m.TryCatch(msg, safe, hand[1:]...) + } else if !safe { + // 抛出异常 + m.Assert(e) + } + } + }() + + if len(hand) > 0 { + // 运行函数 + hand[0](msg) + } + return m +} +func (m *Message) Assert(arg interface{}) bool { + switch arg := arg.(type) { + case nil: + return true + case bool: + if arg == true { + return true + } + } + + // 抛出异常 + panic(errors.New(fmt.Sprintf("error %v", arg))) +} +func (m *Message) Sleep(arg string) *Message { + time.Sleep(kit.Duration(arg)) + return m +} +func (m *Message) Hold(n int) *Message { + ctx := m.target.root + if c := m.target; c.context != nil && c.context.wg != nil { + ctx = c.context + } + + ctx.wg.Add(n) + m.Log(LOG_TRACE, "%s wait %s %v", ctx.Name, m.target.Name, ctx.wg) + return m +} +func (m *Message) Done() bool { + defer func() { recover() }() + + ctx := m.target.root + if c := m.target; c.context != nil && c.context.wg != nil { + ctx = c.context + } + + m.Log(LOG_TRACE, "%s done %s %v", ctx.Name, m.target.Name, ctx.wg) + ctx.wg.Done() + return true +} +func (m *Message) Call(sync bool, cb func(*Message) *Message) *Message { + wait := make(chan bool, 2) + t := time.AfterFunc(kit.Duration(kit.Select("10s", m.Option("timeout"))), func() { + m.Warn(true, "timeout %v", m.Detailv()) + wait <- false + m.Back(nil) + }) + m.cb = func(sub *Message) *Message { + if sync { + wait <- true + t.Stop() + } + return cb(sub) + } + + if sync { + <-wait + } else { + t.Stop() + } + return m +} +func (m *Message) Back(res *Message) *Message { + if m.cb != nil { + if sub := m.cb(res); m.message != nil { + m.message.Back(sub) + } + } + return m +} +func (m *Message) Gos(msg *Message, cb func(*Message)) *Message { + go func() { msg.TryCatch(msg, true, func(msg *Message) { cb(msg) }) }() + return m +}