diff --git a/llm/server.go b/llm/server.go
index f2f04c18a..8884d105a 100644
--- a/llm/server.go
+++ b/llm/server.go
@@ -1010,17 +1010,17 @@ func (s *llmServer) Close() error {
 	s.llamaModelLock.Unlock()
 
 	if s.cmd != nil {
-		slog.Debug("stopping llama server")
+		slog.Debug("stopping llama server", "pid", s.Pid())
 		if err := s.cmd.Process.Kill(); err != nil {
 			return err
 		}
 		// if ProcessState is already populated, Wait already completed, no need to wait again
 		if s.cmd.ProcessState == nil {
-			slog.Debug("waiting for llama server to exit")
+			slog.Debug("waiting for llama server to exit", "pid", s.Pid())
 			<-s.done
 		}
 
-		slog.Debug("llama server stopped")
+		slog.Debug("llama server stopped", "pid", s.Pid())
 	}
 
 	return nil
diff --git a/server/sched.go b/server/sched.go
index 7586c83ff..43da138e2 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -296,13 +296,13 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				// Wait for the unload to happen
 				// Note: at this point we're queueing up all incoming requests, even if they were for
 				// a different model that's loaded and not scheduled to be removed.
-				slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath)
+				slog.Debug("waiting for pending requests to complete and unload to occur", "runner", runnerToExpire)
 				select {
 				case <-ctx.Done():
 					slog.Debug("shutting down scheduler pending loop")
 					return
 				case <-s.unloadedCh:
-					slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath)
+					slog.Debug("unload completed", "runner", runnerToExpire)
 					continue
 				}
 			}
@@ -375,17 +375,29 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 			}
 
 			s.loadedMu.Lock()
-			slog.Debug("got lock to unload", "runner", runner)
-			finished := runner.waitForVRAMRecovery()
-			runner.unload()
-			delete(s.loaded, runner.modelPath)
-			s.loadedMu.Unlock()
-			slog.Debug("runner released", "runner", runner)
-			runner.refMu.Unlock()
-
-			<-finished
-			slog.Debug("sending an unloaded event", "runner", runner)
-			s.unloadedCh <- struct{}{}
+			slog.Debug("got lock to unload expired event", "runner", runner)
+			runnerToUnload := s.loaded[runner.modelPath]
+			if runnerToUnload == nil {
+				// If runnerToUnload is nil, we already processed an event and
+				// unloaded it. This double unload can happen if the initial
+				// request is canceled and we're trying to load another model
+				// that requires this one to be evicted, or the settings change
+				// and require a reload
+				s.loadedMu.Unlock()
+				runner.refMu.Unlock()
+				slog.Debug("duplicate expired event, ignoring", "runner", runner)
+			} else {
+				slog.Debug("starting background wait for VRAM recovery", "runner", runner)
+				finished := runner.waitForVRAMRecovery()
+				runner.unload()
+				delete(s.loaded, runner.modelPath)
+				s.loadedMu.Unlock()
+				slog.Debug("runner terminated and removed from list, blocking for VRAM recovery", "runner", runner)
+				<-finished
+				runner.refMu.Unlock()
+				slog.Debug("sending an unloaded event", "runner", runner)
+				s.unloadedCh <- struct{}{}
+			}
 		}
 	}
 }
@@ -448,6 +460,13 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
 	runner.refMu.Lock() // hold lock until running or aborted
 
 	s.loadedMu.Lock()
+	if oldRunner, ok := s.loaded[req.model.ModelPath]; ok {
+		// Shouldn't happen, but safeguard against leaking a runner
+		slog.Warn("model was still loaded", "old_runner", oldRunner, "new_runner", runner)
+		oldRunner.refMu.Lock()
+		oldRunner.unload()
+		oldRunner.refMu.Unlock()
+	}
 	s.loaded[req.model.ModelPath] = runner
 	slog.Info("loaded runners", "count", len(s.loaded))
 	s.loadedMu.Unlock()
@@ -457,11 +476,11 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
 		if err = llama.WaitUntilRunning(req.ctx); err != nil {
 			slog.Error("error loading llama server", "error", err)
 			req.errCh <- err
-			slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
+			slog.Debug("triggering expiration for failed load", "runner", runner)
 			s.expiredCh <- runner
 			return
 		}
-		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
+		slog.Debug("finished setting up", "runner", runner)
 		if runner.pid < 0 {
 			runner.pid = llama.Pid()
 		}
@@ -634,6 +653,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 		(len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
 		(runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") {
 		finished <- struct{}{}
+		slog.Debug("no need to wait for VRAM recovery", "runner", runner)
 		return finished
 	}
 	start := time.Now()
@@ -652,7 +672,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 		for {
 			<-ticker.C
 			if time.Now().After(expiresAt) {
-				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
+				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "runner", runner)
 				finished <- struct{}{}
 			}
 
@@ -665,7 +685,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 			}
 			// If we're within ~80% of the estimated memory usage recovered, bail out
 			if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
-				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
+				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "runner", runner)
 				finished <- struct{}{}
 				return
 			}
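For context, a minimal self-contained sketch (not part of the patch) of the idea behind the new nil check in processCompleted: the loaded map is treated as the source of truth, so a second expired event for a runner that has already been unloaded is ignored instead of being unloaded again. The fakeRunner and scheduler types below are hypothetical stand-ins for illustration only, not the real runnerRef and Scheduler structures in server/sched.go.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeRunner is a simplified stand-in for a loaded model runner.
type fakeRunner struct {
	modelPath string
	unloads   int // how many times unload ran; the guard keeps this at 1
}

// scheduler holds only what the sketch needs: a mutex-protected map of
// currently loaded runners keyed by model path.
type scheduler struct {
	loadedMu sync.Mutex
	loaded   map[string]*fakeRunner
}

// handleExpired mirrors the patched flow: consult the loaded map first and
// only unload the runner if it is still registered there.
func (s *scheduler) handleExpired(runner *fakeRunner) {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()

	if s.loaded[runner.modelPath] == nil {
		fmt.Println("duplicate expired event, ignoring:", runner.modelPath)
		return
	}
	runner.unloads++
	delete(s.loaded, runner.modelPath)
	fmt.Println("unloaded:", runner.modelPath)
}

func main() {
	r := &fakeRunner{modelPath: "example-model"}
	s := &scheduler{loaded: map[string]*fakeRunner{r.modelPath: r}}

	// Two expiration events for the same runner, e.g. a canceled request
	// racing with an eviction: only the first performs the unload.
	s.handleExpired(r)
	s.handleExpired(r)

	fmt.Println("unload count:", r.unloads) // prints 1
}
```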