-
-
Notifications
You must be signed in to change notification settings - Fork 5.8k
Use global lock instead of NewExclusivePool to allow distributed lock between multiple Gitea instances #31813
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
1c483f8
b98fb46
5141395
b0f6710
1748ba8
0c3c1a3
3d3bfaa
1276449
35daece
fe9efa9
4bf86e9
1e1d6e9
3e268b9
8e09e01
30dc681
860f3ec
aa51a8c
5df67a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
// Copyright 2023 The Gitea Authors. All rights reserved. | ||
// SPDX-License-Identifier: MIT | ||
|
||
package globallock | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"code.gitea.io/gitea/modules/nosql" | ||
"code.gitea.io/gitea/modules/setting" | ||
|
||
redsync "github.com/go-redsync/redsync/v4" | ||
goredis "github.com/go-redsync/redsync/v4/redis/goredis/v9" | ||
) | ||
|
||
type Locker interface { | ||
Lock() error // lock the resource and block until it is unlocked by the holder | ||
TryLock() (bool, error) // try to lock the resource and return immediately, first return value indicates if the lock was successful | ||
Unlock() (bool, error) // only lock with no error and TryLock returned true with no error can be unlocked | ||
} | ||
|
||
type LockService interface { | ||
GetLocker(name string) Locker // create or get a locker by name, RemoveLocker should be called after the locker is no longer needed | ||
RemoveLocker(name string) // remove a locker by name from the pool. This should be invoked affect locker is no longer needed, i.e. a pull request merged or closed | ||
} | ||
|
||
type memoryLock struct { | ||
mutex sync.Mutex | ||
} | ||
|
||
func (r *memoryLock) Lock() error { | ||
r.mutex.Lock() | ||
return nil | ||
} | ||
|
||
func (r *memoryLock) TryLock() (bool, error) { | ||
return r.mutex.TryLock(), nil | ||
} | ||
|
||
func (r *memoryLock) Unlock() (bool, error) { | ||
r.mutex.Unlock() | ||
return true, nil | ||
} | ||
|
||
var _ Locker = &memoryLock{} | ||
|
||
type memoryLockService struct { | ||
syncMap sync.Map | ||
} | ||
|
||
var _ LockService = &memoryLockService{} | ||
|
||
func newMemoryLockService() *memoryLockService { | ||
return &memoryLockService{ | ||
syncMap: sync.Map{}, | ||
} | ||
} | ||
|
||
func (l *memoryLockService) GetLocker(name string) Locker { | ||
v, _ := l.syncMap.LoadOrStore(name, &memoryLock{}) | ||
return v.(*memoryLock) | ||
} | ||
|
||
func (l *memoryLockService) RemoveLocker(name string) { | ||
l.syncMap.Delete(name) | ||
} | ||
|
||
type redisLockService struct { | ||
rs *redsync.Redsync | ||
} | ||
|
||
var _ LockService = &redisLockService{} | ||
|
||
func newRedisLockService(connection string) *redisLockService { | ||
client := nosql.GetManager().GetRedisClient(connection) | ||
|
||
pool := goredis.NewPool(client) | ||
|
||
// Create an instance of redisync to be used to obtain a mutual exclusion | ||
// lock. | ||
rs := redsync.New(pool) | ||
|
||
return &redisLockService{ | ||
rs: rs, | ||
} | ||
} | ||
|
||
type redisLock struct { | ||
mutex *redsync.Mutex | ||
} | ||
|
||
func (r *redisLockService) GetLocker(name string) Locker { | ||
return &redisLock{mutex: r.rs.NewMutex(name)} | ||
} | ||
|
||
func (r *redisLockService) RemoveLocker(name string) { | ||
// Do nothing | ||
} | ||
|
||
func (r *redisLock) Lock() error { | ||
return r.mutex.Lock() | ||
lunny marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func (r *redisLock) TryLock() (bool, error) { | ||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | ||
defer cancel() | ||
if err := r.mutex.LockContext(ctx); err != nil { | ||
return false, err | ||
} | ||
return true, nil | ||
} | ||
|
||
func (r *redisLock) Unlock() (bool, error) { | ||
return r.mutex.Unlock() | ||
} | ||
|
||
var ( | ||
syncOnce sync.Once | ||
lockService LockService | ||
) | ||
|
||
func getLockService() LockService { | ||
syncOnce.Do(func() { | ||
if setting.GlobalLock.ServiceType == "redis" { | ||
lockService = newRedisLockService(setting.GlobalLock.ServiceConnStr) | ||
} else { | ||
lockService = newMemoryLockService() | ||
} | ||
}) | ||
return lockService | ||
} | ||
|
||
func GetLocker(name string) Locker { | ||
return getLockService().GetLocker(name) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Copyright 2024 The Gitea Authors. All rights reserved. | ||
// SPDX-License-Identifier: MIT | ||
|
||
package globallock | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func Test_Lock(t *testing.T) { | ||
locker1 := GetLocker("test2") | ||
assert.NoError(t, locker1.Lock()) | ||
unlocked, err := locker1.Unlock() | ||
assert.NoError(t, err) | ||
assert.True(t, unlocked) | ||
|
||
locker2 := GetLocker("test2") | ||
assert.NoError(t, locker2.Lock()) | ||
|
||
locked1, err1 := locker2.TryLock() | ||
assert.NoError(t, err1) | ||
assert.False(t, locked1) | ||
|
||
locker2.Unlock() | ||
|
||
locked2, err2 := locker2.TryLock() | ||
assert.NoError(t, err2) | ||
assert.True(t, locked2) | ||
|
||
locker2.Unlock() | ||
} | ||
|
||
func Test_Lock_Redis(t *testing.T) { | ||
if os.Getenv("CI") == "" { | ||
t.Skip("Skip test for local development") | ||
} | ||
|
||
lockService = newRedisLockService("redis://redis") | ||
|
||
redisPool := | ||
locker1 := GetLocker("test1") | ||
Check failure on line 43 in modules/globallock/lock_test.go
|
||
assert.NoError(t, locker1.Lock()) | ||
unlocked, err := locker1.Unlock() | ||
assert.NoError(t, err) | ||
assert.True(t, unlocked) | ||
|
||
locker2 := GetLocker("test1") | ||
assert.NoError(t, locker2.Lock()) | ||
|
||
locked1, err1 := locker2.TryLock() | ||
assert.NoError(t, err1) | ||
assert.False(t, locked1) | ||
|
||
locker2.Unlock() | ||
|
||
locked2, err2 := locker2.TryLock() | ||
assert.NoError(t, err2) | ||
assert.True(t, locked2) | ||
|
||
locker2.Unlock() | ||
|
||
redisPool.Close() | ||
} | ||
Check failure on line 65 in modules/globallock/lock_test.go
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright 2024 The Gitea Authors. All rights reserved. | ||
// SPDX-License-Identifier: MIT | ||
|
||
package setting | ||
|
||
import ( | ||
"code.gitea.io/gitea/modules/log" | ||
"code.gitea.io/gitea/modules/nosql" | ||
) | ||
|
||
// GlobalLock represents configuration of global lock | ||
var GlobalLock = struct { | ||
ServiceType string | ||
ServiceConnStr string | ||
}{ | ||
ServiceType: "memory", | ||
} | ||
|
||
func loadGlobalLockFrom(rootCfg ConfigProvider) { | ||
sec := rootCfg.Section("global_lock") | ||
GlobalLock.ServiceType = sec.Key("SERVICE_TYPE").MustString("memory") | ||
switch GlobalLock.ServiceType { | ||
case "memory": | ||
case "redis": | ||
connStr := sec.Key("SERVICE_CONN_STR").String() | ||
if connStr == "" { | ||
log.Fatal("SERVICE_CONN_STR is empty for redis") | ||
} | ||
u := nosql.ToRedisURI(connStr) | ||
if u == nil { | ||
log.Fatal("SERVICE_CONN_STR %s is not a valid redis connection string", connStr) | ||
} | ||
GlobalLock.ServiceConnStr = connStr | ||
default: | ||
log.Fatal("Unknown sync lock service type: %s", GlobalLock.ServiceType) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
// Copyright 2024 The Gitea Authors. All rights reserved. | ||
// SPDX-License-Identifier: MIT | ||
|
||
package setting | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestLoadGlobalLockConfig(t *testing.T) { | ||
t.Run("DefaultGlobalLockConfig", func(t *testing.T) { | ||
iniStr := `` | ||
cfg, err := NewConfigProviderFromData(iniStr) | ||
assert.NoError(t, err) | ||
|
||
loadGlobalLockFrom(cfg) | ||
assert.EqualValues(t, "memory", GlobalLock.ServiceType) | ||
}) | ||
|
||
t.Run("RedisGlobalLockConfig", func(t *testing.T) { | ||
iniStr := ` | ||
[global_lock] | ||
SERVICE_TYPE = redis | ||
SERVICE_CONN_STR = addrs=127.0.0.1:6379 db=0 | ||
` | ||
cfg, err := NewConfigProviderFromData(iniStr) | ||
assert.NoError(t, err) | ||
|
||
loadGlobalLockFrom(cfg) | ||
assert.EqualValues(t, "redis", GlobalLock.ServiceType) | ||
assert.EqualValues(t, "addrs=127.0.0.1:6379 db=0", GlobalLock.ServiceConnStr) | ||
}) | ||
} |
Uh oh!
There was an error while loading. Please reload this page.