From 5e380c3b42741541d01cea0c821f4e01aa4e432e Mon Sep 17 00:00:00 2001 From: Daniel Hiltgen Date: Wed, 7 May 2025 09:38:17 -0700 Subject: [PATCH] sched: fix race leading to orphaned runners (#10599) If a model is loading, and the request context is canceled during the load by a client closing the connection, and another request is inbound for the same model with a different configuration (context size, etc.) thus requiring a reload, two unload events can be in flight. The first shuts down the original model load, but the second one caused the loss of the new reloading runner reference, thus triggering the leak. The primary fix is detecting the duplicate unload and ignoring the second instance. The load routine is also hardened to ensure we detect clobbering an already present runner and unload it with a warning. --- llm/server.go | 6 +++--- server/sched.go | 54 +++++++++++++++++++++++++++++++++---------------- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/llm/server.go b/llm/server.go index f2f04c18a..8884d105a 100644 --- a/llm/server.go +++ b/llm/server.go @@ -1010,17 +1010,17 @@ func (s *llmServer) Close() error { s.llamaModelLock.Unlock() if s.cmd != nil { - slog.Debug("stopping llama server") + slog.Debug("stopping llama server", "pid", s.Pid()) if err := s.cmd.Process.Kill(); err != nil { return err } // if ProcessState is already populated, Wait already completed, no need to wait again if s.cmd.ProcessState == nil { - slog.Debug("waiting for llama server to exit") + slog.Debug("waiting for llama server to exit", "pid", s.Pid()) <-s.done } - slog.Debug("llama server stopped") + slog.Debug("llama server stopped", "pid", s.Pid()) } return nil diff --git a/server/sched.go b/server/sched.go index 7586c83ff..43da138e2 100644 --- a/server/sched.go +++ b/server/sched.go @@ -296,13 +296,13 @@ func (s *Scheduler) processPending(ctx context.Context) { // Wait for the unload to happen // Note: at this point we're queueing up all incoming requests, even if they were for // a different model that's loaded and not scheduled to be removed. - slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath) + slog.Debug("waiting for pending requests to complete and unload to occur", "runner", runnerToExpire) select { case <-ctx.Done(): slog.Debug("shutting down scheduler pending loop") return case <-s.unloadedCh: - slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath) + slog.Debug("unload completed", "runner", runnerToExpire) continue } } @@ -375,17 +375,29 @@ func (s *Scheduler) processCompleted(ctx context.Context) { } s.loadedMu.Lock() - slog.Debug("got lock to unload", "runner", runner) - finished := runner.waitForVRAMRecovery() - runner.unload() - delete(s.loaded, runner.modelPath) - s.loadedMu.Unlock() - slog.Debug("runner released", "runner", runner) - runner.refMu.Unlock() - - <-finished - slog.Debug("sending an unloaded event", "runner", runner) - s.unloadedCh <- struct{}{} + slog.Debug("got lock to unload expired event", "runner", runner) + runnerToUnload := s.loaded[runner.modelPath] + if runnerToUnload == nil { + // If runnerToUnload is nil, we already processed an event and + // unloaded it. This double unload can happen if the initial + // request is canceled and we're trying to load another model + // that requires this one to be evicted, or the settings change + // and require a reload + s.loadedMu.Unlock() + runner.refMu.Unlock() + slog.Debug("duplicate expired event, ignoring", "runner", runner) + } else { + slog.Debug("starting background wait for VRAM recovery", "runner", runner) + finished := runner.waitForVRAMRecovery() + runner.unload() + delete(s.loaded, runner.modelPath) + s.loadedMu.Unlock() + slog.Debug("runner terminated and removed from list, blocking for VRAM recovery", "runner", runner) + <-finished + runner.refMu.Unlock() + slog.Debug("sending an unloaded event", "runner", runner) + s.unloadedCh <- struct{}{} + } } } } @@ -448,6 +460,13 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis runner.refMu.Lock() // hold lock until running or aborted s.loadedMu.Lock() + if oldRunner, ok := s.loaded[req.model.ModelPath]; ok { + // Shouldn't happen, but safeguard against leaking a runner + slog.Warn("model was still loaded", "old_runner", oldRunner, "new_runner", runner) + oldRunner.refMu.Lock() + oldRunner.unload() + oldRunner.refMu.Unlock() + } s.loaded[req.model.ModelPath] = runner slog.Info("loaded runners", "count", len(s.loaded)) s.loadedMu.Unlock() @@ -457,11 +476,11 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis if err = llama.WaitUntilRunning(req.ctx); err != nil { slog.Error("error loading llama server", "error", err) req.errCh <- err - slog.Debug("triggering expiration for failed load", "model", runner.modelPath) + slog.Debug("triggering expiration for failed load", "runner", runner) s.expiredCh <- runner return } - slog.Debug("finished setting up runner", "model", req.model.ModelPath) + slog.Debug("finished setting up", "runner", runner) if runner.pid < 0 { runner.pid = llama.Pid() } @@ -634,6 +653,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any { (len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) || (runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") { finished <- struct{}{} + slog.Debug("no need to wait for VRAM recovery", "runner", runner) return finished } start := time.Now() @@ -652,7 +672,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any { for { <-ticker.C if time.Now().After(expiresAt) { - slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath) + slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "runner", runner) finished <- struct{}{} } @@ -665,7 +685,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any { } // If we're within ~80% of the estimated memory usage recovered, bail out if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 { - slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath) + slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "runner", runner) finished <- struct{}{} return }