Compare commits

..

2 Commits

Author SHA256 Message Date
8fa732e675 common: fix three bugs in the timeline cache
All checks were successful
go-generate-check / go-generate-check (pull_request) Successful in 16s
Integration tests / t (pull_request) Successful in 10m56s
1. Double-check locking (TOCTOU)
   GetTimeline dropped the read lock, then acquired the write lock to
   fetch fresh data, but never re-checked the cache after the write lock
   was acquired.  A second goroutine could pass the stale-check under the
   read lock at the same time, resulting in both goroutines fetching the
   full timeline independently and overwriting each other's result.  Add
   a re-check immediately after acquiring the write lock so only the
   first writer fetches.

2. LastCachedTime used the wrong item
   The incremental fetch used data[0].Updated as the high-water mark for
   the Since filter, but data is sorted in descending Created order so
   data[0] is the most-recently-created entry, whose Updated timestamp
   may not be the latest across all cached items.  Switch to a linear
   scan over all items to find the true maximum Updated value, matching
   the fix in origin/fix/timeline-cache-race.

3. Cache not invalidated after RequestReviews / UnrequestReview
   After the bot called RequestReviews or UnrequestReview, the timeline
   cache for that PR was left hot for up to 5 seconds.  Any subsequent
   GetTimeline call within that window returned stale data that did not
   include the just-created review_requested / review_request_removed
   entry, causing FindMissingAndExtraReviewers to make decisions based on
   outdated reviewer state.  Call ResetTimelineCache on success in both
   methods so the next read always sees the mutation.
2026-03-10 21:50:29 +01:00
572e33111b workflow-pr: fix race conditions in event processing
All checks were successful
go-generate-check / go-generate-check (pull_request) Successful in 12s
Integration tests / t (pull_request) Successful in 11m31s
Three targeted fixes for the reviewer add/remove race condition:

1. Per-PR serialization lock (prLocks sync.Map)
   Both the RabbitMQ event loop and the ConsistencyCheckProcess goroutine
   converge on ProcesPullRequest with no mutual exclusion, allowing them
   to race on AssignReviewers/UnrequestReview for the same PR.  A
   buffered-channel-per-PR-key mutex serialises all processing for a
   given PR regardless of which goroutine triggers it.

2. Self-triggered pull_request_sync filter
   The bot's own pushes to prjgit branches cause Gitea to fire a
   pull_request_sync event back, which would re-run AssignReviewers on
   already-settled reviewer state, producing spurious add/remove cycles.
   Events where Sender.Username matches the bot's own account are now
   dropped early in ProcessFunc.

3. Requeue retry moved to background goroutine
   The updatePrjGitError_requeue path was sleeping 5 s inside the event
   loop goroutine (blocking all further event processing) then calling
   ProcessFunc recursively.  It now schedules the retry in a goroutine;
   the per-PR lock prevents the retry from racing with events that arrive
   during the sleep window.  The recursive counter is removed as it only
   guarded this path.
2026-03-10 16:24:05 +01:00
5 changed files with 78 additions and 123 deletions

View File

