Michael Yang 2025-03-18 14:49:51 -07:00 committed by Michael Yang
parent c8245f3ef3
commit 2d7e8e82ab
8 changed files with 572 additions and 8 deletions

fs/fs.go (new file, +88)

@@ -0,0 +1,88 @@
package fs
import (
"fmt"
"io"
"log/slog"
"os"
"github.com/ollama/ollama/fs/ggml"
)
type DType int
type Model struct {
KV Config
Tensors map[string]TensorReader
}
func (m Model) LogValue() slog.Value {
return slog.GroupValue(
slog.String("architecture", m.KV.Architecture()),
)
}
type Tensor interface {
Name() string
Shape() []int
DType() DType
Size() int
}
type TensorReader interface {
Tensor
io.Reader
}
type shimTensorReader struct {
internal *ggml.Tensor
*io.SectionReader
}
func (t *shimTensorReader) Name() string {
return t.internal.Name
}
func (t *shimTensorReader) Shape() []int {
shape := make([]int, len(t.internal.Shape))
for i, s := range t.internal.Shape {
shape[i] = int(s)
}
return shape
}
func (t *shimTensorReader) Size() int {
return int(t.internal.Size())
}
func (t *shimTensorReader) DType() DType {
return DType(t.internal.Kind)
}
func ReadFrom(f *os.File) (*Model, error) {
bts, err := io.ReadAll(io.NewSectionReader(f, 0, 4))
if err != nil {
return nil, err
}
switch ggml.DetectContentType(bts[:4]) {
case "gguf":
c, _, err := ggml.Decode(f, -1)
if err != nil {
return nil, err
}
tensors := make(map[string]TensorReader, len(c.Tensors().Items()))
for _, t := range c.Tensors().Items() {
tensors[t.Name] = &shimTensorReader{
internal: t,
SectionReader: io.NewSectionReader(f, int64(c.Tensors().Offset+t.Offset), int64(t.Size())),
}
}
return &Model{KV: c.KV(), Tensors: tensors}, nil
default:
return nil, fmt.Errorf("unsupported file type")
}
}
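As a reading aid, a minimal sketch of how this new reader might be exercised; the helper, path, and print loop are illustrative and not part of the commit.

package main

import (
	"fmt"
	"os"

	"github.com/ollama/ollama/fs"
)

// listTensors is an illustrative helper: it parses a GGUF file with fs.ReadFrom
// and prints each tensor's name, shape, and byte size.
func listTensors(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	m, err := fs.ReadFrom(f)
	if err != nil {
		return err
	}

	fmt.Println("architecture:", m.KV.Architecture())
	for name, t := range m.Tensors {
		// Each entry is a TensorReader: tensor metadata plus an io.Reader over its data.
		fmt.Printf("%s shape=%v size=%d\n", name, t.Shape(), t.Size())
	}
	return nil
}

func main() {
	if err := listTensors("model.gguf"); err != nil { // illustrative path
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}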

ml/backend/ggml/ggml.go (modified)

@@ -9,7 +9,9 @@ package ggml
import "C"
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
@@ -19,6 +21,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"unicode"
"unsafe"
@@ -299,6 +302,11 @@ func New(ctx context.Context, r *os.File, params ml.BackendParams) (ml.Backend,
var doneBytes atomic.Uint64
totalBytes := uint64(n) - meta.Tensors().Offset
pool := sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(runtime.GOMAXPROCS(0))
@@ -320,19 +328,32 @@ func New(ctx context.Context, r *os.File, params ml.BackendParams) (ml.Backend,
}
sr := io.NewSectionReader(r, int64(meta.Tensors().Offset+t.Offset), int64(t.Size()))
bts := make([]byte, 128*format.KibiByte)
// bts := make([]byte, 128*format.KibiByte)
var s uint64
for s < t.Size() {
n, err := io.ReadFull(sr, bts[:min(len(bts), int(t.Size()-s))])
if err != nil {
b := pool.Get().(*bytes.Buffer)
b.Reset()
// n, err := io.ReadFull(sr, bts[:min(len(bts), int(t.Size()-s))])
// if err != nil {
// return err
// }
n, err := io.CopyN(b, sr, 256*format.KibiByte)
if n > 0 {
} else if errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
bts := b.Bytes()
for _, tt := range tts {
C.ggml_backend_tensor_set(tt, unsafe.Pointer(&bts[0]), C.size_t(s), C.size_t(n))
}
pool.Put(b)
s += uint64(n)
if params.Progress != nil {
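The hunk above replaces a fixed 128 KiB scratch slice with pooled buffers filled via io.CopyN. A standalone sketch of that pooled, chunked copy pattern, detached from the cgo tensor calls; the function name and toy input are illustrative.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
)

// copyChunked streams r into dst in fixed-size chunks, reusing buffers from a
// pool so concurrent loaders do not each hold their own large scratch slice.
func copyChunked(dst io.Writer, r io.Reader, chunk int64, pool *sync.Pool) (int64, error) {
	var total int64
	for {
		b := pool.Get().(*bytes.Buffer)
		b.Reset()

		n, err := io.CopyN(b, r, chunk)
		if n > 0 {
			if _, werr := dst.Write(b.Bytes()); werr != nil {
				pool.Put(b)
				return total, werr
			}
			total += n
		}
		pool.Put(b)

		if errors.Is(err, io.EOF) {
			return total, nil // short final chunk: done
		} else if err != nil {
			return total, err
		}
	}
}

func main() {
	pool := &sync.Pool{New: func() any { return new(bytes.Buffer) }}
	var out bytes.Buffer
	n, err := copyChunked(&out, strings.NewReader("hello tensor data"), 4, pool)
	fmt.Println(n, err, out.String())
}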

ml/backend/ggml/ggml2.go (new file, +273)

@@ -0,0 +1,273 @@
package ggml
// #cgo CPPFLAGS: -I${SRCDIR}/ggml/include
// #include <stdlib.h>
// #include <stdint.h>
// #include "ggml.h"
// #include "ggml-cpu.h"
// #include "ggml-backend.h"
import "C"
import (
"bytes"
"context"
"errors"
"io"
"log/slog"
"runtime"
"sync"
"unsafe"
"github.com/ollama/ollama/format"
"github.com/ollama/ollama/fs"
"github.com/ollama/ollama/ml"
ggml "github.com/ollama/ollama/ml/backend/ggml/ggml/src"
"golang.org/x/sync/errgroup"
)
type backend struct {
gpus, cpus []*C.struct_ggml_backend_device
bufts map[*C.struct_ggml_backend_device][]*C.struct_ggml_backend_buffer_type
ctxs map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_context
bbs map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_backend_buffer
readers map[*C.struct_ggml_tensor]io.Reader
reserved map[*C.struct_ggml_context]uint64
onceScheduler sync.Once
scheduler *scheduler
}
var _ ml.Backend2 = (*backend)(nil)
func New2() (ml.Backend2, error) {
ggml.OnceLoad()
var cpus, accels, gpus []*C.struct_ggml_backend_device
for i := range C.ggml_backend_dev_count() {
d := C.ggml_backend_dev_get(C.size_t(i))
switch C.ggml_backend_dev_type(d) {
case C.GGML_BACKEND_DEVICE_TYPE_CPU:
// only the first cpu device should be used
if len(cpus) > 0 {
continue
}
cpus = append(cpus, d)
case C.GGML_BACKEND_DEVICE_TYPE_ACCEL:
accels = append(accels, d)
case C.GGML_BACKEND_DEVICE_TYPE_GPU:
gpus = append(gpus, d)
}
}
bufts := make(map[*C.struct_ggml_backend_device][]*C.struct_ggml_backend_buffer_type)
cpu := C.ggml_backend_dev_by_type(C.GGML_BACKEND_DEVICE_TYPE_CPU)
for _, d := range append(accels, cpus...) {
bufts[cpu] = append(bufts[cpu], C.ggml_backend_dev_buffer_type(d))
}
for _, d := range gpus {
bufts[d] = append(bufts[d], append([]*C.struct_ggml_backend_buffer_type{C.ggml_backend_dev_buffer_type(d)}, bufts[cpu]...)...)
}
return &backend{
// merge accels and cpus
gpus: gpus,
cpus: append(accels, cpus...),
bufts: bufts,
ctxs: make(map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_context, len(bufts)),
bbs: make(map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_backend_buffer, len(bufts)),
readers: make(map[*C.struct_ggml_tensor]io.Reader),
reserved: make(map[*C.struct_ggml_context]uint64),
}, nil
}
func (b *backend) Close() {
}
func (b *backend) NewContext() ml.Context {
return &Context{
b: &Backend{
input: b.bufts[b.cpus[0]][0],
output: b.bufts[b.cpus[0]][0],
layers: func() map[int]*C.struct_ggml_backend_buffer_type {
m := make(map[int]*C.struct_ggml_backend_buffer_type)
for i := range 100 {
m[i] = b.bufts[b.gpus[0]][0]
}
return m
}(),
sched: func() *C.struct_ggml_backend_sched {
return b.Scheduler().(*scheduler).s
}(),
maxGraphNodes: 8192,
},
ctx: C.ggml_init(C.struct_ggml_init_params{
mem_size: C.ggml_tensor_overhead() * C.size_t(4000),
no_alloc: true,
}),
buft: b.bufts[b.cpus[0]][0],
maxGraphNodes: 8192,
}
}
func (b *backend) Get(tensorReader fs.TensorReader, preferredDevice ml.Device) ml.Tensor {
var ctx *C.struct_ggml_context
var devices []*C.struct_ggml_backend_device
if preferredDevice == ml.GPU {
devices = b.gpus
}
for _, d := range append(devices, b.cpus...) {
var free, total C.size_t
C.ggml_backend_dev_memory(d, &free, &total)
for _, buft := range b.bufts[d] {
if _, ok := b.ctxs[buft]; !ok {
b.ctxs[buft] = C.ggml_init(C.struct_ggml_init_params{
mem_size: C.ggml_tensor_overhead() * C.size_t(1000),
no_alloc: true,
})
}
ctx = b.ctxs[buft]
if free > 0 && b.reserved[ctx]+uint64(tensorReader.Size()) >= uint64(free) {
slog.Info("no space available", "device", C.GoString(C.ggml_backend_dev_name(d)), "free", format.HumanBytes2(uint64(free)), "total", format.HumanBytes2(uint64(total)), "reserve", format.HumanBytes2(b.reserved[ctx]), "size", format.HumanBytes2(uint64(tensorReader.Size())))
continue
}
cname := C.CString(tensorReader.Name())
defer C.free(unsafe.Pointer(cname))
if t := C.ggml_get_tensor(ctx, cname); t != nil {
slog.Info("using existing tensor in buffer type", "name", tensorReader.Name(), "buffer_type", C.GoString(C.ggml_backend_buft_name(buft)))
return &Tensor{t: t}
}
shape := make([]C.int64_t, len(tensorReader.Shape()))
for i, s := range tensorReader.Shape() {
shape[i] = C.int64_t(s)
}
t := C.ggml_new_tensor(ctx, uint32(tensorReader.DType()), C.int(len(tensorReader.Shape())), unsafe.SliceData(shape))
C.ggml_set_name(t, cname)
b.readers[t] = tensorReader
b.reserved[ctx] += uint64(tensorReader.Size())
slog.Info("creating new tensor in buffer type", "name", tensorReader.Name(), "buffer_type", C.GoString(C.ggml_backend_buft_name(buft)), "reserve", format.HumanBytes2(b.reserved[ctx]))
return &Tensor{t: t}
}
}
panic("no device available")
}
func (b *backend) LoadAll(ctx context.Context) error {
// allocate buffers for each context
for buft, ctx := range b.ctxs {
if C.ggml_get_first_tensor(ctx) == nil {
continue
}
bb := C.ggml_backend_alloc_ctx_tensors_from_buft(ctx, buft)
C.ggml_backend_buffer_set_usage(bb, C.GGML_BACKEND_BUFFER_USAGE_WEIGHTS)
b.bbs[buft] = bb
}
for _, bb := range b.bbs {
slog.Info("", "buffer.size", C.ggml_backend_buffer_get_size(bb), "buffer.usage", C.ggml_backend_buffer_get_usage(bb))
}
pool := sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(runtime.GOMAXPROCS(0))
for t, r := range b.readers {
g.Go(func() error {
var s uint64
for {
b := pool.Get().(*bytes.Buffer)
b.Reset()
n, err := io.CopyN(b, r, 32*format.KibiByte)
if n > 0 {
} else if errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
C.ggml_backend_tensor_set(t, unsafe.Pointer(&b.Bytes()[0]), C.size_t(s), C.size_t(n))
pool.Put(b)
s += uint64(n) // advance the destination offset, matching the loop in ggml.go; without it every chunk lands at offset 0
}
return nil
})
}
go func() {
<-ctx.Done()
g.Go(func() error {
return ctx.Err()
})
}()
return g.Wait()
}
type scheduler struct {
s *C.struct_ggml_backend_sched
}
var (
_ ml.Scheduler = (*scheduler)(nil)
_ ml.Reserver = (*scheduler)(nil)
)
func (b *backend) Scheduler() ml.Scheduler {
b.onceScheduler.Do(func() {
devices := append(b.gpus, b.cpus...)
backends := make([]C.ggml_backend_t, len(devices))
bufts := make([]C.ggml_backend_buffer_type_t, len(devices))
for i, device := range devices {
backend := C.ggml_backend_dev_init(device, nil)
buft := C.ggml_backend_get_default_buffer_type(backend)
if d := C.ggml_backend_get_device(backend); C.ggml_backend_dev_type(d) == C.GGML_BACKEND_DEVICE_TYPE_CPU && len(b.gpus) > 0 {
if hbt := C.ggml_backend_dev_host_buffer_type(b.gpus[0]); hbt != nil {
buft = hbt
}
}
slog.Info("scheduler", "backend", C.GoString(C.ggml_backend_name(backend)), "buffer_type", C.GoString(C.ggml_backend_buft_name(buft)))
backends[i] = backend
bufts[i] = buft
}
maxGraphNodes := max(8192, 1)
b.scheduler = &scheduler{
s: C.ggml_backend_sched_new(
unsafe.SliceData(backends),
unsafe.SliceData(bufts),
C.int(len(backends)),
C.size_t(maxGraphNodes),
C._Bool(len(b.gpus) > 1),
),
}
})
return b.scheduler
}
func (s scheduler) Schedule() {
}
func (s scheduler) Reserve() {
}

ml/backend2.go (new file, +25)

@@ -0,0 +1,25 @@
package ml
import (
"context"
"github.com/ollama/ollama/fs"
)
type Device int
const (
CPU Device = iota
GPU
)
type Backend2 interface {
Close()
NewContext() Context
Scheduler() Scheduler
Get(fs.TensorReader, Device) Tensor
LoadAll(context.Context) error
}
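Taken together with fs.ReadFrom and the ggml backend above, the pieces appear intended to fit together as: parse the file, construct a Backend2, build the model (which places each tensor via Get), then stream weights with LoadAll. A hedged sketch of that flow; the path, error handling, and model registration are illustrative and not part of this commit.

package main

import (
	"context"
	"log"
	"os"

	"github.com/ollama/ollama/fs"
	"github.com/ollama/ollama/ml/backend/ggml"
	"github.com/ollama/ollama/model"
)

func main() {
	// Illustrative path; assumes the concrete model architecture has been
	// registered with the model package (normally via a blank import elsewhere).
	f, err := os.Open("model.gguf")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	cfg, err := fs.ReadFrom(f) // GGUF metadata plus per-tensor readers
	if err != nil {
		log.Fatal(err)
	}

	b, err := ggml.New2() // enumerate devices and their buffer types
	if err != nil {
		log.Fatal(err)
	}
	defer b.Close()

	m, err := model.New2(cfg, b) // places each tensor via Backend2.Get
	if err != nil {
		log.Fatal(err)
	}

	// Stream tensor data from the file into the allocated backend buffers.
	if err := m.LoadAll(context.Background()); err != nil {
		log.Fatal(err)
	}
}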

ml/scheduler.go (new file, +11)

@@ -0,0 +1,11 @@
package ml
// Scheduler is an interface that can be implemented by a Backend to schedule resources.
type Scheduler interface {
Schedule()
}
// Reserver is an optional interface that can be implemented by a Scheduler to reserve resources for the compute graph.
type Reserver interface {
Reserve()
}
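Since Reserver is optional, callers probe for it with a type assertion rather than requiring every backend to implement it, as model.New2 does later in this commit. A minimal sketch of that pattern; the package and helper names are illustrative.

package mlutil // illustrative package

import "github.com/ollama/ollama/ml"

// maybeReserve reserves compute-graph resources only when the scheduler
// opts into the optional Reserver interface.
func maybeReserve(s ml.Scheduler) {
	if r, ok := s.(ml.Reserver); ok {
		r.Reserve()
	}
}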

model/model.go (modified)

@@ -256,16 +256,23 @@ func setPointer(base Base, v reflect.Value, tags []Tag) {
type Tag struct {
Name string
Alternate []string
Root bool
Device ml.Device
}
func ParseTags(s string) (tag Tag) {
parts := strings.Split(s, ",")
if len(parts) > 0 {
tag.Name = parts[0]
tag.Device = ml.GPU
for _, part := range parts[1:] {
if value, ok := strings.CutPrefix(part, "alt:"); ok {
tag.Alternate = append(tag.Alternate, value)
} else if value, ok := strings.CutPrefix(part, "root:"); ok {
tag.Root, _ = strconv.ParseBool(value)
} else if part == "cpu" {
tag.Device = ml.CPU
}
}
}
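The new fields give each tag a device (GPU by default) and a root flag. A sketch, written as a test, of how the tag strings used later in this commit would parse under this reading of ParseTags.

package model

import (
	"testing"

	"github.com/ollama/ollama/ml"
)

// TestParseTags sketches how the tag strings used in this commit parse.
func TestParseTags(t *testing.T) {
	// "token_embd,cpu" pins the tensor to the CPU device.
	if tag := ParseTags("token_embd,cpu"); tag.Name != "token_embd" || tag.Device != ml.CPU {
		t.Fatalf("unexpected tag: %+v", tag)
	}

	// "output,alt:token_embd" keeps the GPU default and records an alternate name.
	if tag := ParseTags("output,alt:token_embd"); len(tag.Alternate) != 1 || tag.Alternate[0] != "token_embd" || tag.Device != ml.GPU {
		t.Fatalf("unexpected tag: %+v", tag)
	}

	// "rope_freqs.weight,root:true" marks the tag as a root, resetting the prefix chain.
	if tag := ParseTags("rope_freqs.weight,root:true"); !tag.Root {
		t.Fatalf("unexpected tag: %+v", tag)
	}
}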

model/model2.go (new file, +139)

@@ -0,0 +1,139 @@
package model
import (
"fmt"
"reflect"
"strconv"
"strings"
"github.com/ollama/ollama/fs"
"github.com/ollama/ollama/ml"
)
type Model2 struct {
ml.Backend2
Model
}
func New2(cfg *fs.Model, b ml.Backend2) (*Model2, error) {
fn, ok := models[cfg.KV.Architecture()]
if !ok {
return nil, fmt.Errorf("unsupported model architecture %q", cfg.KV.Architecture())
}
m, err := fn(cfg.KV)
if err != nil {
return nil, err
}
// TODO: load tensors from the model into the backend
v := reflect.ValueOf(m)
v.Elem().Set(temp(b, cfg.Tensors, v.Elem()))
if r, ok := b.Scheduler().(ml.Reserver); ok {
// TODO: build a graph of the model and reserve the necessary resources
r.Reserve()
}
return &Model2{b, m}, nil
}
func temp(b ml.Backend2, tensors map[string]fs.TensorReader, v reflect.Value, tags ...Tag) reflect.Value {
t := v.Type()
if t.Kind() != reflect.Struct {
return v
}
allNil := true
for i := range t.NumField() {
tt := t.Field(i).Type
vv := v.Field(i)
if !vv.CanSet() {
continue
}
tagsCopy := tags
if s := t.Field(i).Tag.Get("gguf"); s != "" {
tag := ParseTags(s)
if tag.Root {
tagsCopy = []Tag{tag}
} else {
tagsCopy = append(tagsCopy, ParseTags(s))
}
}
switch {
case tt == reflect.TypeOf((*ml.Tensor)(nil)).Elem():
var permute func([]Tag) [][]string
permute = func(tags []Tag) (values [][]string) {
if len(tags) < 1 {
return nil
}
values = [][]string{{tags[0].Name}}
for _, alt := range tags[0].Alternate {
values = append(values, []string{alt})
}
for i, value := range values {
for _, rest := range permute(tags[1:]) {
value = append(value, rest...)
}
values[i] = value
}
return values
}
names := permute(tagsCopy)
for _, name := range names {
if tensor, ok := tensors[strings.Join(name, ".")]; ok {
vv.Set(reflect.ValueOf(b.Get(tensor, tags[0].Device)))
break
}
}
case tt.Kind() == reflect.Pointer || tt.Kind() == reflect.Interface:
setPointer2(b, tensors, vv, tagsCopy)
case tt.Kind() == reflect.Slice || tt.Kind() == reflect.Array:
for i := vv.Len() - 1; i >= 0; i-- {
vvv := vv.Index(i)
if vvv.Kind() == reflect.Pointer || vvv.Kind() == reflect.Interface {
setPointer2(b, tensors, vvv, append(tagsCopy, Tag{Name: strconv.Itoa(i)}))
} else {
vvv.Set(temp(b, tensors, vvv, append(tagsCopy, Tag{Name: strconv.Itoa(i)})...))
}
}
}
if !canNil(tt) || !vv.IsNil() {
allNil = false
}
}
if allNil {
return reflect.Zero(t)
}
return v
}
func setPointer2(b ml.Backend2, tensors map[string]fs.TensorReader, v reflect.Value, tags []Tag) {
vv := v
if v.Kind() == reflect.Interface {
if v.IsNil() {
return
}
vv = vv.Elem()
}
vv = vv.Elem()
if v.IsNil() {
vv = reflect.New(v.Type().Elem()).Elem()
}
if f := temp(b, tensors, vv, tags...); f.CanAddr() {
v.Set(f.Addr())
}
}
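The permute helper inside temp expands a tag chain (a name plus its alternates) into candidate tensor names joined with dots, and temp takes the first candidate present in the file. A standalone sketch of that resolution for llama's Output field, re-implemented for illustration; it is not the commit's code.

package main

import (
	"fmt"
	"strings"
)

// Sketch of name resolution for llama's Output field: the field tag
// `gguf:"output,alt:token_embd"` followed by nn.Linear's `gguf:"weight"`.
func main() {
	heads := []string{"output", "token_embd"} // the tag's Name plus its Alternates
	tail := []string{"weight"}                // names contributed by deeper tags

	for _, h := range heads {
		name := strings.Join(append([]string{h}, tail...), ".")
		fmt.Println(name) // prints "output.weight", then "token_embd.weight"
	}
	// temp picks the first candidate present in cfg.Tensors, so Output can fall
	// back to the tied token embedding when the GGUF has no separate output tensor.
}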

model/models/llama/model.go (modified)

@@ -23,7 +23,7 @@ type Model struct {
model.Base
model.BytePairEncoding
TokenEmbedding *nn.Embedding `gguf:"token_embd"`
TokenEmbedding *nn.Embedding `gguf:"token_embd,cpu"`
Layers []Layer `gguf:"blk"`
OutputNorm *nn.RMSNorm `gguf:"output_norm"`
Output *nn.Linear `gguf:"output,alt:token_embd"`
@@ -61,7 +61,7 @@ func New(c fs.Config) (model.Model, error) {
},
}
m.Cache = kvcache.NewCausalCache(m.Shift)
// m.Cache = kvcache.NewCausalCache(m.Shift)
return &m, nil
}
@@ -71,7 +71,7 @@ type SelfAttention struct {
Key *nn.Linear `gguf:"attn_k"`
Value *nn.Linear `gguf:"attn_v"`
Output *nn.Linear `gguf:"attn_output"`
RopeFactors ml.Tensor `gguf:"rope_freqs.weight"`
RopeFactors ml.Tensor `gguf:"rope_freqs.weight,root:true"`
}
func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Tensor, cache kvcache.Cache, opts *Options) ml.Tensor {
@@ -91,7 +91,7 @@ func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Ten
v = v.Reshape(ctx, headDim, opts.numKVHeads, batchSize)
scaleFactor := 1.0 / math.Sqrt(float64(headDim))
kqv := nn.Attention(ctx, q, k, v, scaleFactor, cache)
kqv := nn.Attention(ctx, q, k, v, scaleFactor, nil)
kqv = kqv.Reshape(ctx, opts.hiddenSize, batchSize)
return sa.Output.Forward(ctx, kqv)
@@ -154,7 +154,7 @@ func (m *Model) Forward(ctx ml.Context, batch input.Batch) (ml.Tensor, error) {
hiddenState := m.TokenEmbedding.Forward(ctx, batch.Inputs)
for i, layer := range m.Layers {
m.Cache.SetLayer(i)
// m.Cache.SetLayer(i)
var lastLayerOutputs ml.Tensor
if i == len(m.Layers)-1 {