Compare commits
	
		
			1 Commits
		
	
	
		
			v0.1.49-rc
			...
			upload-pro
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 402babdad0 | 
							
								
								
									
										18
									
								
								cmd/cmd.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								cmd/cmd.go
									
									
									
									
									
								
							| @@ -94,9 +94,25 @@ func PushHandler(cmd *cobra.Command, args []string) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	var currentDigest string | ||||||
|  | 	var bar *progressbar.ProgressBar | ||||||
|  |  | ||||||
| 	request := api.PushRequest{Name: args[0], Insecure: insecure} | 	request := api.PushRequest{Name: args[0], Insecure: insecure} | ||||||
| 	fn := func(resp api.ProgressResponse) error { | 	fn := func(resp api.ProgressResponse) error { | ||||||
| 		fmt.Println(resp.Status) | 		if resp.Digest != currentDigest && resp.Digest != "" { | ||||||
|  | 			currentDigest = resp.Digest | ||||||
|  | 			bar = progressbar.DefaultBytes( | ||||||
|  | 				int64(resp.Total), | ||||||
|  | 				fmt.Sprintf("pushing %s...", resp.Digest[7:19]), | ||||||
|  | 			) | ||||||
|  |  | ||||||
|  | 			bar.Set(resp.Completed) | ||||||
|  | 		} else if resp.Digest == currentDigest && resp.Digest != "" { | ||||||
|  | 			bar.Set(resp.Completed) | ||||||
|  | 		} else { | ||||||
|  | 			currentDigest = "" | ||||||
|  | 			fmt.Println(resp.Status) | ||||||
|  | 		} | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										128
									
								
								server/images.go
									
									
									
									
									
								
							
							
						
						
									
										128
									
								
								server/images.go
									
									
									
									
									
								
							| @@ -582,14 +582,10 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var layers []*Layer | 	var layers []*Layer | ||||||
| 	var total int |  | ||||||
| 	var completed int |  | ||||||
| 	for _, layer := range manifest.Layers { | 	for _, layer := range manifest.Layers { | ||||||
| 		layers = append(layers, layer) | 		layers = append(layers, layer) | ||||||
| 		total += layer.Size |  | ||||||
| 	} | 	} | ||||||
| 	layers = append(layers, &manifest.Config) | 	layers = append(layers, &manifest.Config) | ||||||
| 	total += manifest.Config.Size |  | ||||||
|  |  | ||||||
| 	for _, layer := range layers { | 	for _, layer := range layers { | ||||||
| 		exists, err := checkBlobExistence(mp, layer.Digest, regOpts) | 		exists, err := checkBlobExistence(mp, layer.Digest, regOpts) | ||||||
| @@ -598,21 +594,20 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if exists { | 		if exists { | ||||||
| 			completed += layer.Size |  | ||||||
| 			fn(api.ProgressResponse{ | 			fn(api.ProgressResponse{ | ||||||
| 				Status:    "using existing layer", | 				Status:    "using existing layer", | ||||||
| 				Digest:    layer.Digest, | 				Digest:    layer.Digest, | ||||||
| 				Total:     total, | 				Total:     layer.Size, | ||||||
| 				Completed: completed, | 				Completed: layer.Size, | ||||||
| 			}) | 			}) | ||||||
|  | 			log.Printf("Layer %s already exists", layer.Digest) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		fn(api.ProgressResponse{ | 		fn(api.ProgressResponse{ | ||||||
| 			Status:    "starting upload", | 			Status: "starting upload", | ||||||
| 			Digest:    layer.Digest, | 			Digest: layer.Digest, | ||||||
| 			Total:     total, | 			Total:  layer.Size, | ||||||
| 			Completed: completed, |  | ||||||
| 		}) | 		}) | ||||||
|  |  | ||||||
| 		location, err := startUpload(mp, regOpts) | 		location, err := startUpload(mp, regOpts) | ||||||
| @@ -621,25 +616,14 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | |||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		err = uploadBlob(location, layer, regOpts) | 		err = uploadBlobChunked(mp, location, layer, regOpts, fn) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.Printf("error uploading blob: %v", err) | 			log.Printf("error uploading blob: %v", err) | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		completed += layer.Size |  | ||||||
| 		fn(api.ProgressResponse{ |  | ||||||
| 			Status:    "upload complete", |  | ||||||
| 			Digest:    layer.Digest, |  | ||||||
| 			Total:     total, |  | ||||||
| 			Completed: completed, |  | ||||||
| 		}) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fn(api.ProgressResponse{ | 	fn(api.ProgressResponse{Status: "pushing manifest"}) | ||||||
| 		Status:    "pushing manifest", |  | ||||||
| 		Total:     total, |  | ||||||
| 		Completed: completed, |  | ||||||
| 	}) |  | ||||||
| 	url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag) | 	url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag) | ||||||
| 	headers := map[string]string{ | 	headers := map[string]string{ | ||||||
| 		"Content-Type": "application/vnd.docker.distribution.manifest.v2+json", | 		"Content-Type": "application/vnd.docker.distribution.manifest.v2+json", | ||||||
| @@ -662,11 +646,7 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | |||||||
| 		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | 		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fn(api.ProgressResponse{ | 	fn(api.ProgressResponse{Status: "success"}) | ||||||
| 		Status:    "success", |  | ||||||
| 		Total:     total, |  | ||||||
| 		Completed: completed, |  | ||||||
| 	}) |  | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -828,19 +808,14 @@ func checkBlobExistence(mp ModelPath, digest string, regOpts *RegistryOptions) ( | |||||||
| 	return resp.StatusCode == http.StatusOK, nil | 	return resp.StatusCode == http.StatusOK, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { | func uploadBlobChunked(mp ModelPath, location string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { | ||||||
| 	// Create URL |  | ||||||
| 	url := fmt.Sprintf("%s&digest=%s", location, layer.Digest) |  | ||||||
|  |  | ||||||
| 	headers := make(map[string]string) |  | ||||||
| 	headers["Content-Length"] = fmt.Sprintf("%d", layer.Size) |  | ||||||
| 	headers["Content-Type"] = "application/octet-stream" |  | ||||||
|  |  | ||||||
| 	// TODO change from monolithic uploads to chunked uploads |  | ||||||
| 	// TODO allow resumability | 	// TODO allow resumability | ||||||
| 	// TODO allow canceling uploads via DELETE | 	// TODO allow canceling uploads via DELETE | ||||||
| 	// TODO allow cross repo blob mount | 	// TODO allow cross repo blob mount | ||||||
|  |  | ||||||
|  | 	// Create URL | ||||||
|  | 	url := fmt.Sprintf("%s", location) | ||||||
|  |  | ||||||
| 	fp, err := GetBlobsPath(layer.Digest) | 	fp, err := GetBlobsPath(layer.Digest) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| @@ -851,19 +826,72 @@ func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	resp, err := makeRequest("PUT", url, headers, f, regOpts) | 	headers := make(map[string]string) | ||||||
| 	if err != nil { | 	headers["Content-Type"] = "application/octet-stream" | ||||||
| 		log.Printf("couldn't upload blob: %v", err) |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	defer resp.Body.Close() |  | ||||||
|  |  | ||||||
| 	// Check for success: For a successful upload, the Docker registry will respond with a 201 Created | 	chunkSize := 1 << 20 | ||||||
| 	if resp.StatusCode != http.StatusCreated { | 	buf := make([]byte, chunkSize) | ||||||
| 		body, _ := io.ReadAll(resp.Body) | 	var totalUploaded int | ||||||
| 		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		n, err := f.Read(buf) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		headers["Content-Length"] = fmt.Sprintf("%d", n) | ||||||
|  | 		headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1) | ||||||
|  |  | ||||||
|  | 		fn(api.ProgressResponse{ | ||||||
|  | 			Status:    fmt.Sprintf("uploading %s", layer.Digest), | ||||||
|  | 			Digest:    layer.Digest, | ||||||
|  | 			Total:     int(layer.Size), | ||||||
|  | 			Completed: int(totalUploaded), | ||||||
|  | 		}) | ||||||
|  |  | ||||||
|  | 		// change the buffersize for the last chunk | ||||||
|  | 		if n < chunkSize { | ||||||
|  | 			buf = buf[:n] | ||||||
|  | 		} | ||||||
|  | 		resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Printf("couldn't upload blob: %v", err) | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		defer resp.Body.Close() | ||||||
|  | 		url = resp.Header.Get("Location") | ||||||
|  |  | ||||||
|  | 		// Check for success: For a successful upload, the Docker registry will respond with a 201 Created | ||||||
|  | 		if resp.StatusCode != http.StatusAccepted { | ||||||
|  | 			fn(api.ProgressResponse{ | ||||||
|  | 				Status:    fmt.Sprintf("error uploading layer"), | ||||||
|  | 				Digest:    layer.Digest, | ||||||
|  | 				Total:     int(layer.Size), | ||||||
|  | 				Completed: int(totalUploaded), | ||||||
|  | 			}) | ||||||
|  | 			body, _ := io.ReadAll(resp.Body) | ||||||
|  | 			return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		totalUploaded += n | ||||||
|  | 		if totalUploaded >= layer.Size { | ||||||
|  | 			url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) | ||||||
|  |  | ||||||
|  | 			// finish the upload | ||||||
|  | 			resp, err := makeRequest("PUT", url, nil, nil, regOpts) | ||||||
|  | 			if err != nil { | ||||||
|  | 				log.Printf("couldn't finish upload: %v", err) | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 			defer resp.Body.Close() | ||||||
|  |  | ||||||
|  | 			if resp.StatusCode != http.StatusCreated { | ||||||
|  | 				body, _ := io.ReadAll(resp.Body) | ||||||
|  | 				return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||||
|  | 			} | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -974,8 +1002,6 @@ func makeRequest(method, url string, headers map[string]string, body io.Reader, | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	log.Printf("url = %s", url) |  | ||||||
|  |  | ||||||
| 	req, err := http.NewRequest(method, url, body) | 	req, err := http.NewRequest(method, url, body) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user