service/dap: fix close on closed channel panic (#3573)

Fixes close on closed channel panic that happens sporadically on many
of the dap tests (for example 1 ~ 3% of the times on
TestStepInstruction).
This commit is contained in:
Alessandro Arzilli 2023-12-04 15:40:59 +01:00 committed by GitHub
parent 15142ac3d6
commit f558ca4f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 25 deletions

@ -686,7 +686,7 @@ func (s *Session) handleRequest(request dap.Message) {
// Non-blocking request handlers will signal when they are ready
// setting up for async execution, so more requests can be processed.
resumeRequestLoop := make(chan struct{})
resumeRequestLoop := newSyncflag()
switch request := request.(type) {
//--- Asynchronous requests ---
@ -695,43 +695,43 @@ func (s *Session) handleRequest(request dap.Message) {
defer s.recoverPanic(request)
s.onConfigurationDoneRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
case *dap.ContinueRequest: // Required
go func() {
defer s.recoverPanic(request)
s.onContinueRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
case *dap.NextRequest: // Required
go func() {
defer s.recoverPanic(request)
s.onNextRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
case *dap.StepInRequest: // Required
go func() {
defer s.recoverPanic(request)
s.onStepInRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
case *dap.StepOutRequest: // Required
go func() {
defer s.recoverPanic(request)
s.onStepOutRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
case *dap.StepBackRequest: // Optional (capability 'supportsStepBack')
go func() {
defer s.recoverPanic(request)
s.onStepBackRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
case *dap.ReverseContinueRequest: // Optional (capability 'supportsStepBack')
go func() {
defer s.recoverPanic(request)
s.onReverseContinueRequest(request, resumeRequestLoop)
}()
<-resumeRequestLoop
resumeRequestLoop.wait()
//--- Synchronous requests ---
case *dap.SetBreakpointsRequest: // Required
s.onSetBreakpointsRequest(request)
@ -1663,8 +1663,8 @@ func closeIfOpen(ch chan struct{}) {
// This is an optional request enabled by capability 'supportsConfigurationDoneRequest'.
// It gets triggered after all the debug requests that follow initialized event,
// so the s.debugger is guaranteed to be set. Expects the target to be halted.
func (s *Session) onConfigurationDoneRequest(request *dap.ConfigurationDoneRequest, allowNextStateChange chan struct{}) {
defer closeIfOpen(allowNextStateChange)
func (s *Session) onConfigurationDoneRequest(request *dap.ConfigurationDoneRequest, allowNextStateChange *syncflag) {
defer allowNextStateChange.raise()
if s.args.stopOnEntry {
e := &dap.StoppedEvent{
Event: *newEvent("stopped"),
@ -1684,7 +1684,7 @@ func (s *Session) onConfigurationDoneRequest(request *dap.ConfigurationDoneReque
// onContinueRequest handles 'continue' request.
// This is a mandatory request to support.
func (s *Session) onContinueRequest(request *dap.ContinueRequest, allowNextStateChange chan struct{}) {
func (s *Session) onContinueRequest(request *dap.ContinueRequest, allowNextStateChange *syncflag) {
s.send(&dap.ContinueResponse{
Response: *newResponse(request.Request),
Body: dap.ContinueResponseBody{AllThreadsContinued: true}})
@ -1922,21 +1922,21 @@ func (s *Session) onAttachRequest(request *dap.AttachRequest) {
// onNextRequest handles 'next' request.
// This is a mandatory request to support.
func (s *Session) onNextRequest(request *dap.NextRequest, allowNextStateChange chan struct{}) {
func (s *Session) onNextRequest(request *dap.NextRequest, allowNextStateChange *syncflag) {
s.sendStepResponse(request.Arguments.ThreadId, &dap.NextResponse{Response: *newResponse(request.Request)})
s.stepUntilStopAndNotify(api.Next, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange)
}
// onStepInRequest handles 'stepIn' request
// This is a mandatory request to support.
func (s *Session) onStepInRequest(request *dap.StepInRequest, allowNextStateChange chan struct{}) {
func (s *Session) onStepInRequest(request *dap.StepInRequest, allowNextStateChange *syncflag) {
s.sendStepResponse(request.Arguments.ThreadId, &dap.StepInResponse{Response: *newResponse(request.Request)})
s.stepUntilStopAndNotify(api.Step, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange)
}
// onStepOutRequest handles 'stepOut' request
// This is a mandatory request to support.
func (s *Session) onStepOutRequest(request *dap.StepOutRequest, allowNextStateChange chan struct{}) {
func (s *Session) onStepOutRequest(request *dap.StepOutRequest, allowNextStateChange *syncflag) {
s.sendStepResponse(request.Arguments.ThreadId, &dap.StepOutResponse{Response: *newResponse(request.Request)})
s.stepUntilStopAndNotify(api.StepOut, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange)
}
@ -1993,8 +1993,8 @@ func (s *Session) stoppedOnBreakpointGoroutineID(state *api.DebuggerState) (int6
// a channel that will be closed to signal that an
// asynchronous command has completed setup or was interrupted
// due to an error, so the server is ready to receive new requests.
func (s *Session) stepUntilStopAndNotify(command string, threadId int, granularity dap.SteppingGranularity, allowNextStateChange chan struct{}) {
defer closeIfOpen(allowNextStateChange)
func (s *Session) stepUntilStopAndNotify(command string, threadId int, granularity dap.SteppingGranularity, allowNextStateChange *syncflag) {
defer allowNextStateChange.raise()
_, err := s.debugger.Command(&api.DebuggerCommand{Name: api.SwitchGoroutine, GoroutineID: int64(threadId)}, nil)
if err != nil {
s.config.log.Errorf("Error switching goroutines while stepping: %v", err)
@ -2949,7 +2949,7 @@ func (s *Session) onRestartRequest(request *dap.RestartRequest) {
// onStepBackRequest handles 'stepBack' request.
// This is an optional request enabled by capability 'supportsStepBackRequest'.
func (s *Session) onStepBackRequest(request *dap.StepBackRequest, allowNextStateChange chan struct{}) {
func (s *Session) onStepBackRequest(request *dap.StepBackRequest, allowNextStateChange *syncflag) {
s.sendStepResponse(request.Arguments.ThreadId, &dap.StepBackResponse{Response: *newResponse(request.Request)})
s.stepUntilStopAndNotify(api.ReverseNext, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange)
}
@ -2957,7 +2957,7 @@ func (s *Session) onStepBackRequest(request *dap.StepBackRequest, allowNextState
// onReverseContinueRequest performs a rewind command call up to the previous
// breakpoint or the start of the process
// This is an optional request enabled by capability 'supportsStepBackRequest'.
func (s *Session) onReverseContinueRequest(request *dap.ReverseContinueRequest, allowNextStateChange chan struct{}) {
func (s *Session) onReverseContinueRequest(request *dap.ReverseContinueRequest, allowNextStateChange *syncflag) {
s.send(&dap.ReverseContinueResponse{
Response: *newResponse(request.Request),
})
@ -3572,7 +3572,7 @@ func (s *Session) checkHaltRequested() bool {
// resumeOnce is a helper function to resume the execution
// of the target when the program is halted.
func (s *Session) resumeOnce(command string, allowNextStateChange chan struct{}) (bool, *api.DebuggerState, error) {
func (s *Session) resumeOnce(command string, allowNextStateChange *syncflag) (bool, *api.DebuggerState, error) {
// No other goroutines should be able to try to resume
// or halt execution while this goroutine is resuming
// execution, so we do not miss those events.
@ -3581,7 +3581,7 @@ func (s *Session) resumeOnce(command string, allowNextStateChange chan struct{})
s.changeStateMu.Lock()
go func() {
defer s.changeStateMu.Unlock()
defer closeIfOpen(allowNextStateChange)
defer allowNextStateChange.raise()
<-asyncSetupDone
}()
@ -3602,7 +3602,7 @@ func (s *Session) resumeOnce(command string, allowNextStateChange chan struct{})
// a channel that will be closed to signal that an
// asynchronous command has completed setup or was interrupted
// due to an error, so the server is ready to receive new requests.
func (s *Session) runUntilStopAndNotify(command string, allowNextStateChange chan struct{}) {
func (s *Session) runUntilStopAndNotify(command string, allowNextStateChange *syncflag) {
state, err := s.runUntilStop(command, allowNextStateChange)
if s.conn.isClosed() {
@ -3714,7 +3714,7 @@ func (s *Session) runUntilStopAndNotify(command string, allowNextStateChange cha
}
}
func (s *Session) runUntilStop(command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) {
func (s *Session) runUntilStop(command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) {
// Clear any manual stop requests that came in before we started running.
s.setHaltRequested(false)
@ -3731,11 +3731,11 @@ func (s *Session) runUntilStop(command string, allowNextStateChange chan struct{
}
// Make this a var, so it can be stubbed in testing.
var resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) {
var resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) {
return s.resumeOnceAndCheckStop(command, allowNextStateChange)
}
func (s *Session) resumeOnceAndCheckStop(command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) {
func (s *Session) resumeOnceAndCheckStop(command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) {
resumed, state, err := s.resumeOnce(command, allowNextStateChange)
// We should not try to process the log points if the program was not
// resumed or there was an error.
@ -3920,3 +3920,32 @@ func parseLogPoint(msg string) (bool, *logMessage, error) {
args: args,
}, nil
}
type syncflag struct {
mu sync.Mutex
cond *sync.Cond
flag bool
}
func newSyncflag() *syncflag {
r := &syncflag{
flag: false,
}
r.cond = sync.NewCond(&r.mu)
return r
}
func (s *syncflag) wait() {
s.mu.Lock()
for !s.flag {
s.cond.Wait()
}
s.mu.Unlock()
}
func (s *syncflag) raise() {
s.mu.Lock()
s.flag = true
s.mu.Unlock()
s.cond.Broadcast()
}

@ -3464,7 +3464,7 @@ func TestHaltPreventsAutoResume(t *testing.T) {
// Send a halt request when trying to resume the program after being
// interrupted. This should allow the log message to be processed,
// but keep the process from continuing beyond the line.
resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) {
resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) {
// This should trigger after the log message is sent, but before
// execution is resumed.
if command == api.DirectionCongruentContinue {