diff --git a/fs/fs.go b/fs/fs.go
new file mode 100644
index 000000000..e8d5756d1
--- /dev/null
+++ b/fs/fs.go
@@ -0,0 +1,88 @@
+package fs
+
+import (
+	"fmt"
+	"io"
+	"log/slog"
+	"os"
+
+	"github.com/ollama/ollama/fs/ggml"
+)
+
+type DType int
+
+type Model struct {
+	KV      Config
+	Tensors map[string]TensorReader
+}
+
+func (m Model) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("architecture", m.KV.Architecture()),
+	)
+}
+
+type Tensor interface {
+	Name() string
+	Shape() []int
+	DType() DType
+	Size() int
+}
+
+type TensorReader interface {
+	Tensor
+	io.Reader
+}
+
+type shimTensorReader struct {
+	internal *ggml.Tensor
+	*io.SectionReader
+}
+
+func (t *shimTensorReader) Name() string {
+	return t.internal.Name
+}
+
+func (t *shimTensorReader) Shape() []int {
+	shape := make([]int, len(t.internal.Shape))
+	for i, s := range t.internal.Shape {
+		shape[i] = int(s)
+	}
+
+	return shape
+}
+
+func (t *shimTensorReader) Size() int {
+	return int(t.internal.Size())
+}
+
+func (t *shimTensorReader) DType() DType {
+	return DType(t.internal.Kind)
+}
+
+func ReadFrom(f *os.File) (*Model, error) {
+	bts, err := io.ReadAll(io.NewSectionReader(f, 0, 4))
+	if err != nil {
+		return nil, err
+	}
+
+	switch ggml.DetectContentType(bts[:4]) {
+	case "gguf":
+		c, _, err := ggml.Decode(f, -1)
+		if err != nil {
+			return nil, err
+		}
+
+		tensors := make(map[string]TensorReader, len(c.Tensors().Items()))
+		for _, t := range c.Tensors().Items() {
+			tensors[t.Name] = &shimTensorReader{
+				internal:      t,
+				SectionReader: io.NewSectionReader(f, int64(c.Tensors().Offset+t.Offset), int64(t.Size())),
+			}
+		}
+
+		return &Model{KV: c.KV(), Tensors: tensors}, nil
+	default:
+		return nil, fmt.Errorf("unsupported file type")
+	}
+}
diff --git a/ml/backend/ggml/ggml.go b/ml/backend/ggml/ggml.go
index 17f063840..625fb4a72 100644
--- a/ml/backend/ggml/ggml.go
+++ b/ml/backend/ggml/ggml.go
@@ -9,7 +9,9 @@ package ggml
 import "C"
 
 import (
+	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"log/slog"
@@ -19,6 +21,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"unicode"
 	"unsafe"
@@ -299,6 +302,11 @@ func New(ctx context.Context, r *os.File, params ml.BackendParams) (ml.Backend,
 	var doneBytes atomic.Uint64
 	totalBytes := uint64(n) - meta.Tensors().Offset
+	pool := sync.Pool{
+		New: func() any {
+			return new(bytes.Buffer)
+		},
+	}
 
 	g, ctx := errgroup.WithContext(ctx)
 	g.SetLimit(runtime.GOMAXPROCS(0))
 
@@ -320,19 +328,32 @@ func New(ctx context.Context, r *os.File, params ml.BackendParams) (ml.Backend,
 			}
 
 			sr := io.NewSectionReader(r, int64(meta.Tensors().Offset+t.Offset), int64(t.Size()))
-			bts := make([]byte, 128*format.KibiByte)
+			// bts := make([]byte, 128*format.KibiByte)
 
 			var s uint64
 			for s < t.Size() {
-				n, err := io.ReadFull(sr, bts[:min(len(bts), int(t.Size()-s))])
-				if err != nil {
+				b := pool.Get().(*bytes.Buffer)
+				b.Reset()
+
+				// n, err := io.ReadFull(sr, bts[:min(len(bts), int(t.Size()-s))])
+				// if err != nil {
+				// 	return err
+				// }
+				n, err := io.CopyN(b, sr, 256*format.KibiByte)
+				if n > 0 {
+				} else if errors.Is(err, io.EOF) {
+					break
+				} else if err != nil {
 					return err
 				}
 
+				bts := b.Bytes()
 				for _, tt := range tts {
 					C.ggml_backend_tensor_set(tt, unsafe.Pointer(&bts[0]), C.size_t(s), C.size_t(n))
 				}
 
+				pool.Put(b)
+
 				s += uint64(n)
 
 				if params.Progress != nil {
diff --git a/ml/backend/ggml/ggml2.go b/ml/backend/ggml/ggml2.go
new file mode 100644
index 000000000..fbf963c36
--- /dev/null
+++ b/ml/backend/ggml/ggml2.go
@@ -0,0 +1,273 @@
+package ggml
+
+// #cgo CPPFLAGS: -I${SRCDIR}/ggml/include
+// #include <stdlib.h>
+// #include <stdint.h>
+// #include "ggml.h"
+// #include "ggml-cpu.h"
+// #include "ggml-backend.h"
+import "C"
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"log/slog"
+	"runtime"
+	"sync"
+	"unsafe"
+
+	"github.com/ollama/ollama/format"
+	"github.com/ollama/ollama/fs"
+	"github.com/ollama/ollama/ml"
+	ggml "github.com/ollama/ollama/ml/backend/ggml/ggml/src"
+	"golang.org/x/sync/errgroup"
+)
+
+type backend struct {
+	gpus, cpus []*C.struct_ggml_backend_device
+	bufts      map[*C.struct_ggml_backend_device][]*C.struct_ggml_backend_buffer_type
+	ctxs       map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_context
+	bbs        map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_backend_buffer
+	readers    map[*C.struct_ggml_tensor]io.Reader
+	reserved   map[*C.struct_ggml_context]uint64
+
+	onceScheduler sync.Once
+	scheduler     *scheduler
+}
+
+var _ ml.Backend2 = (*backend)(nil)
+
+func New2() (ml.Backend2, error) {
+	ggml.OnceLoad()
+
+	var cpus, accels, gpus []*C.struct_ggml_backend_device
+	for i := range C.ggml_backend_dev_count() {
+		d := C.ggml_backend_dev_get(C.size_t(i))
+		switch C.ggml_backend_dev_type(d) {
+		case C.GGML_BACKEND_DEVICE_TYPE_CPU:
+			// only the first cpu device should be used
+			if len(cpus) > 0 {
+				continue
+			}
+
+			cpus = append(cpus, d)
+		case C.GGML_BACKEND_DEVICE_TYPE_ACCEL:
+			accels = append(accels, d)
+		case C.GGML_BACKEND_DEVICE_TYPE_GPU:
+			gpus = append(gpus, d)
+		}
+	}
+
+	bufts := make(map[*C.struct_ggml_backend_device][]*C.struct_ggml_backend_buffer_type)
+
+	cpu := C.ggml_backend_dev_by_type(C.GGML_BACKEND_DEVICE_TYPE_CPU)
+	for _, d := range append(accels, cpus...) {
+		bufts[cpu] = append(bufts[cpu], C.ggml_backend_dev_buffer_type(d))
+	}
+
+	for _, d := range gpus {
+		bufts[d] = append(bufts[d], append([]*C.struct_ggml_backend_buffer_type{C.ggml_backend_dev_buffer_type(d)}, bufts[cpu]...)...)
+	}
+
+	return &backend{
+		// merge accels and cpus
+		gpus:     gpus,
+		cpus:     append(accels, cpus...),
+		bufts:    bufts,
+		ctxs:     make(map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_context, len(bufts)),
+		bbs:      make(map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_backend_buffer, len(bufts)),
+		readers:  make(map[*C.struct_ggml_tensor]io.Reader),
+		reserved: make(map[*C.struct_ggml_context]uint64),
+	}, nil
+}
+
+func (b *backend) Close() {
+}
+
+func (b *backend) NewContext() ml.Context {
+	return &Context{
+		b: &Backend{
+			input:  b.bufts[b.cpus[0]][0],
+			output: b.bufts[b.cpus[0]][0],
+			layers: func() map[int]*C.struct_ggml_backend_buffer_type {
+				m := make(map[int]*C.struct_ggml_backend_buffer_type)
+				for i := range 100 {
+					m[i] = b.bufts[b.gpus[0]][0]
+				}
+				return m
+			}(),
+			sched: func() *C.struct_ggml_backend_sched {
+				return b.Scheduler().(*scheduler).s
+			}(),
+			maxGraphNodes: 8192,
+		},
+		ctx: C.ggml_init(C.struct_ggml_init_params{
+			mem_size: C.ggml_tensor_overhead() * C.size_t(4000),
+			no_alloc: true,
+		}),
+		buft:          b.bufts[b.cpus[0]][0],
+		maxGraphNodes: 8192,
+	}
+}
+
+func (b *backend) Get(tensorReader fs.TensorReader, preferredDevice ml.Device) ml.Tensor {
+	var ctx *C.struct_ggml_context
+
+	var devices []*C.struct_ggml_backend_device
+	if preferredDevice == ml.GPU {
+		devices = b.gpus
+	}
+
+	for _, d := range append(devices, b.cpus...) {
+		var free, total C.size_t
+		C.ggml_backend_dev_memory(d, &free, &total)
+
+		for _, buft := range b.bufts[d] {
+			if _, ok := b.ctxs[buft]; !ok {
+				b.ctxs[buft] = C.ggml_init(C.struct_ggml_init_params{
+					mem_size: C.ggml_tensor_overhead() * C.size_t(1000),
+					no_alloc: true,
+				})
+			}
+
+			ctx = b.ctxs[buft]
+			if free > 0 && b.reserved[ctx]+uint64(tensorReader.Size()) >= uint64(free) {
+				slog.Info("no space available", "device", C.GoString(C.ggml_backend_dev_name(d)), "free", format.HumanBytes2(uint64(free)), "total", format.HumanBytes2(uint64(total)), "reserve", format.HumanBytes2(b.reserved[ctx]), "size", format.HumanBytes2(uint64(tensorReader.Size())))
+				continue
+			}
+
+			cname := C.CString(tensorReader.Name())
+			defer C.free(unsafe.Pointer(cname))
+
+			if t := C.ggml_get_tensor(ctx, cname); t != nil {
+				slog.Info("using existing tensor in buffer type", "name", tensorReader.Name(), "buffer_type", C.GoString(C.ggml_backend_buft_name(buft)))
+				return &Tensor{t: t}
+			}
+
+			shape := make([]C.int64_t, len(tensorReader.Shape()))
+			for i, s := range tensorReader.Shape() {
+				shape[i] = C.int64_t(s)
+			}
+
+			t := C.ggml_new_tensor(ctx, uint32(tensorReader.DType()), C.int(len(tensorReader.Shape())), unsafe.SliceData(shape))
+			C.ggml_set_name(t, cname)
+
+			b.readers[t] = tensorReader
+			b.reserved[ctx] += uint64(tensorReader.Size())
+
+			slog.Info("creating new tensor in buffer type", "name", tensorReader.Name(), "buffer_type", C.GoString(C.ggml_backend_buft_name(buft)), "reserve", format.HumanBytes2(b.reserved[ctx]))
+			return &Tensor{t: t}
+		}
+	}
+
+	panic("no device available")
+}
+
+func (b *backend) LoadAll(ctx context.Context) error {
+	// allocate buffers for each context
+	for buft, ctx := range b.ctxs {
+		if C.ggml_get_first_tensor(ctx) == nil {
+			continue
+		}
+
+		bb := C.ggml_backend_alloc_ctx_tensors_from_buft(ctx, buft)
+		C.ggml_backend_buffer_set_usage(bb, C.GGML_BACKEND_BUFFER_USAGE_WEIGHTS)
+		b.bbs[buft] = bb
+	}
+
+	for _, bb := range b.bbs {
+		slog.Info("", "buffer.size", C.ggml_backend_buffer_get_size(bb), "buffer.usage", C.ggml_backend_buffer_get_usage(bb))
+	}
+
+	pool := sync.Pool{
+		New: func() any {
+			return new(bytes.Buffer)
+		},
+	}
+
+	g, ctx := errgroup.WithContext(context.Background())
+	g.SetLimit(runtime.GOMAXPROCS(0))
+	for t, r := range b.readers {
+		g.Go(func() error {
+			var s uint64
+
+			for {
+				b := pool.Get().(*bytes.Buffer)
+				b.Reset()
+
+				n, err := io.CopyN(b, r, 32*format.KibiByte)
+				if n > 0 {
+				} else if errors.Is(err, io.EOF) {
+					break
+				} else if err != nil {
+					return err
+				}
+
+				C.ggml_backend_tensor_set(t, unsafe.Pointer(&b.Bytes()[0]), C.size_t(s), C.size_t(n))
+				pool.Put(b)
+			}
+
+			return nil
+		})
+	}
+
+	go func() {
+		<-ctx.Done()
+		g.Go(func() error {
+			return ctx.Err()
+		})
+	}()
+
+	return g.Wait()
+}
+
+type scheduler struct {
+	s *C.struct_ggml_backend_sched
+}
+
+var (
+	_ ml.Scheduler = (*scheduler)(nil)
+	_ ml.Reserver  = (*scheduler)(nil)
+)
+
+func (b *backend) Scheduler() ml.Scheduler {
+	b.onceScheduler.Do(func() {
+		devices := append(b.gpus, b.cpus...)
+		backends := make([]C.ggml_backend_t, len(devices))
+		bufts := make([]C.ggml_backend_buffer_type_t, len(devices))
+		for i, device := range devices {
+			backend := C.ggml_backend_dev_init(device, nil)
+			buft := C.ggml_backend_get_default_buffer_type(backend)
+			if d := C.ggml_backend_get_device(backend); C.ggml_backend_dev_type(d) == C.GGML_BACKEND_DEVICE_TYPE_CPU && len(b.gpus) > 0 {
+				if hbt := C.ggml_backend_dev_host_buffer_type(b.gpus[0]); hbt != nil {
+					buft = hbt
+				}
+			}
+
+			slog.Info("scheduler", "backend", C.GoString(C.ggml_backend_name(backend)), "buffer_type", C.GoString(C.ggml_backend_buft_name(buft)))
+			backends[i] = backend
+			bufts[i] = buft
+		}
+
+		maxGraphNodes := max(8192, 1)
+		b.scheduler = &scheduler{
+			s: C.ggml_backend_sched_new(
+				unsafe.SliceData(backends),
+				unsafe.SliceData(bufts),
+				C.int(len(backends)),
+				C.size_t(maxGraphNodes),
+				C._Bool(len(b.gpus) > 1),
+			),
+		}
+	})
+
+	return b.scheduler
+}
+
+func (s scheduler) Schedule() {
+}
+
+func (s scheduler) Reserve() {
+}
diff --git a/ml/backend2.go b/ml/backend2.go
new file mode 100644
index 000000000..0bca53d7f
--- /dev/null
+++ b/ml/backend2.go
@@ -0,0 +1,25 @@
+package ml
+
+import (
+	"context"
+
+	"github.com/ollama/ollama/fs"
+)
+
+type Device int
+
+const (
+	CPU Device = iota
+	GPU
+)
+
+type Backend2 interface {
+	Close()
+
+	NewContext() Context
+
+	Scheduler() Scheduler
+
+	Get(fs.TensorReader, Device) Tensor
+	LoadAll(context.Context) error
+}
diff --git a/ml/scheduler.go b/ml/scheduler.go
new file mode 100644
index 000000000..11d51c33c
--- /dev/null
+++ b/ml/scheduler.go
@@ -0,0 +1,11 @@
+package ml
+
+// Scheduler is an interface that can be implemented by a Backend to schedule resources.
+type Scheduler interface {
+	Schedule()
+}
+
+// Reserver is an optional interface that can be implemented by a Scheduler to reserve resources for the compute graph.
+type Reserver interface {
+	Reserve()
+}
diff --git a/model/model.go b/model/model.go
index bc8944d22..77eb4ee66 100644
--- a/model/model.go
+++ b/model/model.go
@@ -256,16 +256,23 @@ func setPointer(base Base, v reflect.Value, tags []Tag) {
 type Tag struct {
 	Name      string
 	Alternate []string
+	Root      bool
+	Device    ml.Device
 }
 
 func ParseTags(s string) (tag Tag) {
 	parts := strings.Split(s, ",")
 	if len(parts) > 0 {
 		tag.Name = parts[0]
+		tag.Device = ml.GPU
 
 		for _, part := range parts[1:] {
 			if value, ok := strings.CutPrefix(part, "alt:"); ok {
 				tag.Alternate = append(tag.Alternate, value)
+			} else if value, ok := strings.CutPrefix(part, "root:"); ok {
+				tag.Root, _ = strconv.ParseBool(value)
+			} else if part == "cpu" {
+				tag.Device = ml.CPU
 			}
 		}
 	}
diff --git a/model/model2.go b/model/model2.go
new file mode 100644
index 000000000..9bc332985
--- /dev/null
+++ b/model/model2.go
@@ -0,0 +1,139 @@
+package model
+
+import (
+	"fmt"
+	"reflect"
+	"strconv"
+	"strings"
+
+	"github.com/ollama/ollama/fs"
+	"github.com/ollama/ollama/ml"
+)
+
+type Model2 struct {
+	ml.Backend2
+	Model
+}
+
+func New2(cfg *fs.Model, b ml.Backend2) (*Model2, error) {
+	fn, ok := models[cfg.KV.Architecture()]
+	if !ok {
+		return nil, fmt.Errorf("unsupported model architecture %q", cfg.KV.Architecture())
+	}
+
+	m, err := fn(cfg.KV)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO: load tensors from the model into the backend
+	v := reflect.ValueOf(m)
+	v.Elem().Set(temp(b, cfg.Tensors, v.Elem()))
+
+	if r, ok := b.Scheduler().(ml.Reserver); ok {
+		// TODO: build a graph of the model and reserve the necessary resources
+		r.Reserve()
+	}
+
+	return &Model2{b, m}, nil
+}
+
+func temp(b ml.Backend2, tensors map[string]fs.TensorReader, v reflect.Value, tags ...Tag) reflect.Value {
+	t := v.Type()
+	if t.Kind() != reflect.Struct {
+		return v
+	}
+
+	allNil := true
+	for i := range t.NumField() {
+		tt := t.Field(i).Type
+		vv := v.Field(i)
+		if !vv.CanSet() {
+			continue
+		}
+
+		tagsCopy := tags
+		if s := t.Field(i).Tag.Get("gguf"); s != "" {
+			tag := ParseTags(s)
+			if tag.Root {
+				tagsCopy = []Tag{tag}
+			} else {
+				tagsCopy = append(tagsCopy, ParseTags(s))
+			}
+		}
+
+		switch {
+		case tt == reflect.TypeOf((*ml.Tensor)(nil)).Elem():
+			var permute func([]Tag) [][]string
+			permute = func(tags []Tag) (values [][]string) {
+				if len(tags) < 1 {
+					return nil
+				}
+
+				values = [][]string{{tags[0].Name}}
+				for _, alt := range tags[0].Alternate {
+					values = append(values, []string{alt})
+				}
+
+				for i, value := range values {
+					for _, rest := range permute(tags[1:]) {
+						value = append(value, rest...)
+					}
+
+					values[i] = value
+				}
+
+				return values
+			}
+
+			names := permute(tagsCopy)
+			for _, name := range names {
+				if tensor, ok := tensors[strings.Join(name, ".")]; ok {
+					vv.Set(reflect.ValueOf(b.Get(tensor, tags[0].Device)))
+					break
+				}
+			}
+		case tt.Kind() == reflect.Pointer || tt.Kind() == reflect.Interface:
+			setPointer2(b, tensors, vv, tagsCopy)
+		case tt.Kind() == reflect.Slice || tt.Kind() == reflect.Array:
+			for i := vv.Len() - 1; i >= 0; i-- {
+				vvv := vv.Index(i)
+				if vvv.Kind() == reflect.Pointer || vvv.Kind() == reflect.Interface {
+					setPointer2(b, tensors, vvv, append(tagsCopy, Tag{Name: strconv.Itoa(i)}))
+				} else {
+					vvv.Set(temp(b, tensors, vvv, append(tagsCopy, Tag{Name: strconv.Itoa(i)})...))
+				}
+			}
+		}
+
+		if !canNil(tt) || !vv.IsNil() {
+			allNil = false
+		}
+	}
+
+	if allNil {
+		return reflect.Zero(t)
+	}
+
+	return v
+}
+
+func setPointer2(b ml.Backend2, tensors map[string]fs.TensorReader, v reflect.Value, tags []Tag) {
+	vv := v
+	if v.Kind() == reflect.Interface {
+		if v.IsNil() {
+			return
+		}
+
+		vv = vv.Elem()
+	}
+
+	vv = vv.Elem()
+	if v.IsNil() {
+		vv = reflect.New(v.Type().Elem()).Elem()
+	}
+
+	if f := temp(b, tensors, vv, tags...); f.CanAddr() {
+		v.Set(f.Addr())
+	}
+}
diff --git a/model/models/llama/model.go b/model/models/llama/model.go
index 68980dd76..1c4b61fad 100644
--- a/model/models/llama/model.go
+++ b/model/models/llama/model.go
@@ -23,7 +23,7 @@ type Model struct {
 	model.Base
 	model.BytePairEncoding
 
-	TokenEmbedding *nn.Embedding `gguf:"token_embd"`
+	TokenEmbedding *nn.Embedding `gguf:"token_embd,cpu"`
 	Layers         []Layer       `gguf:"blk"`
 	OutputNorm     *nn.RMSNorm   `gguf:"output_norm"`
 	Output         *nn.Linear    `gguf:"output,alt:token_embd"`
@@ -61,7 +61,7 @@ func New(c fs.Config) (model.Model, error) {
 		},
 	}
 
-	m.Cache = kvcache.NewCausalCache(m.Shift)
+	// m.Cache = kvcache.NewCausalCache(m.Shift)
 
 	return &m, nil
 }
@@ -71,7 +71,7 @@ type SelfAttention struct {
 	Key         *nn.Linear `gguf:"attn_k"`
 	Value       *nn.Linear `gguf:"attn_v"`
 	Output      *nn.Linear `gguf:"attn_output"`
-	RopeFactors ml.Tensor  `gguf:"rope_freqs.weight"`
+	RopeFactors ml.Tensor  `gguf:"rope_freqs.weight,root:true"`
 }
 
 func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Tensor, cache kvcache.Cache, opts *Options) ml.Tensor {
@@ -91,7 +91,7 @@ func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Ten
 	v = v.Reshape(ctx, headDim, opts.numKVHeads, batchSize)
 
 	scaleFactor := 1.0 / math.Sqrt(float64(headDim))
-	kqv := nn.Attention(ctx, q, k, v, scaleFactor, cache)
+	kqv := nn.Attention(ctx, q, k, v, scaleFactor, nil)
 	kqv = kqv.Reshape(ctx, opts.hiddenSize, batchSize)
 
 	return sa.Output.Forward(ctx, kqv)
@@ -154,7 +154,7 @@ func (m *Model) Forward(ctx ml.Context, batch input.Batch) (ml.Tensor, error) {
 	hiddenState := m.TokenEmbedding.Forward(ctx, batch.Inputs)
 
 	for i, layer := range m.Layers {
-		m.Cache.SetLayer(i)
+		// m.Cache.SetLayer(i)
 
 		var lastLayerOutputs ml.Tensor
 		if i == len(m.Layers)-1 {