command (next): Improvements for parallel programs

This patch aims to improve how Delve tracks the current goroutine,
especially in very highly parallel programs. The main spirit of this
patch is to ensure that even in situations where the goroutine we care
about is not executing (common for len(g) > len(m)) we still end up back
on that goroutine as a result of executing the 'next' command.

We accomplish this by tracking our original goroutine id, and any time a
breakpoint is hit or a threads stops, we examine the stopped threads and
see if any are executing the goroutine we care about. If not, we set
'next' breakpoint for them again and continue them. This is done so that
one of those threads can eventually pick up the goroutine we care about
and begin executing it again.
This commit is contained in:
Derek Parker 2015-08-20 09:28:11 -05:00
parent 71845350a0
commit b9846c7684
10 changed files with 205 additions and 74 deletions

@ -0,0 +1,21 @@
package main
import (
"fmt"
"sync"
)
func sayhi(n int, wg *sync.WaitGroup) {
fmt.Println("hi", n)
fmt.Println("hi", n)
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go sayhi(i, &wg)
}
wg.Wait()
}

@ -252,7 +252,7 @@ func (dbp *Process) Next() error {
return dbp.run(dbp.next) return dbp.run(dbp.next)
} }
func (dbp *Process) next() error { func (dbp *Process) next() (err error) {
// Make sure we clean up the temp breakpoints created by thread.Next // Make sure we clean up the temp breakpoints created by thread.Next
defer dbp.clearTempBreakpoints() defer dbp.clearTempBreakpoints()
@ -260,63 +260,78 @@ func (dbp *Process) next() error {
// blocked trying to read from a channel. This is so that // blocked trying to read from a channel. This is so that
// if control flow switches to that goroutine, we end up // if control flow switches to that goroutine, we end up
// somewhere useful instead of in runtime code. // somewhere useful instead of in runtime code.
chanRecvCount, err := dbp.setChanRecvBreakpoints() if _, err := dbp.setChanRecvBreakpoints(); err != nil {
if err != nil {
return err return err
} }
// Get the goroutine for the current thread. We will
// use it later in order to ensure we are on the same
// goroutine.
g, err := dbp.CurrentThread.GetG() g, err := dbp.CurrentThread.GetG()
if err != nil { if err != nil {
return err return err
} }
if g.DeferPC != 0 {
_, err = dbp.SetTempBreakpoint(g.DeferPC)
if err != nil {
return err
}
}
var goroutineExiting bool var goroutineExiting bool
var waitCount int threadNext := func(thread *Thread) error {
for _, th := range dbp.Threads { if err = thread.setNextBreakpoints(); err != nil {
// Ignore threads that aren't running go code. switch t := err.(type) {
if !th.blocked() { case ThreadBlockedError, NoReturnAddr: // Noop
waitCount++ case GoroutineExitingError:
if err = th.SetNextBreakpoints(); err != nil { goroutineExiting = t.goid == g.Id
if gerr, ok := err.(GoroutineExitingError); ok { default:
waitCount = waitCount - 1 + chanRecvCount return err
if gerr.goid == g.Id {
goroutineExiting = true
}
} else {
return err
}
} }
} }
if err = th.Continue(); err != nil { return thread.Continue()
}
// Make sure that we halt the process at the end of this
// function. We could get into a situation where we have
// started some, but not all threads.
defer func() { err = dbp.Halt() }()
// Set next breakpoints and then continue each thread.
for _, th := range dbp.Threads {
if err := threadNext(th); err != nil {
return err return err
} }
} }
for waitCount > 0 { for {
thread, err := dbp.trapWait(-1) if _, err := dbp.trapWait(-1); err != nil {
if err != nil {
return err return err
} }
tg, err := thread.GetG() // We need to wait for our goroutine to execute, which may not happen
if err != nil { // immediately.
return err //
} // Loop through all threads, and for each stopped thread
// Make sure we're on the same goroutine, unless it has exited. // see if it is the thread that we care about (thread.g == original.g).
if tg.Id == g.Id || goroutineExiting { // If so, we're done. Otherwise set next temp breakpoints for
if dbp.CurrentThread != thread { // each thread and continue them. The reason we do this is because
dbp.SwitchThread(thread.Id) // if our goroutine is paused, we must execute other threads in order
// for them to get to a scheduling point, so they can pick up the
// goroutine we care about and begin executing it.
for _, thr := range dbp.Threads {
if !thr.Stopped() {
continue
}
tg, err := thr.GetG()
if err != nil {
return err
}
// Make sure we're on the same goroutine, unless it has exited.
if tg.Id == g.Id || goroutineExiting {
if dbp.CurrentThread != thr {
dbp.SwitchThread(thr.Id)
}
return nil
}
if err := threadNext(thr); err != nil {
return err
} }
} }
waitCount--
} }
return dbp.Halt()
} }
func (dbp *Process) setChanRecvBreakpoints() (int, error) { func (dbp *Process) setChanRecvBreakpoints() (int, error) {

@ -67,7 +67,7 @@ func (dbp *Process) Kill() (err error) {
if dbp.exited { if dbp.exited {
return nil return nil
} }
if !stopped(dbp.Pid) { if !dbp.Threads[dbp.Pid].Stopped() {
return errors.New("process must be stopped in order to kill it") return errors.New("process must be stopped in order to kill it")
} }
if err = sys.Kill(-dbp.Pid, sys.SIGKILL); err != nil { if err = sys.Kill(-dbp.Pid, sys.SIGKILL); err != nil {
@ -322,14 +322,6 @@ func status(pid int) rune {
return state return state
} }
func stopped(pid int) bool {
state := status(pid)
if state == STATUS_TRACE_STOP {
return true
}
return false
}
func wait(pid, tgid, options int) (int, *sys.WaitStatus, error) { func wait(pid, tgid, options int) (int, *sys.WaitStatus, error) {
var s sys.WaitStatus var s sys.WaitStatus
if (pid != tgid) || (options != 0) { if (pid != tgid) || (options != 0) {

@ -130,6 +130,9 @@ func TestHalt(t *testing.T) {
// actually stopped, err will not be nil if the process // actually stopped, err will not be nil if the process
// is still running. // is still running.
for _, th := range p.Threads { for _, th := range p.Threads {
if !th.Stopped() {
t.Fatal("expected thread to be stopped, but was not")
}
if th.running != false { if th.running != false {
t.Fatal("expected running = false for thread", th.Id) t.Fatal("expected running = false for thread", th.Id)
} }
@ -297,6 +300,36 @@ func TestNextGeneral(t *testing.T) {
testnext("testnextprog", testcases, "main.testnext", t) testnext("testnextprog", testcases, "main.testnext", t)
} }
func TestNextConcurrent(t *testing.T) {
testcases := []nextTest{
{9, 10},
{10, 11},
}
withTestProcess("parallel_next", t, func(p *Process, fixture protest.Fixture) {
_, err := setFunctionBreakpoint(p, "main.sayhi")
assertNoError(err, t, "SetBreakpoint")
assertNoError(p.Continue(), t, "Continue")
f, ln := currentLineNumber(p, t)
initV, err := p.EvalVariable("n")
assertNoError(err, t, "EvalVariable")
for _, tc := range testcases {
if ln != tc.begin {
t.Fatalf("Program not stopped at correct spot expected %d was %s:%d", tc.begin, filepath.Base(f), ln)
}
assertNoError(p.Next(), t, "Next() returned an error")
f, ln = currentLineNumber(p, t)
if ln != tc.end {
t.Fatalf("Program did not continue to correct next location expected %d was %s:%d", tc.end, filepath.Base(f), ln)
}
v, err := p.EvalVariable("n")
assertNoError(err, t, "EvalVariable")
if v.Value != initV.Value {
t.Fatal("Did not end up on same goroutine")
}
}
})
}
func TestNextGoroutine(t *testing.T) { func TestNextGoroutine(t *testing.T) {
testcases := []nextTest{ testcases := []nextTest{
{47, 42}, {47, 42},

@ -5,6 +5,14 @@ import (
"fmt" "fmt"
) )
type NoReturnAddr struct {
fn string
}
func (nra NoReturnAddr) Error() string {
return fmt.Sprintf("could not find return address for %s", nra.fn)
}
// Takes an offset from RSP and returns the address of the // Takes an offset from RSP and returns the address of the
// instruction the current function is going to return to. // instruction the current function is going to return to.
func (thread *Thread) ReturnAddress() (uint64, error) { func (thread *Thread) ReturnAddress() (uint64, error) {
@ -13,7 +21,7 @@ func (thread *Thread) ReturnAddress() (uint64, error) {
return 0, err return 0, err
} }
if len(locations) < 2 { if len(locations) < 2 {
return 0, fmt.Errorf("could not find return address for %s", locations[0].Fn.BaseName()) return 0, NoReturnAddr{locations[0].Fn.BaseName()}
} }
return locations[1].PC, nil return locations[1].PC, nil
} }

@ -115,6 +115,12 @@ func (thread *Thread) Location() (*Location, error) {
return &Location{PC: pc, File: f, Line: l, Fn: fn}, nil return &Location{PC: pc, File: f, Line: l, Fn: fn}, nil
} }
type ThreadBlockedError struct{}
func (tbe ThreadBlockedError) Error() string {
return ""
}
// Set breakpoints for potential next lines. // Set breakpoints for potential next lines.
// //
// There are two modes of operation for this method. First, // There are two modes of operation for this method. First,
@ -129,11 +135,23 @@ func (thread *Thread) Location() (*Location, error) {
// at every single line within the current function, and // at every single line within the current function, and
// another at the functions return address, in case we're at // another at the functions return address, in case we're at
// the end. // the end.
func (thread *Thread) SetNextBreakpoints() (err error) { func (thread *Thread) setNextBreakpoints() (err error) {
if thread.blocked() {
return ThreadBlockedError{}
}
curpc, err := thread.PC() curpc, err := thread.PC()
if err != nil { if err != nil {
return err return err
} }
g, err := thread.GetG()
if err != nil {
return err
}
if g.DeferPC != 0 {
if _, err = thread.dbp.SetTempBreakpoint(g.DeferPC); err != nil {
return err
}
}
// Grab info on our current stack frame. Used to determine // Grab info on our current stack frame. Used to determine
// whether we may be stepping outside of the current function. // whether we may be stepping outside of the current function.
@ -148,15 +166,11 @@ func (thread *Thread) SetNextBreakpoints() (err error) {
return err return err
} }
if filepath.Ext(loc.File) == ".go" { if filepath.Ext(loc.File) == ".go" {
if err = thread.next(curpc, fde, loc.File, loc.Line); err != nil { err = thread.next(curpc, fde, loc.File, loc.Line)
return err
}
} else { } else {
if err = thread.cnext(curpc, fde); err != nil { err = thread.cnext(curpc, fde)
return err
}
} }
return nil return err
} }
// Go routine is exiting. // Go routine is exiting.
@ -278,3 +292,10 @@ func (thread *Thread) GetG() (g *G, err error) {
} }
return return
} }
// Returns whether the thread is stopped at
// the operating system level. Actual implementation
// is OS dependant, look in OS thread file.
func (thread *Thread) Stopped() bool {
return thread.stopped()
}

@ -123,3 +123,15 @@ clear_trap_flag(thread_act_t thread) {
return thread_set_state(thread, x86_THREAD_STATE64, (thread_state_t)&regs, count); return thread_set_state(thread, x86_THREAD_STATE64, (thread_state_t)&regs, count);
} }
int
thread_blocked(thread_act_t thread) {
kern_return_t kret;
struct thread_basic_info info;
unsigned int info_count = THREAD_BASIC_INFO_COUNT;
kret = thread_info((thread_t)thread, THREAD_BASIC_INFO, (thread_info_t)&info, &info_count);
if (kret != KERN_SUCCESS) return -1;
return info.suspend_count;
}

@ -12,14 +12,22 @@ type OSSpecificDetails struct {
registers C.x86_thread_state64_t registers C.x86_thread_state64_t
} }
func (t *Thread) Halt() error { func (t *Thread) Halt() (err error) {
var kret C.kern_return_t defer func() {
kret = C.thread_suspend(t.os.thread_act) if err == nil {
if kret != C.KERN_SUCCESS { t.running = false
return fmt.Errorf("could not suspend thread %d", t.Id) }
}()
if t.Stopped() {
return
} }
t.running = false kret := C.thread_suspend(t.os.thread_act)
return nil if kret != C.KERN_SUCCESS {
errStr := C.GoString(C.mach_error_string(C.mach_error_t(kret)))
err = fmt.Errorf("could not suspend thread %d %s", t.Id, errStr)
return
}
return
} }
func (t *Thread) singleStep() error { func (t *Thread) singleStep() error {
@ -50,10 +58,13 @@ func (t *Thread) resume() error {
return nil return nil
} }
func (t *Thread) blocked() bool { func (thread *Thread) blocked() bool {
// TODO(dp) cache the func pc to remove this lookup // TODO(dp) cache the func pc to remove this lookup
pc, _ := t.PC() pc, err := thread.PC()
fn := t.dbp.goSymTable.PCToFunc(pc) if err != nil {
return false
}
fn := thread.dbp.goSymTable.PCToFunc(pc)
if fn == nil { if fn == nil {
return false return false
} }
@ -65,6 +76,10 @@ func (t *Thread) blocked() bool {
} }
} }
func (thread *Thread) stopped() bool {
return C.thread_blocked(thread.os.thread_act) > C.int(0)
}
func (thread *Thread) writeMemory(addr uintptr, data []byte) (int, error) { func (thread *Thread) writeMemory(addr uintptr, data []byte) (int, error) {
if len(data) == 0 { if len(data) == 0 {
return 0, nil return 0, nil

@ -30,3 +30,6 @@ set_registers(mach_port_name_t, x86_thread_state64_t*);
kern_return_t kern_return_t
get_identity(mach_port_name_t, thread_identifier_info_data_t *); get_identity(mach_port_name_t, thread_identifier_info_data_t *);
int
thread_blocked(thread_act_t thread);

@ -12,20 +12,31 @@ type OSSpecificDetails struct {
registers sys.PtraceRegs registers sys.PtraceRegs
} }
func (t *Thread) Halt() error { func (t *Thread) Halt() (err error) {
if stopped(t.Id) { defer func() {
return nil if err == nil {
t.running = false
}
}()
if t.Stopped() {
return
} }
err := sys.Tgkill(t.dbp.Pid, t.Id, sys.SIGSTOP) err = sys.Tgkill(t.dbp.Pid, t.Id, sys.SIGSTOP)
if err != nil { if err != nil {
return fmt.Errorf("halt err %s on thread %d", err, t.Id) err = fmt.Errorf("halt err %s on thread %d", err, t.Id)
return
} }
_, _, err = wait(t.Id, t.dbp.Pid, 0) _, _, err = wait(t.Id, t.dbp.Pid, 0)
if err != nil { if err != nil {
return fmt.Errorf("wait err %s on thread %d", err, t.Id) err = fmt.Errorf("wait err %s on thread %d", err, t.Id)
return
} }
t.running = false return
return nil }
func (thread *Thread) stopped() bool {
state := status(thread.Id)
return state == STATUS_TRACE_STOP
} }
func (t *Thread) resume() (err error) { func (t *Thread) resume() (err error) {