Fix "Stopping..." scheduler hang (#10487)
* Adjust initial scheduler refCount

  Ensure we only set the refCount on success.

* sched: fix lock order inversion deadlock

  Under certain race conditions, the scheduler could deadlock while updating
  free-space information at the same time a model was unloading.
parent 718eda1b3e
commit 415c8fcc3d
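Why the inversion deadlocks: updateFreeSpace() used to take s.loadedMu and then each runner's refMu, while (per the commit message) the unload path holds a runner's refMu and presumably then needs loadedMu to drop the entry from s.loaded; with both in flight, each goroutine waits on the lock the other holds. The hunks below fix it by snapshotting the runners under loadedMu, releasing it, and only then taking each refMu. A minimal runnable sketch of that discipline, using simplified stand-in types (runner, sched) rather than the real Scheduler and runnerRef:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for runnerRef and Scheduler; not the real types.
type runner struct {
	refMu    sync.Mutex
	refCount uint
}

type sched struct {
	loadedMu sync.Mutex
	loaded   map[string]*runner
}

// updateFreeSpace copies the runners out under loadedMu, releases it, and
// only then takes each runner's refMu, so this path never holds both locks
// at once.
func (s *sched) updateFreeSpace() {
	s.loadedMu.Lock()
	runners := make([]*runner, 0, len(s.loaded))
	for _, r := range s.loaded {
		runners = append(runners, r)
	}
	s.loadedMu.Unlock() // drop loadedMu before touching any refMu

	for _, r := range runners {
		r.refMu.Lock()
		fmt.Println("summing predicted usage for runner with refCount", r.refCount)
		r.refMu.Unlock()
	}
}

func main() {
	s := &sched{loaded: map[string]*runner{"model-a": {refCount: 1}}}
	s.updateFreeSpace()
}

The snapshot is safe even if a runner unloads in the window after loadedMu is released: the real loop's existing `if r.llama != nil` check, taken under refMu, already skips a torn-down runner.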
@@ -441,10 +441,9 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList
 		estimatedVRAM:  llama.EstimatedVRAM(),
 		estimatedTotal: llama.EstimatedTotal(),
 		loading:        true,
-		refCount:       1,
 	}
 	runner.numParallel = numParallel
-	runner.refMu.Lock()
+	runner.refMu.Lock() // hold lock until running or aborted

 	s.loadedMu.Lock()
 	s.loaded[req.model.ModelPath] = runner
@@ -455,13 +454,13 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList
 	defer runner.refMu.Unlock()
 	if err = llama.WaitUntilRunning(req.ctx); err != nil {
 		slog.Error("error loading llama server", "error", err)
-		runner.refCount--
 		req.errCh <- err
 		slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
 		s.expiredCh <- runner
 		return
 	}
 	slog.Debug("finished setting up runner", "model", req.model.ModelPath)
+	runner.refCount++
 	runner.loading = false
 	go func() {
 		<-req.ctx.Done()
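The two hunks above move reference acquisition to the success path: the old code constructed the runner with refCount: 1 and had to decrement on a failed WaitUntilRunning, while the new code increments only once the runner is confirmed running, so a failed load never holds a reference. A hedged sketch of that ordering (acquireOnSuccess and the wait callback are illustrative stand-ins, not the scheduler's API):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// ref is a stand-in for runnerRef; only the counting logic is shown.
type ref struct {
	mu       sync.Mutex
	refCount uint // prevent unloading if > 0
}

// acquireOnSuccess counts the reference only once the load has succeeded,
// so the failure path has nothing to undo (the old code started at 1 and
// had to decrement on error).
func acquireOnSuccess(r *ref, wait func() error) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if err := wait(); err != nil {
		return err // refCount was never taken
	}
	r.refCount++
	return nil
}

func main() {
	r := &ref{}
	_ = acquireOnSuccess(r, func() error { return errors.New("load failed") })
	fmt.Println("after failed load:", r.refCount) // 0
	_ = acquireOnSuccess(r, func() error { return nil })
	fmt.Println("after successful load:", r.refCount) // 1
}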
@@ -479,7 +478,12 @@ func (s *Scheduler) updateFreeSpace(allGpus discover.GpuInfoList) {
 	}
 	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
 	s.loadedMu.Lock()
+	runners := make([]*runnerRef, 0, len(s.loaded))
 	for _, r := range s.loaded {
+		runners = append(runners, r)
+	}
+	s.loadedMu.Unlock()
+	for _, r := range runners {
 		r.refMu.Lock()
 		if r.llama != nil {
 			for _, gpu := range allGpus {
@@ -490,7 +494,6 @@ func (s *Scheduler) updateFreeSpace(allGpus discover.GpuInfoList) {
 		}
 		r.refMu.Unlock()
 	}
-	s.loadedMu.Unlock()

 	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
 	for i := range allGpus {
@@ -537,10 +540,8 @@ func (s *Scheduler) filterGPUsWithoutLoadingModels(allGpus discover.GpuInfoList)

 // TODO consolidate sched_types.go
 type runnerRef struct {
 	refMu sync.Mutex
-	// refCond   sync.Cond // Signaled on transition from 1 -> 0 refCount
 	refCount uint // prevent unloading if > 0
-	// unloading bool      // set to true when we are trying to unload the runner

 	llama   llm.LlamaServer
 	loading bool // True only during initial load, then false forever
@@ -811,8 +812,8 @@ func (s *Scheduler) unloadAllRunners() {

 func (s *Scheduler) expireRunner(model *Model) {
 	s.loadedMu.Lock()
-	defer s.loadedMu.Unlock()
 	runner, ok := s.loaded[model.ModelPath]
+	s.loadedMu.Unlock()
 	if ok {
 		runner.refMu.Lock()
 		runner.expiresAt = time.Now()
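expireRunner gets the same treatment: replacing the deferred s.loadedMu.Unlock() with an explicit unlock right after the map lookup means loadedMu is no longer held when runner.refMu is taken below it. A minimal sketch with the same simplified stand-in types as above:

package main

import (
	"fmt"
	"sync"
	"time"
)

type runner struct {
	refMu     sync.Mutex
	expiresAt time.Time
}

type sched struct {
	loadedMu sync.Mutex
	loaded   map[string]*runner
}

// expire releases loadedMu immediately after the lookup; a deferred unlock
// would keep it held across the refMu acquisition below, re-creating the
// loadedMu -> refMu ordering the commit removes.
func (s *sched) expire(path string) {
	s.loadedMu.Lock()
	r, ok := s.loaded[path]
	s.loadedMu.Unlock()
	if ok {
		r.refMu.Lock()
		r.expiresAt = time.Now()
		r.refMu.Unlock()
	}
}

func main() {
	s := &sched{loaded: map[string]*runner{"model-a": {}}}
	s.expire("model-a")
	fmt.Println("expiresAt set:", !s.loaded["model-a"].expiresAt.IsZero())
}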
|
Loading…
x
Reference in New Issue
Block a user