From 6cab46ab1af3b39dcfc84fddde87b9f80c1c3596 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Tue, 3 Sep 2024 10:57:01 -0700 Subject: [PATCH 1/2] Fix container parallel upload bugs --- routers/api/packages/container/blob.go | 31 +++++++++++++++------ routers/api/packages/container/container.go | 13 ++++++++- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/routers/api/packages/container/blob.go b/routers/api/packages/container/blob.go index f2d63297c1411..095bc36f46d91 100644 --- a/routers/api/packages/container/blob.go +++ b/routers/api/packages/container/blob.go @@ -10,23 +10,30 @@ import ( "fmt" "os" "strings" - "sync" "code.gitea.io/gitea/models/db" packages_model "code.gitea.io/gitea/models/packages" container_model "code.gitea.io/gitea/models/packages/container" + user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/log" packages_module "code.gitea.io/gitea/modules/packages" container_module "code.gitea.io/gitea/modules/packages/container" + "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" packages_service "code.gitea.io/gitea/services/packages" ) -var uploadVersionMutex sync.Mutex +var uploadVersionMutex = sync.NewExclusivePool() // saveAsPackageBlob creates a package blob from an upload // The uploaded blob gets stored in a special upload version to link them to the package/image func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) { + // FIXME: Replace usage of mutex with database transaction + // https://github.com/go-gitea/gitea/pull/21862 + pkgPath := pci.PackageInfo.Owner.LowerName + "/" + pci.PackageInfo.Name + uploadVersionMutex.CheckIn(pkgPath) + defer uploadVersionMutex.CheckOut(pkgPath) + pb := packages_service.NewPackageBlob(hsr) exists := false @@ -80,6 +87,12 @@ func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader // mountBlob mounts the specific blob to a different package func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packages_model.PackageBlob) error { + // FIXME: Replace usage of mutex with database transaction + // https://github.com/go-gitea/gitea/pull/21862 + pkgPath := pi.Owner.LowerName + "/" + pi.Name + uploadVersionMutex.CheckIn(pkgPath) + defer uploadVersionMutex.CheckOut(pkgPath) + uploadVersion, err := getOrCreateUploadVersion(ctx, pi) if err != nil { return err @@ -93,9 +106,6 @@ func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packag func getOrCreateUploadVersion(ctx context.Context, pi *packages_service.PackageInfo) (*packages_model.PackageVersion, error) { var uploadVersion *packages_model.PackageVersion - // FIXME: Replace usage of mutex with database transaction - // https://github.com/go-gitea/gitea/pull/21862 - uploadVersionMutex.Lock() err := db.WithTx(ctx, func(ctx context.Context) error { created := true p := &packages_model.Package{ @@ -140,7 +150,6 @@ func getOrCreateUploadVersion(ctx context.Context, pi *packages_service.PackageI return nil }) - uploadVersionMutex.Unlock() return uploadVersion, err } @@ -172,10 +181,16 @@ func createFileForBlob(ctx context.Context, pv *packages_model.PackageVersion, p return nil } -func deleteBlob(ctx context.Context, ownerID int64, image, digest string) error { +func deleteBlob(ctx context.Context, owner *user_model.User, image, digest string) error { + // FIXME: Replace usage of mutex with database transaction + // https://github.com/go-gitea/gitea/pull/21862 + pkgPath := owner.LowerName + "/" + image + uploadVersionMutex.CheckIn(pkgPath) + defer uploadVersionMutex.CheckOut(pkgPath) + return db.WithTx(ctx, func(ctx context.Context) error { pfds, err := container_model.GetContainerBlobs(ctx, &container_model.BlobSearchOptions{ - OwnerID: ownerID, + OwnerID: owner.ID, Image: image, Digest: digest, }) diff --git a/routers/api/packages/container/container.go b/routers/api/packages/container/container.go index ddb8f6df9cbb4..ef9314d7e354a 100644 --- a/routers/api/packages/container/container.go +++ b/routers/api/packages/container/container.go @@ -24,6 +24,7 @@ import ( packages_module "code.gitea.io/gitea/modules/packages" container_module "code.gitea.io/gitea/modules/packages/container" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/sync" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/routers/api/packages/helper" auth_service "code.gitea.io/gitea/services/auth" @@ -540,7 +541,7 @@ func DeleteBlob(ctx *context.Context) { return } - if err := deleteBlob(ctx, ctx.Package.Owner.ID, ctx.Params("image"), d); err != nil { + if err := deleteBlob(ctx, ctx.Package.Owner, ctx.Params("image"), d); err != nil { apiError(ctx, http.StatusInternalServerError, err) return } @@ -550,6 +551,8 @@ func DeleteBlob(ctx *context.Context) { }) } +var lockManifest = sync.NewExclusivePool() + // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-manifests func UploadManifest(ctx *context.Context) { reference := ctx.Params("reference") @@ -581,6 +584,10 @@ func UploadManifest(ctx *context.Context) { return } + imagePath := ctx.Package.Owner.Name + "/" + ctx.Params("image") + lockManifest.CheckIn(imagePath) + defer lockManifest.CheckOut(imagePath) + digest, err := processManifest(ctx, mci, buf) if err != nil { var namedError *namedError @@ -679,6 +686,10 @@ func DeleteManifest(ctx *context.Context) { return } + imagePath := ctx.Package.Owner.Name + "/" + ctx.Params("image") + lockManifest.CheckIn(imagePath) + defer lockManifest.CheckOut(imagePath) + pvs, err := container_model.GetManifestVersions(ctx, opts) if err != nil { apiError(ctx, http.StatusInternalServerError, err) From 69d7727b01b8a451c7d393384b63ba58cf18edea Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Tue, 3 Sep 2024 11:17:14 -0700 Subject: [PATCH 2/2] Add comments and remove unnecessary comments --- routers/api/packages/container/blob.go | 7 +------ routers/api/packages/container/container.go | 1 + 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/routers/api/packages/container/blob.go b/routers/api/packages/container/blob.go index 095bc36f46d91..4e0ec50b9e808 100644 --- a/routers/api/packages/container/blob.go +++ b/routers/api/packages/container/blob.go @@ -23,13 +23,12 @@ import ( packages_service "code.gitea.io/gitea/services/packages" ) +// TODO: use clustered lock var uploadVersionMutex = sync.NewExclusivePool() // saveAsPackageBlob creates a package blob from an upload // The uploaded blob gets stored in a special upload version to link them to the package/image func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) { - // FIXME: Replace usage of mutex with database transaction - // https://github.com/go-gitea/gitea/pull/21862 pkgPath := pci.PackageInfo.Owner.LowerName + "/" + pci.PackageInfo.Name uploadVersionMutex.CheckIn(pkgPath) defer uploadVersionMutex.CheckOut(pkgPath) @@ -87,8 +86,6 @@ func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader // mountBlob mounts the specific blob to a different package func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packages_model.PackageBlob) error { - // FIXME: Replace usage of mutex with database transaction - // https://github.com/go-gitea/gitea/pull/21862 pkgPath := pi.Owner.LowerName + "/" + pi.Name uploadVersionMutex.CheckIn(pkgPath) defer uploadVersionMutex.CheckOut(pkgPath) @@ -182,8 +179,6 @@ func createFileForBlob(ctx context.Context, pv *packages_model.PackageVersion, p } func deleteBlob(ctx context.Context, owner *user_model.User, image, digest string) error { - // FIXME: Replace usage of mutex with database transaction - // https://github.com/go-gitea/gitea/pull/21862 pkgPath := owner.LowerName + "/" + image uploadVersionMutex.CheckIn(pkgPath) defer uploadVersionMutex.CheckOut(pkgPath) diff --git a/routers/api/packages/container/container.go b/routers/api/packages/container/container.go index ef9314d7e354a..acdc513468669 100644 --- a/routers/api/packages/container/container.go +++ b/routers/api/packages/container/container.go @@ -551,6 +551,7 @@ func DeleteBlob(ctx *context.Context) { }) } +// TODO: use clustered lock var lockManifest = sync.NewExclusivePool() // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-manifests