From b2a465296d7131ca440fd81c1bee888f4103a585 Mon Sep 17 00:00:00 2001 From: Jesse Gross Date: Fri, 14 Mar 2025 17:24:46 -0700 Subject: [PATCH] runner: Release semaphore and improve error messages on failures If we have an error after creating a new sequence but before finding a slot for it, we return without releasing the semaphore. This reduces our parallel sequences and eventually leads to deadlock. In practice this should never happen because once we have acquired the semaphore, we should always be able to find a slot. However, the code is clearly not correct. --- runner/llamarunner/runner.go | 8 ++++++-- runner/ollamarunner/runner.go | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/runner/llamarunner/runner.go b/runner/llamarunner/runner.go index 83802d604..ee5d47f6e 100644 --- a/runner/llamarunner/runner.go +++ b/runner/llamarunner/runner.go @@ -599,7 +599,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { if errors.Is(err, context.Canceled) { slog.Info("aborting completion request due to client closing the connection") } else { - slog.Error("Failed to acquire semaphore", "error", err) + http.Error(w, fmt.Sprintf("Failed to acquire semaphore: %v", err), http.StatusInternalServerError) } return } @@ -611,6 +611,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, true) if err != nil { s.mu.Unlock() + s.seqsSem.Release(1) http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError) return } @@ -626,6 +627,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() if !found { + s.seqsSem.Release(1) http.Error(w, "could not find an available sequence", http.StatusInternalServerError) return } @@ -691,7 +693,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) { if errors.Is(err, context.Canceled) { slog.Info("aborting embeddings request due to client closing the connection") } else { - slog.Error("Failed to acquire semaphore", "error", err) + http.Error(w, fmt.Sprintf("Failed to acquire semaphore: %v", err), http.StatusInternalServerError) } return } @@ -703,6 +705,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) { seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, false) if err != nil { s.mu.Unlock() + s.seqsSem.Release(1) http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError) return } @@ -715,6 +718,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() if !found { + s.seqsSem.Release(1) http.Error(w, "could not find an available sequence", http.StatusInternalServerError) return } diff --git a/runner/ollamarunner/runner.go b/runner/ollamarunner/runner.go index 6d20fa85b..bc7a07ed6 100644 --- a/runner/ollamarunner/runner.go +++ b/runner/ollamarunner/runner.go @@ -609,7 +609,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { if errors.Is(err, context.Canceled) { slog.Info("aborting completion request due to client closing the connection") } else { - slog.Error("Failed to acquire semaphore", "error", err) + http.Error(w, fmt.Sprintf("Failed to acquire semaphore: %v", err), http.StatusInternalServerError) } return } @@ -621,6 +621,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs) if err != nil { s.mu.Unlock() + s.seqsSem.Release(1) http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError) return } @@ -634,6 +635,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() if !found { + s.seqsSem.Release(1) http.Error(w, "could not find an available sequence", http.StatusInternalServerError) return }