Skip to content

Commit 46015ce

Browse files
authored
Fix phase locking (#1230)
* add logs for 2 phase lock * add update log * add root in log * add update log * use update lock * add log in update repo * use save * use exec * log rows affected * use no key update * use repo update * cleanup * add defer * add defer
1 parent 9c40d91 commit 46015ce

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

code/go/0chain.net/blobbercore/allocation/repository.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error
4747
}
4848

4949
alloc := &Allocation{}
50-
err = tx.Table(TableNameAllocation).Where(SQLWhereGetById, id).First(alloc).Error
50+
err = tx.Table(TableNameAllocation).Where(SQLWhereGetById, id).Take(alloc).Error
5151
if err != nil {
5252
return alloc, err
5353
}
@@ -73,7 +73,7 @@ func (r *Repository) GetByIdAndLock(ctx context.Context, id string) (*Allocation
7373
err = tx.Model(&Allocation{}).
7474
Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).
7575
Where("id=?", id).
76-
First(alloc).Error
76+
Take(alloc).Error
7777
if err != nil {
7878
return alloc, err
7979
}
@@ -99,7 +99,7 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (
9999
}
100100

101101
alloc := &Allocation{}
102-
err = tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, txHash).First(alloc).Error
102+
err = tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, txHash).Take(alloc).Error
103103
if err != nil {
104104
return alloc, err
105105
}
@@ -139,7 +139,7 @@ func (r *Repository) GetAllocationIds(ctx context.Context) []Res {
139139

140140
}
141141

142-
func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string) error {
142+
func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation) error {
143143
var tx = datastore.GetStore().GetTransaction(ctx)
144144
if tx == nil {
145145
logging.Logger.Panic("no transaction in the context")
@@ -151,9 +151,10 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A
151151
}
152152
delete(cache, allocationID)
153153

154-
err = tx.Exec("UPDATE allocations SET latest_redeemed_write_marker=?,is_redeem_required=? WHERE id=?",
155-
AllocationRoot, false, allocationID).Error
156-
154+
allocationUpdates := make(map[string]interface{})
155+
allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot
156+
allocationUpdates["is_redeem_required"] = false
157+
err = tx.Model(allocationObj).Updates(allocationUpdates).Error
157158
return err
158159
}
159160

code/go/0chain.net/blobbercore/handler/object_operation_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
480480
if err != nil {
481481
return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error())
482482
}
483-
484483
if allocationObj.FileOptions == 0 {
485484
return nil, common.NewError("immutable_allocation", "Cannot write to an immutable allocation")
486485
}
@@ -1407,6 +1406,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
14071406
Logger.Info("rollback_writemarker", zap.Any("writemarker", writemarkerEntity.WM))
14081407

14091408
alloc, err := allocation.Repo.GetByIdAndLock(c, allocationID)
1409+
Logger.Info("[rollback]Lock Allocation", zap.Bool("is_redeem_required", alloc.IsRedeemRequired), zap.String("allocation_root", alloc.AllocationRoot), zap.String("latest_wm_redeemed", alloc.LatestRedeemedWM))
14101410
if err != nil {
14111411
txn.Rollback()
14121412
return &result, common.NewError("allocation_read_error", "Error reading the allocation object")

code/go/0chain.net/blobbercore/writemarker/worker.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error {
7171
alloc, err := allocation.Repo.GetByIdAndLock(ctx, allocationID)
7272
if err != nil {
7373
logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err))
74-
go tryAgain(wm)
74+
if err != gorm.ErrRecordNotFound {
75+
go tryAgain(wm)
76+
}
7577
shouldRollback = true
7678
return err
7779
}
@@ -80,12 +82,12 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error {
8082
logging.Logger.Info("Stale write marker. Allocation root mismatch",
8183
zap.Any("allocation", allocationID),
8284
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("alloc_root", alloc.AllocationRoot))
85+
_ = wm.UpdateStatus(ctx, Rollbacked, "rollbacked", "")
86+
err = db.Commit().Error
8387
mut := GetLock(allocationID)
8488
if mut != nil {
8589
mut.Release(1)
8690
}
87-
_ = wm.UpdateStatus(ctx, Rollbacked, "rollbacked", "")
88-
err = db.Commit().Error
8991
return err
9092
}
9193

@@ -100,13 +102,13 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error {
100102

101103
return err
102104
}
103-
mut := GetLock(allocationID)
104-
if mut != nil {
105-
mut.Release(1)
106-
}
107-
108-
err = allocation.Repo.UpdateAllocationRedeem(ctx, wm.WM.AllocationRoot, allocationID)
109-
105+
defer func() {
106+
mut := GetLock(allocationID)
107+
if mut != nil {
108+
mut.Release(1)
109+
}
110+
}()
111+
err = allocation.Repo.UpdateAllocationRedeem(ctx, wm.WM.AllocationRoot, allocationID, alloc)
110112
if err != nil {
111113
logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed",
112114
zap.Any("allocation", allocationID),

0 commit comments

Comments
 (0)