From f558ca4f32c7e7f597f1dbaad417a169b173e3c4 Mon Sep 17 00:00:00 2001 From: Alessandro Arzilli Date: Mon, 4 Dec 2023 15:40:59 +0100 Subject: [PATCH] service/dap: fix close on closed channel panic (#3573) Fixes close on closed channel panic that happens sporadically on many of the dap tests (for example 1 ~ 3% of the times on TestStepInstruction). --- service/dap/server.go | 77 ++++++++++++++++++++++++++------------ service/dap/server_test.go | 2 +- 2 files changed, 54 insertions(+), 25 deletions(-) diff --git a/service/dap/server.go b/service/dap/server.go index 96e45c8d..0e320edd 100644 --- a/service/dap/server.go +++ b/service/dap/server.go @@ -686,7 +686,7 @@ func (s *Session) handleRequest(request dap.Message) { // Non-blocking request handlers will signal when they are ready // setting up for async execution, so more requests can be processed. - resumeRequestLoop := make(chan struct{}) + resumeRequestLoop := newSyncflag() switch request := request.(type) { //--- Asynchronous requests --- @@ -695,43 +695,43 @@ func (s *Session) handleRequest(request dap.Message) { defer s.recoverPanic(request) s.onConfigurationDoneRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() case *dap.ContinueRequest: // Required go func() { defer s.recoverPanic(request) s.onContinueRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() case *dap.NextRequest: // Required go func() { defer s.recoverPanic(request) s.onNextRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() case *dap.StepInRequest: // Required go func() { defer s.recoverPanic(request) s.onStepInRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() case *dap.StepOutRequest: // Required go func() { defer s.recoverPanic(request) s.onStepOutRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() case *dap.StepBackRequest: // Optional (capability 'supportsStepBack') go func() { defer s.recoverPanic(request) s.onStepBackRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() case *dap.ReverseContinueRequest: // Optional (capability 'supportsStepBack') go func() { defer s.recoverPanic(request) s.onReverseContinueRequest(request, resumeRequestLoop) }() - <-resumeRequestLoop + resumeRequestLoop.wait() //--- Synchronous requests --- case *dap.SetBreakpointsRequest: // Required s.onSetBreakpointsRequest(request) @@ -1663,8 +1663,8 @@ func closeIfOpen(ch chan struct{}) { // This is an optional request enabled by capability 'supportsConfigurationDoneRequest'. // It gets triggered after all the debug requests that follow initialized event, // so the s.debugger is guaranteed to be set. Expects the target to be halted. -func (s *Session) onConfigurationDoneRequest(request *dap.ConfigurationDoneRequest, allowNextStateChange chan struct{}) { - defer closeIfOpen(allowNextStateChange) +func (s *Session) onConfigurationDoneRequest(request *dap.ConfigurationDoneRequest, allowNextStateChange *syncflag) { + defer allowNextStateChange.raise() if s.args.stopOnEntry { e := &dap.StoppedEvent{ Event: *newEvent("stopped"), @@ -1684,7 +1684,7 @@ func (s *Session) onConfigurationDoneRequest(request *dap.ConfigurationDoneReque // onContinueRequest handles 'continue' request. // This is a mandatory request to support. -func (s *Session) onContinueRequest(request *dap.ContinueRequest, allowNextStateChange chan struct{}) { +func (s *Session) onContinueRequest(request *dap.ContinueRequest, allowNextStateChange *syncflag) { s.send(&dap.ContinueResponse{ Response: *newResponse(request.Request), Body: dap.ContinueResponseBody{AllThreadsContinued: true}}) @@ -1922,21 +1922,21 @@ func (s *Session) onAttachRequest(request *dap.AttachRequest) { // onNextRequest handles 'next' request. // This is a mandatory request to support. -func (s *Session) onNextRequest(request *dap.NextRequest, allowNextStateChange chan struct{}) { +func (s *Session) onNextRequest(request *dap.NextRequest, allowNextStateChange *syncflag) { s.sendStepResponse(request.Arguments.ThreadId, &dap.NextResponse{Response: *newResponse(request.Request)}) s.stepUntilStopAndNotify(api.Next, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange) } // onStepInRequest handles 'stepIn' request // This is a mandatory request to support. -func (s *Session) onStepInRequest(request *dap.StepInRequest, allowNextStateChange chan struct{}) { +func (s *Session) onStepInRequest(request *dap.StepInRequest, allowNextStateChange *syncflag) { s.sendStepResponse(request.Arguments.ThreadId, &dap.StepInResponse{Response: *newResponse(request.Request)}) s.stepUntilStopAndNotify(api.Step, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange) } // onStepOutRequest handles 'stepOut' request // This is a mandatory request to support. -func (s *Session) onStepOutRequest(request *dap.StepOutRequest, allowNextStateChange chan struct{}) { +func (s *Session) onStepOutRequest(request *dap.StepOutRequest, allowNextStateChange *syncflag) { s.sendStepResponse(request.Arguments.ThreadId, &dap.StepOutResponse{Response: *newResponse(request.Request)}) s.stepUntilStopAndNotify(api.StepOut, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange) } @@ -1993,8 +1993,8 @@ func (s *Session) stoppedOnBreakpointGoroutineID(state *api.DebuggerState) (int6 // a channel that will be closed to signal that an // asynchronous command has completed setup or was interrupted // due to an error, so the server is ready to receive new requests. -func (s *Session) stepUntilStopAndNotify(command string, threadId int, granularity dap.SteppingGranularity, allowNextStateChange chan struct{}) { - defer closeIfOpen(allowNextStateChange) +func (s *Session) stepUntilStopAndNotify(command string, threadId int, granularity dap.SteppingGranularity, allowNextStateChange *syncflag) { + defer allowNextStateChange.raise() _, err := s.debugger.Command(&api.DebuggerCommand{Name: api.SwitchGoroutine, GoroutineID: int64(threadId)}, nil) if err != nil { s.config.log.Errorf("Error switching goroutines while stepping: %v", err) @@ -2949,7 +2949,7 @@ func (s *Session) onRestartRequest(request *dap.RestartRequest) { // onStepBackRequest handles 'stepBack' request. // This is an optional request enabled by capability 'supportsStepBackRequest'. -func (s *Session) onStepBackRequest(request *dap.StepBackRequest, allowNextStateChange chan struct{}) { +func (s *Session) onStepBackRequest(request *dap.StepBackRequest, allowNextStateChange *syncflag) { s.sendStepResponse(request.Arguments.ThreadId, &dap.StepBackResponse{Response: *newResponse(request.Request)}) s.stepUntilStopAndNotify(api.ReverseNext, request.Arguments.ThreadId, request.Arguments.Granularity, allowNextStateChange) } @@ -2957,7 +2957,7 @@ func (s *Session) onStepBackRequest(request *dap.StepBackRequest, allowNextState // onReverseContinueRequest performs a rewind command call up to the previous // breakpoint or the start of the process // This is an optional request enabled by capability 'supportsStepBackRequest'. -func (s *Session) onReverseContinueRequest(request *dap.ReverseContinueRequest, allowNextStateChange chan struct{}) { +func (s *Session) onReverseContinueRequest(request *dap.ReverseContinueRequest, allowNextStateChange *syncflag) { s.send(&dap.ReverseContinueResponse{ Response: *newResponse(request.Request), }) @@ -3572,7 +3572,7 @@ func (s *Session) checkHaltRequested() bool { // resumeOnce is a helper function to resume the execution // of the target when the program is halted. -func (s *Session) resumeOnce(command string, allowNextStateChange chan struct{}) (bool, *api.DebuggerState, error) { +func (s *Session) resumeOnce(command string, allowNextStateChange *syncflag) (bool, *api.DebuggerState, error) { // No other goroutines should be able to try to resume // or halt execution while this goroutine is resuming // execution, so we do not miss those events. @@ -3581,7 +3581,7 @@ func (s *Session) resumeOnce(command string, allowNextStateChange chan struct{}) s.changeStateMu.Lock() go func() { defer s.changeStateMu.Unlock() - defer closeIfOpen(allowNextStateChange) + defer allowNextStateChange.raise() <-asyncSetupDone }() @@ -3602,7 +3602,7 @@ func (s *Session) resumeOnce(command string, allowNextStateChange chan struct{}) // a channel that will be closed to signal that an // asynchronous command has completed setup or was interrupted // due to an error, so the server is ready to receive new requests. -func (s *Session) runUntilStopAndNotify(command string, allowNextStateChange chan struct{}) { +func (s *Session) runUntilStopAndNotify(command string, allowNextStateChange *syncflag) { state, err := s.runUntilStop(command, allowNextStateChange) if s.conn.isClosed() { @@ -3714,7 +3714,7 @@ func (s *Session) runUntilStopAndNotify(command string, allowNextStateChange cha } } -func (s *Session) runUntilStop(command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) { +func (s *Session) runUntilStop(command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) { // Clear any manual stop requests that came in before we started running. s.setHaltRequested(false) @@ -3731,11 +3731,11 @@ func (s *Session) runUntilStop(command string, allowNextStateChange chan struct{ } // Make this a var, so it can be stubbed in testing. -var resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) { +var resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) { return s.resumeOnceAndCheckStop(command, allowNextStateChange) } -func (s *Session) resumeOnceAndCheckStop(command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) { +func (s *Session) resumeOnceAndCheckStop(command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) { resumed, state, err := s.resumeOnce(command, allowNextStateChange) // We should not try to process the log points if the program was not // resumed or there was an error. @@ -3920,3 +3920,32 @@ func parseLogPoint(msg string) (bool, *logMessage, error) { args: args, }, nil } + +type syncflag struct { + mu sync.Mutex + cond *sync.Cond + flag bool +} + +func newSyncflag() *syncflag { + r := &syncflag{ + flag: false, + } + r.cond = sync.NewCond(&r.mu) + return r +} + +func (s *syncflag) wait() { + s.mu.Lock() + for !s.flag { + s.cond.Wait() + } + s.mu.Unlock() +} + +func (s *syncflag) raise() { + s.mu.Lock() + s.flag = true + s.mu.Unlock() + s.cond.Broadcast() +} diff --git a/service/dap/server_test.go b/service/dap/server_test.go index b727bbfd..0a8235e1 100644 --- a/service/dap/server_test.go +++ b/service/dap/server_test.go @@ -3464,7 +3464,7 @@ func TestHaltPreventsAutoResume(t *testing.T) { // Send a halt request when trying to resume the program after being // interrupted. This should allow the log message to be processed, // but keep the process from continuing beyond the line. - resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange chan struct{}) (*api.DebuggerState, error) { + resumeOnceAndCheckStop = func(s *Session, command string, allowNextStateChange *syncflag) (*api.DebuggerState, error) { // This should trigger after the log message is sent, but before // execution is resumed. if command == api.DirectionCongruentContinue {