service: serialize calls to Command API (#2370)

* service: serialize calls to Command API

Wait until the target process has resumed before accepting new calls to
Command. Before this if a 'continue' was immediately followed by a
'halt' the 'halt' could be processed before the 'continue'.

Fixes #1608
Fixes #2216

* service/rpccommon: fix DeepSource issues
This commit is contained in:
Alessandro Arzilli 2021-03-08 19:05:10 +01:00 committed by GitHub
parent a3de99ab97
commit fe904c14d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 18 deletions

@ -549,7 +549,7 @@ func isValidLaunchMode(launchMode interface{}) bool {
func (s *Server) onDisconnectRequest(request *dap.DisconnectRequest) {
s.send(&dap.DisconnectResponse{Response: *newResponse(request.Request)})
if s.debugger != nil {
_, err := s.debugger.Command(&api.DebuggerCommand{Name: api.Halt})
_, err := s.debugger.Command(&api.DebuggerCommand{Name: api.Halt}, nil)
if err != nil {
s.log.Error(err)
}
@ -1265,7 +1265,7 @@ func (s *Server) onEvaluateRequest(request *dap.EvaluateRequest) {
Expr: strings.Replace(request.Arguments.Expression, "call ", "", 1),
UnsafeCall: false,
GoroutineID: goid,
})
}, nil)
if _, isexited := err.(proc.ErrProcessExited); isexited || err == nil && state.Exited {
e := &dap.TerminatedEvent{Event: *newEvent("terminated")}
s.send(e)
@ -1487,7 +1487,7 @@ func (s *Server) doCommand(command string) {
return
}
state, err := s.debugger.Command(&api.DebuggerCommand{Name: command})
state, err := s.debugger.Command(&api.DebuggerCommand{Name: command}, nil)
if _, isexited := err.(proc.ErrProcessExited); isexited || err == nil && state.Exited {
e := &dap.TerminatedEvent{Event: *newEvent("terminated")}
s.send(e)

@ -895,7 +895,7 @@ func (d *Debugger) isRunning() bool {
}
// Command handles commands which control the debugger lifecycle
func (d *Debugger) Command(command *api.DebuggerCommand) (*api.DebuggerState, error) {
func (d *Debugger) Command(command *api.DebuggerCommand, resumeNotify chan struct{}) (*api.DebuggerState, error) {
var err error
if command.Name == api.Halt {
@ -918,6 +918,12 @@ func (d *Debugger) Command(command *api.DebuggerCommand) (*api.DebuggerState, er
d.setRunning(true)
defer d.setRunning(false)
if command.Name != api.SwitchGoroutine && command.Name != api.SwitchThread && command.Name != api.Halt {
d.target.ResumeNotify(resumeNotify)
} else if resumeNotify != nil {
close(resumeNotify)
}
switch command.Name {
case api.Continue:
d.log.Debug("continuing")

@ -60,7 +60,7 @@ func (s *RPCServer) State(arg interface{}, state *api.DebuggerState) error {
}
func (s *RPCServer) Command(command *api.DebuggerCommand, cb service.RPCCallback) {
st, err := s.debugger.Command(command)
st, err := s.debugger.Command(command, cb.SetupDoneChan())
cb.Return(st, err)
}

@ -92,6 +92,7 @@ type RestartOut struct {
// Restart restarts program.
func (s *RPCServer) Restart(arg RestartIn, cb service.RPCCallback) {
close(cb.SetupDoneChan())
if s.config.Debugger.AttachPid != 0 {
cb.Return(nil, errors.New("cannot restart process Delve did not create"))
return
@ -113,6 +114,7 @@ type StateOut struct {
// State returns the current debugger state.
func (s *RPCServer) State(arg StateIn, cb service.RPCCallback) {
close(cb.SetupDoneChan())
var out StateOut
st, err := s.debugger.State(arg.NonBlocking)
if err != nil {
@ -129,7 +131,7 @@ type CommandOut struct {
// Command interrupts, continues and steps through the program.
func (s *RPCServer) Command(command api.DebuggerCommand, cb service.RPCCallback) {
st, err := s.debugger.Command(&command)
st, err := s.debugger.Command(&command, cb.SetupDoneChan())
if err != nil {
cb.Return(nil, err)
return
@ -856,6 +858,7 @@ type StopRecordingOut struct {
}
func (s *RPCServer) StopRecording(arg StopRecordingIn, cb service.RPCCallback) {
close(cb.SetupDoneChan())
var out StopRecordingOut
err := s.debugger.StopRecording()
if err != nil {

@ -3,4 +3,9 @@ package service
// RPCCallback is used by RPC methods to return their result asynchronously.
type RPCCallback interface {
Return(out interface{}, err error)
// SetupDone returns a channel that should be closed to signal that the
// asynchornous method has completed setup and the server is ready to
// receive other requests.
SetupDoneChan() chan struct{}
}

@ -46,12 +46,15 @@ type ServerImpl struct {
}
type RPCCallback struct {
s *ServerImpl
sending *sync.Mutex
codec rpc.ServerCodec
req rpc.Request
s *ServerImpl
sending *sync.Mutex
codec rpc.ServerCodec
req rpc.Request
setupDone chan struct{}
}
var _ service.RPCCallback = &RPCCallback{}
// RPCServer implements the RPC method calls common to all versions of the API.
type RPCServer struct {
s *ServerImpl
@ -163,8 +166,8 @@ var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
ch, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(ch)
}
// Is this type exported or a builtin?
@ -236,12 +239,10 @@ func suitableMethods(rcvr interface{}, methods map[string]*methodType, log *logr
log.Warn("method", mname, "returns", returnType.String(), "not error")
continue
}
} else {
} else if mtype.NumOut() != 0 {
// Method needs zero outs.
if mtype.NumOut() != 0 {
log.Warn("method", mname, "has wrong number of outs:", mtype.NumOut())
continue
}
log.Warn("method", mname, "has wrong number of outs:", mtype.NumOut())
continue
}
methods[sname+"."+mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType, Synchronous: synchronous, Rcvr: rcvrv}
}
@ -328,7 +329,7 @@ func (s *ServerImpl) serveJSONCodec(conn io.ReadWriteCloser) {
s.log.Debugf("(async %d) <- %s(%T%s)", req.Seq, req.ServiceMethod, argv.Interface(), argvbytes)
}
function := mtype.method.Func
ctl := &RPCCallback{s, sending, codec, req}
ctl := &RPCCallback{s, sending, codec, req, make(chan struct{})}
go func() {
defer func() {
if ierr := recover(); ierr != nil {
@ -337,6 +338,7 @@ func (s *ServerImpl) serveJSONCodec(conn io.ReadWriteCloser) {
}()
function.Call([]reflect.Value{mtype.Rcvr, argv, reflect.ValueOf(ctl)})
}()
<-ctl.setupDone
}
}
codec.Close()
@ -363,6 +365,12 @@ func (s *ServerImpl) sendResponse(sending *sync.Mutex, req *rpc.Request, resp *r
}
func (cb *RPCCallback) Return(out interface{}, err error) {
select {
case <-cb.setupDone:
// already closed
default:
close(cb.setupDone)
}
errmsg := ""
if err != nil {
errmsg = err.Error()
@ -375,6 +383,10 @@ func (cb *RPCCallback) Return(out interface{}, err error) {
cb.s.sendResponse(cb.sending, &cb.req, &resp, out, cb.codec, errmsg)
}
func (cb *RPCCallback) SetupDoneChan() chan struct{} {
return cb.setupDone
}
// GetVersion returns the version of delve as well as the API version
// currently served.
func (s *RPCServer) GetVersion(args api.GetVersionIn, out *api.GetVersionOut) error {