Compare commits
	
		
			1 Commits
		
	
	
		
			royh-opena
			...
			upload-pro
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 402babdad0 | 
							
								
								
									
										18
									
								
								cmd/cmd.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								cmd/cmd.go
									
									
									
									
									
								
							| @@ -94,9 +94,25 @@ func PushHandler(cmd *cobra.Command, args []string) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	var currentDigest string | ||||
| 	var bar *progressbar.ProgressBar | ||||
|  | ||||
| 	request := api.PushRequest{Name: args[0], Insecure: insecure} | ||||
| 	fn := func(resp api.ProgressResponse) error { | ||||
| 		fmt.Println(resp.Status) | ||||
| 		if resp.Digest != currentDigest && resp.Digest != "" { | ||||
| 			currentDigest = resp.Digest | ||||
| 			bar = progressbar.DefaultBytes( | ||||
| 				int64(resp.Total), | ||||
| 				fmt.Sprintf("pushing %s...", resp.Digest[7:19]), | ||||
| 			) | ||||
|  | ||||
| 			bar.Set(resp.Completed) | ||||
| 		} else if resp.Digest == currentDigest && resp.Digest != "" { | ||||
| 			bar.Set(resp.Completed) | ||||
| 		} else { | ||||
| 			currentDigest = "" | ||||
| 			fmt.Println(resp.Status) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
|   | ||||
							
								
								
									
										128
									
								
								server/images.go
									
									
									
									
									
								
							
							
						
						
									
										128
									
								
								server/images.go
									
									
									
									
									
								
							| @@ -582,14 +582,10 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | ||||
| 	} | ||||
|  | ||||
| 	var layers []*Layer | ||||
| 	var total int | ||||
| 	var completed int | ||||
| 	for _, layer := range manifest.Layers { | ||||
| 		layers = append(layers, layer) | ||||
| 		total += layer.Size | ||||
| 	} | ||||
| 	layers = append(layers, &manifest.Config) | ||||
| 	total += manifest.Config.Size | ||||
|  | ||||
| 	for _, layer := range layers { | ||||
| 		exists, err := checkBlobExistence(mp, layer.Digest, regOpts) | ||||
| @@ -598,21 +594,20 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | ||||
| 		} | ||||
|  | ||||
| 		if exists { | ||||
| 			completed += layer.Size | ||||
| 			fn(api.ProgressResponse{ | ||||
| 				Status:    "using existing layer", | ||||
| 				Digest:    layer.Digest, | ||||
| 				Total:     total, | ||||
| 				Completed: completed, | ||||
| 				Total:     layer.Size, | ||||
| 				Completed: layer.Size, | ||||
| 			}) | ||||
| 			log.Printf("Layer %s already exists", layer.Digest) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		fn(api.ProgressResponse{ | ||||
| 			Status:    "starting upload", | ||||
| 			Digest:    layer.Digest, | ||||
| 			Total:     total, | ||||
| 			Completed: completed, | ||||
| 			Status: "starting upload", | ||||
| 			Digest: layer.Digest, | ||||
| 			Total:  layer.Size, | ||||
| 		}) | ||||
|  | ||||
| 		location, err := startUpload(mp, regOpts) | ||||
| @@ -621,25 +616,14 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		err = uploadBlob(location, layer, regOpts) | ||||
| 		err = uploadBlobChunked(mp, location, layer, regOpts, fn) | ||||
| 		if err != nil { | ||||
| 			log.Printf("error uploading blob: %v", err) | ||||
| 			return err | ||||
| 		} | ||||
| 		completed += layer.Size | ||||
| 		fn(api.ProgressResponse{ | ||||
| 			Status:    "upload complete", | ||||
| 			Digest:    layer.Digest, | ||||
| 			Total:     total, | ||||
| 			Completed: completed, | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	fn(api.ProgressResponse{ | ||||
| 		Status:    "pushing manifest", | ||||
| 		Total:     total, | ||||
| 		Completed: completed, | ||||
| 	}) | ||||
| 	fn(api.ProgressResponse{Status: "pushing manifest"}) | ||||
| 	url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag) | ||||
| 	headers := map[string]string{ | ||||
| 		"Content-Type": "application/vnd.docker.distribution.manifest.v2+json", | ||||
| @@ -662,11 +646,7 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon | ||||
| 		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||
| 	} | ||||
|  | ||||
| 	fn(api.ProgressResponse{ | ||||
| 		Status:    "success", | ||||
| 		Total:     total, | ||||
| 		Completed: completed, | ||||
| 	}) | ||||
| 	fn(api.ProgressResponse{Status: "success"}) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -828,19 +808,14 @@ func checkBlobExistence(mp ModelPath, digest string, regOpts *RegistryOptions) ( | ||||
| 	return resp.StatusCode == http.StatusOK, nil | ||||
| } | ||||
|  | ||||
| func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { | ||||
| 	// Create URL | ||||
| 	url := fmt.Sprintf("%s&digest=%s", location, layer.Digest) | ||||
|  | ||||
| 	headers := make(map[string]string) | ||||
| 	headers["Content-Length"] = fmt.Sprintf("%d", layer.Size) | ||||
| 	headers["Content-Type"] = "application/octet-stream" | ||||
|  | ||||
| 	// TODO change from monolithic uploads to chunked uploads | ||||
| func uploadBlobChunked(mp ModelPath, location string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { | ||||
| 	// TODO allow resumability | ||||
| 	// TODO allow canceling uploads via DELETE | ||||
| 	// TODO allow cross repo blob mount | ||||
|  | ||||
| 	// Create URL | ||||
| 	url := fmt.Sprintf("%s", location) | ||||
|  | ||||
| 	fp, err := GetBlobsPath(layer.Digest) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -851,19 +826,72 @@ func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	resp, err := makeRequest("PUT", url, headers, f, regOpts) | ||||
| 	if err != nil { | ||||
| 		log.Printf("couldn't upload blob: %v", err) | ||||
| 		return err | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| 	headers := make(map[string]string) | ||||
| 	headers["Content-Type"] = "application/octet-stream" | ||||
|  | ||||
| 	// Check for success: For a successful upload, the Docker registry will respond with a 201 Created | ||||
| 	if resp.StatusCode != http.StatusCreated { | ||||
| 		body, _ := io.ReadAll(resp.Body) | ||||
| 		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||
| 	} | ||||
| 	chunkSize := 1 << 20 | ||||
| 	buf := make([]byte, chunkSize) | ||||
| 	var totalUploaded int | ||||
|  | ||||
| 	for { | ||||
| 		n, err := f.Read(buf) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		headers["Content-Length"] = fmt.Sprintf("%d", n) | ||||
| 		headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1) | ||||
|  | ||||
| 		fn(api.ProgressResponse{ | ||||
| 			Status:    fmt.Sprintf("uploading %s", layer.Digest), | ||||
| 			Digest:    layer.Digest, | ||||
| 			Total:     int(layer.Size), | ||||
| 			Completed: int(totalUploaded), | ||||
| 		}) | ||||
|  | ||||
| 		// change the buffersize for the last chunk | ||||
| 		if n < chunkSize { | ||||
| 			buf = buf[:n] | ||||
| 		} | ||||
| 		resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts) | ||||
| 		if err != nil { | ||||
| 			log.Printf("couldn't upload blob: %v", err) | ||||
| 			return err | ||||
| 		} | ||||
| 		defer resp.Body.Close() | ||||
| 		url = resp.Header.Get("Location") | ||||
|  | ||||
| 		// Check for success: For a successful upload, the Docker registry will respond with a 201 Created | ||||
| 		if resp.StatusCode != http.StatusAccepted { | ||||
| 			fn(api.ProgressResponse{ | ||||
| 				Status:    fmt.Sprintf("error uploading layer"), | ||||
| 				Digest:    layer.Digest, | ||||
| 				Total:     int(layer.Size), | ||||
| 				Completed: int(totalUploaded), | ||||
| 			}) | ||||
| 			body, _ := io.ReadAll(resp.Body) | ||||
| 			return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||
| 		} | ||||
|  | ||||
| 		totalUploaded += n | ||||
| 		if totalUploaded >= layer.Size { | ||||
| 			url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) | ||||
|  | ||||
| 			// finish the upload | ||||
| 			resp, err := makeRequest("PUT", url, nil, nil, regOpts) | ||||
| 			if err != nil { | ||||
| 				log.Printf("couldn't finish upload: %v", err) | ||||
| 				return err | ||||
| 			} | ||||
| 			defer resp.Body.Close() | ||||
|  | ||||
| 			if resp.StatusCode != http.StatusCreated { | ||||
| 				body, _ := io.ReadAll(resp.Body) | ||||
| 				return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) | ||||
| 			} | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -974,8 +1002,6 @@ func makeRequest(method, url string, headers map[string]string, body io.Reader, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	log.Printf("url = %s", url) | ||||
|  | ||||
| 	req, err := http.NewRequest(method, url, body) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
|   | ||||
		Reference in New Issue
	
	Block a user