x/registry: mc -> s3
This commit is contained in:
parent
a292cde2f3
commit
1389e6926a
@ -29,11 +29,7 @@ const (
|
|||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
UploadChunkSize int64 // default is DefaultUploadChunkSize
|
UploadChunkSize int64 // default is DefaultUploadChunkSize
|
||||||
minioClient *minio.Client
|
S3Client *minio.Client
|
||||||
}
|
|
||||||
|
|
||||||
func New(mc *minio.Client) *Server {
|
|
||||||
return &Server{minioClient: mc}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -84,7 +80,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mcc := &minio.Core{Client: s.mc()}
|
mcc := &minio.Core{Client: s.s3()}
|
||||||
// TODO(bmizerany): complete uploads before stats for any with ETag
|
// TODO(bmizerany): complete uploads before stats for any with ETag
|
||||||
|
|
||||||
type completeParts struct {
|
type completeParts struct {
|
||||||
@ -136,7 +132,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
var e minio.ErrorResponse
|
var e minio.ErrorResponse
|
||||||
if errors.As(err, &e) && e.Code == "NoSuchUpload" {
|
if errors.As(err, &e) && e.Code == "NoSuchUpload" {
|
||||||
return oweb.Invalid("uploadId", uploadID, "unknown uploadId")
|
return oweb.Invalid("uploadId", uploadID, "")
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -158,7 +154,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
|
|||||||
key := path.Join("blobs", l.Digest)
|
key := path.Join("blobs", l.Digest)
|
||||||
if l.Size < minimumMultipartSize {
|
if l.Size < minimumMultipartSize {
|
||||||
// single part upload
|
// single part upload
|
||||||
signedURL, err := s.mc().PresignedPutObject(r.Context(), bucketTODO, key, 15*time.Minute)
|
signedURL, err := s.s3().PresignedPutObject(r.Context(), bucketTODO, key, 15*time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -175,7 +171,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
|
|||||||
for partNumber, c := range upload.Chunks(l.Size, s.uploadChunkSize()) {
|
for partNumber, c := range upload.Chunks(l.Size, s.uploadChunkSize()) {
|
||||||
const timeToStartUpload = 15 * time.Minute
|
const timeToStartUpload = 15 * time.Minute
|
||||||
|
|
||||||
signedURL, err := s.mc().Presign(r.Context(), "PUT", bucketTODO, key, timeToStartUpload, url.Values{
|
signedURL, err := s.s3().Presign(r.Context(), "PUT", bucketTODO, key, timeToStartUpload, url.Values{
|
||||||
"uploadId": []string{uploadID},
|
"uploadId": []string{uploadID},
|
||||||
"partNumber": []string{strconv.Itoa(partNumber)},
|
"partNumber": []string{strconv.Itoa(partNumber)},
|
||||||
})
|
})
|
||||||
@ -198,7 +194,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
|
|||||||
// Commit the manifest
|
// Commit the manifest
|
||||||
body := bytes.NewReader(pr.Manifest)
|
body := bytes.NewReader(pr.Manifest)
|
||||||
path := path.Join("manifests", path.Join(mp.Parts()...))
|
path := path.Join("manifests", path.Join(mp.Parts()...))
|
||||||
_, err := s.mc().PutObject(r.Context(), bucketTODO, path, body, int64(len(pr.Manifest)), minio.PutObjectOptions{})
|
_, err := s.s3().PutObject(r.Context(), bucketTODO, path, body, int64(len(pr.Manifest)), minio.PutObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -215,7 +211,7 @@ func (s *Server) handlePull(w http.ResponseWriter, r *http.Request) error {
|
|||||||
func (s *Server) statObject(ctx context.Context, digest string) (pushed bool, err error) {
|
func (s *Server) statObject(ctx context.Context, digest string) (pushed bool, err error) {
|
||||||
// HEAD the object
|
// HEAD the object
|
||||||
path := path.Join("blobs", digest)
|
path := path.Join("blobs", digest)
|
||||||
_, err = s.mc().StatObject(ctx, "test", path, minio.StatObjectOptions{})
|
_, err = s.s3().StatObject(ctx, "test", path, minio.StatObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isNoSuchKey(err) {
|
if isNoSuchKey(err) {
|
||||||
err = nil
|
err = nil
|
||||||
@ -230,16 +226,16 @@ func isNoSuchKey(err error) bool {
|
|||||||
return errors.As(err, &e) && e.Code == "NoSuchKey"
|
return errors.As(err, &e) && e.Code == "NoSuchKey"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) mc() *minio.Client {
|
func (s *Server) s3() *minio.Client {
|
||||||
if s.minioClient != nil {
|
if s.S3Client != nil {
|
||||||
return s.minioClient
|
return s.S3Client
|
||||||
}
|
}
|
||||||
mc, err := minio.New("localhost:9000", &minio.Options{
|
s3, err := minio.New("localhost:9000", &minio.Options{
|
||||||
Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
|
Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
|
||||||
Secure: false,
|
Secure: false,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return mc
|
return s3
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ func TestPushBasic(t *testing.T) {
|
|||||||
}`)
|
}`)
|
||||||
|
|
||||||
hs := httptest.NewServer(&Server{
|
hs := httptest.NewServer(&Server{
|
||||||
minioClient: mc,
|
S3Client: mc,
|
||||||
UploadChunkSize: 5 * MB,
|
UploadChunkSize: 5 * MB,
|
||||||
})
|
})
|
||||||
t.Cleanup(hs.Close)
|
t.Cleanup(hs.Close)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user