@@ -221,14 +221,11 @@ type Gitea interface {
GetPullRequests(org, project string) ([]*models.PullRequest, error)
GetCurrentUser() (*models.User, error)
SetCacheTTL(ttl time.Duration)
GetCacheTTL() time.Duration
}
type GiteaTransport struct {
transport *transport.Runtime
client *apiclient.GiteaAPI
cacheTTL time.Duration
}
func AllocateGiteaTransport(giteaUrl string) Gitea {
@@ -243,19 +240,10 @@ func AllocateGiteaTransport(giteaUrl string) Gitea {
r.transport.DefaultAuthentication = transport.BearerToken(giteaToken)
r.client = apiclient.New(r.transport, nil)
r.cacheTTL = time.Second * 5
return &r
}
func (gitea *GiteaTransport) SetCacheTTL(ttl time.Duration) {
gitea.cacheTTL = ttl
}
func (gitea *GiteaTransport) GetCacheTTL() time.Duration {
return gitea.cacheTTL
}
// FetchMaintainershipFile fetches the maintainership file from the given
// branch of org/repo, delegating to GetRepositoryFileContent with the
// package-level MaintainershipFile path.
// NOTE(review): the semantics of the returned []byte/string pair follow
// GetRepositoryFileContent, which is not visible here — presumably the file
// contents plus a content hash/ref; confirm against that method.
func (gitea *GiteaTransport) FetchMaintainershipFile(org, repo, branch string) ([]byte, string, error) {
	return gitea.GetRepositoryFileContent(org, repo, branch, MaintainershipFile)
}
@@ -780,6 +768,10 @@ func (gitea *GiteaTransport) RequestReviews(pr *models.PullRequest, reviewers ..
return nil, fmt.Errorf("Cannot create pull request reviews: %w", err)
}
// Invalidate the timeline cache so the next GetTimeline call reflects
// the newly created review_requested entry.
gitea.ResetTimelineCache(pr.Base.Repo.Owner.UserName, pr.Base.Repo.Name, pr.Index)
return review.GetPayload(), nil
}
@@ -788,6 +780,13 @@ func (gitea *GiteaTransport) UnrequestReview(org, repo string, id int64, reviwer
repository.NewRepoDeletePullReviewRequestsParams().WithOwner(org).WithRepo(repo).WithIndex(id).WithBody(&models.PullReviewRequestOptions{
Reviewers: reviwers,
}), gitea.transport.DefaultAuthentication)
if err == nil {
// Invalidate the timeline cache so the next GetTimeline call reflects
// the newly created review_request_removed entry.
gitea.ResetTimelineCache(org, repo, id)
}
return err
}
@@ -873,24 +872,31 @@ func (gitea *GiteaTransport) GetTimeline(org, repo string, idx int64) ([]*models
prID := fmt.Sprintf("%s/%s!%d", org, repo, idx)
giteaTimelineCacheMutex.RLock()
TimelineCache, IsCached := giteaTimelineCache[prID]
var LastCachedTime strfmt.DateTime
if IsCached {
l := len(TimelineCache.data)
if l > 0 {
LastCachedTime = TimelineCache.data[0].Updated
}
// cache data
if TimelineCache.lastCheck.Add(gitea.cacheTTL).Compare(time.Now()) > 0 {
giteaTimelineCacheMutex.RUnlock()
return TimelineCache.data, nil
}
if IsCached && TimelineCache.lastCheck.Add(time.Second*5).Compare(time.Now()) > 0 {
giteaTimelineCacheMutex.RUnlock()
return TimelineCache.data, nil
}
giteaTimelineCacheMutex.RUnlock()
giteaTimelineCacheMutex.Lock()
defer giteaTimelineCacheMutex.Unlock()
// Re-read after acquiring the write lock: another goroutine may have
// already refreshed the cache while we were waiting.
TimelineCache, IsCached = giteaTimelineCache[prID]
if IsCached && TimelineCache.lastCheck.Add(time.Second*5).Compare(time.Now()) > 0 {
return TimelineCache.data, nil
}
// Find the highest Updated timestamp across all cached items so the
// incremental fetch picks up both new entries and modified ones.
var LastCachedTime strfmt.DateTime
for _, d := range TimelineCache.data {
if time.Time(d.Updated).Compare(time.Time(LastCachedTime)) > 0 {
LastCachedTime = d.Updated
}
}
for resCount > 0 {
opts := issue.NewIssueGetCommentsAndTimelineParams().WithOwner(org).WithRepo(repo).WithIndex(idx).WithPage(&page)
if !LastCachedTime.IsZero() {

View File

@@ -2718,44 +2718,6 @@ func (c *MockGiteaFetchMaintainershipFileCall) DoAndReturn(f func(string, string
return c
}
// GetCacheTTL mocks base method.
func (m *MockGitea) GetCacheTTL() time.Duration {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetCacheTTL")
ret0, _ := ret[0].(time.Duration)
return ret0
}
// GetCacheTTL indicates an expected call of GetCacheTTL.
func (mr *MockGiteaMockRecorder) GetCacheTTL() *MockGiteaGetCacheTTLCall {
mr.mock.ctrl.T.Helper()
call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCacheTTL", reflect.TypeOf((*MockGitea)(nil).GetCacheTTL))
return &MockGiteaGetCacheTTLCall{Call: call}
}
// MockGiteaGetCacheTTLCall wrap *gomock.Call
type MockGiteaGetCacheTTLCall struct {
*gomock.Call
}
// Return rewrite *gomock.Call.Return
func (c *MockGiteaGetCacheTTLCall) Return(arg0 time.Duration) *MockGiteaGetCacheTTLCall {
c.Call = c.Call.Return(arg0)
return c
}
// Do rewrite *gomock.Call.Do
func (c *MockGiteaGetCacheTTLCall) Do(f func() time.Duration) *MockGiteaGetCacheTTLCall {
c.Call = c.Call.Do(f)
return c
}
// DoAndReturn rewrite *gomock.Call.DoAndReturn
func (c *MockGiteaGetCacheTTLCall) DoAndReturn(f func() time.Duration) *MockGiteaGetCacheTTLCall {
c.Call = c.Call.DoAndReturn(f)
return c
}
// GetCommit mocks base method.
func (m *MockGitea) GetCommit(org, repo, sha string) (*models.Commit, error) {
m.ctrl.T.Helper()
@@ -3617,42 +3579,6 @@ func (c *MockGiteaResetTimelineCacheCall) DoAndReturn(f func(string, string, int
return c
}
// SetCacheTTL mocks base method.
func (m *MockGitea) SetCacheTTL(ttl time.Duration) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetCacheTTL", ttl)
}
// SetCacheTTL indicates an expected call of SetCacheTTL.
func (mr *MockGiteaMockRecorder) SetCacheTTL(ttl any) *MockGiteaSetCacheTTLCall {
mr.mock.ctrl.T.Helper()
call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCacheTTL", reflect.TypeOf((*MockGitea)(nil).SetCacheTTL), ttl)
return &MockGiteaSetCacheTTLCall{Call: call}
}
// MockGiteaSetCacheTTLCall wrap *gomock.Call
type MockGiteaSetCacheTTLCall struct {
*gomock.Call
}
// Return rewrite *gomock.Call.Return
func (c *MockGiteaSetCacheTTLCall) Return() *MockGiteaSetCacheTTLCall {
c.Call = c.Call.Return()
return c
}
// Do rewrite *gomock.Call.Do
func (c *MockGiteaSetCacheTTLCall) Do(f func(time.Duration)) *MockGiteaSetCacheTTLCall {
c.Call = c.Call.Do(f)
return c
}
// DoAndReturn rewrite *gomock.Call.DoAndReturn
func (c *MockGiteaSetCacheTTLCall) DoAndReturn(f func(time.Duration)) *MockGiteaSetCacheTTLCall {
c.Call = c.Call.DoAndReturn(f)
return c
}
// SetCommitStatus mocks base method.
func (m *MockGitea) SetCommitStatus(org, repo, hash string, status *models.CommitStatus) (*models.CommitStatus, error) {
m.ctrl.T.Helper()

View File

@@ -1249,9 +1249,6 @@ func main() {
}
gitea := common.AllocateGiteaTransport(GiteaUrl)
if PollInterval < gitea.GetCacheTTL() {
gitea.SetCacheTTL(PollInterval)
}
user, err := gitea.GetCurrentUser()
if err != nil {

View File

@@ -8,6 +8,7 @@ import (
"runtime/debug"
"slices"
"strings"
"sync"
"time"
"github.com/opentracing/opentracing-go/log"
@@ -628,9 +629,29 @@ func (pr *PRProcessor) Process(req *models.PullRequest) error {
return err
}
// prLocks serialises concurrent processing of the same PR.
// Both the RabbitMQ event loop and the consistency-checker goroutine call
// ProcesPullRequest; without this lock they can race on reviewer add/remove.
// Entries are never removed, so the map grows with the number of distinct
// PRs ever seen; each entry is a single one-slot channel, so growth is small.
// Key format: "org/repo#num"
var prLocks sync.Map // map[string]chan struct{}

// prLockKey derives the prLocks map key ("org/repo#num") for a pull request.
func prLockKey(pr *models.PullRequest) string {
	return fmt.Sprintf("%s/%s#%d", pr.Base.Repo.Owner.UserName, pr.Base.Repo.Name, pr.Index)
}

// acquirePRLock blocks until the per-PR lock for key is held and returns the
// channel representing it; the caller must hand it back to releasePRLock.
func acquirePRLock(key string) chan struct{} {
	// Fast path: try Load first so the common case (lock already exists)
	// does not allocate a throwaway channel on every call, which a bare
	// LoadOrStore would do. Fall back to LoadOrStore only on a miss.
	v, ok := prLocks.Load(key)
	if !ok {
		v, _ = prLocks.LoadOrStore(key, make(chan struct{}, 1))
	}
	ch := v.(chan struct{})
	ch <- struct{}{} // capacity 1: blocks while another goroutine holds the lock
	return ch
}

// releasePRLock releases a lock previously acquired via acquirePRLock.
func releasePRLock(ch chan struct{}) {
	<-ch
}
type RequestProcessor struct {
configuredRepos map[string][]*common.AutogitConfig
recursive int
}
func (w *RequestProcessor) Process(pr *models.PullRequest) error {
@@ -647,6 +668,9 @@ func ProcesPullRequest(pr *models.PullRequest, configs []*common.AutogitConfig)
return nil
}
lock := acquirePRLock(prLockKey(pr))
defer releasePRLock(lock)
PRProcessor, err := AllocatePRProcessor(pr, configs)
if err != nil {
log.Error(err)
@@ -663,17 +687,23 @@ func (w *RequestProcessor) ProcessFunc(request *common.Request) (err error) {
common.LogInfo("panic cought --- recovered")
common.LogError(string(debug.Stack()))
}
w.recursive--
}()
w.recursive++
if w.recursive > 3 {
common.LogError("Recursion limit reached... something is wrong with this PR?")
return nil
}
var pr *models.PullRequest
if req, ok := request.Data.(*common.PullRequestWebhookEvent); ok {
// Skip pull_request_sync events triggered by the bot's own pushes to
// prjgit branches. Those would re-run AssignReviewers immediately
// after the bot itself just set them, producing spurious add/remove
// cycles. Human-triggered syncs have a different sender and are still
// processed normally.
if request.Type == common.RequestType_PRSync && CurrentUser != nil &&
req.Sender.Username == CurrentUser.UserName {
common.LogDebug("Skipping self-triggered pull_request_sync from", req.Sender.Username,
"on", req.Pull_Request.Base.Repo.Owner.Username+"/"+req.Pull_Request.Base.Repo.Name,
"#", req.Pull_Request.Number)
return nil
}
pr, err = Gitea.GetPullRequest(req.Pull_Request.Base.Repo.Owner.Username, req.Pull_Request.Base.Repo.Name, req.Pull_Request.Number)
if err != nil {
common.LogError("Cannot find PR for issue:", req.Pull_Request.Base.Repo.Owner.Username, req.Pull_Request.Base.Repo.Name, req.Pull_Request.Number)
@@ -710,8 +740,16 @@ func (w *RequestProcessor) ProcessFunc(request *common.Request) (err error) {
common.LogError("*** Cannot find config for org:", pr.Base.Repo.Owner.UserName)
}
if err = ProcesPullRequest(pr, configs); err == updatePrjGitError_requeue {
time.Sleep(time.Second * 5)
return w.ProcessFunc(request)
// Retry after a delay in a background goroutine so the event loop is
// not blocked while we wait. The per-PR lock inside ProcesPullRequest
// ensures no other processing races with the retry.
go func() {
time.Sleep(time.Second * 5)
if err := ProcesPullRequest(pr, configs); err != nil {
common.LogError("requeue retry failed:", err)
}
}()
return nil
}
return err
}

View File

@@ -989,18 +989,6 @@ func TestProcessFunc(t *testing.T) {
}
})
t.Run("Recursion limit", func(t *testing.T) {
reqProc.recursive = 3
err := reqProc.ProcessFunc(&common.Request{})
if err != nil {
t.Errorf("Expected nil error on recursion limit, got %v", err)
}
if reqProc.recursive != 3 {
t.Errorf("Expected recursive to be 3, got %d", reqProc.recursive)
}
reqProc.recursive = 0 // Reset
})
t.Run("Invalid data format", func(t *testing.T) {
err := reqProc.ProcessFunc(&common.Request{Data: nil})
if err == nil || !strings.Contains(err.Error(), "Invalid data format") {