workflow-pr: fix race conditions in event processing
Three targeted fixes for the reviewer add/remove race condition: 1. Per-PR serialization lock (prLocks sync.Map) Both the RabbitMQ event loop and the ConsistencyCheckProcess goroutine converge on ProcesPullRequest with no mutual exclusion, allowing them to race on AssignReviewers/UnrequestReview for the same PR. A buffered-channel-per-PR-key mutex serialises all processing for a given PR regardless of which goroutine triggers it. 2. Self-triggered pull_request_sync filter The bot's own pushes to prjgit branches cause Gitea to fire a pull_request_sync event back, which would re-run AssignReviewers on already-settled reviewer state, producing spurious add/remove cycles. Events where Sender.Username matches the bot's own account are now dropped early in ProcessFunc. 3. Requeue retry moved to background goroutine The updatePrjGitError_requeue path was sleeping 5 s inside the event loop goroutine (blocking all further event processing) then calling ProcessFunc recursively. It now schedules the retry in a goroutine; the per-PR lock prevents the retry from racing with events that arrive during the sleep window. The recursive counter is removed as it only guarded this path.
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/opentracing/opentracing-go/log"
|
||||
@@ -628,9 +629,29 @@ func (pr *PRProcessor) Process(req *models.PullRequest) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// prLocks serialises concurrent processing of the same PR.
|
||||
// Both the RabbitMQ event loop and the consistency-checker goroutine call
|
||||
// ProcesPullRequest; without this lock they can race on reviewer add/remove.
|
||||
// Key format: "org/repo#num"
|
||||
var prLocks sync.Map // map[string]chan struct{}
|
||||
|
||||
func prLockKey(pr *models.PullRequest) string {
|
||||
return fmt.Sprintf("%s/%s#%d", pr.Base.Repo.Owner.UserName, pr.Base.Repo.Name, pr.Index)
|
||||
}
|
||||
|
||||
func acquirePRLock(key string) chan struct{} {
|
||||
v, _ := prLocks.LoadOrStore(key, make(chan struct{}, 1))
|
||||
ch := v.(chan struct{})
|
||||
ch <- struct{}{}
|
||||
return ch
|
||||
}
|
||||
|
||||
func releasePRLock(ch chan struct{}) {
|
||||
<-ch
|
||||
}
|
||||
|
||||
type RequestProcessor struct {
|
||||
configuredRepos map[string][]*common.AutogitConfig
|
||||
recursive int
|
||||
}
|
||||
|
||||
func (w *RequestProcessor) Process(pr *models.PullRequest) error {
|
||||
@@ -647,6 +668,9 @@ func ProcesPullRequest(pr *models.PullRequest, configs []*common.AutogitConfig)
|
||||
return nil
|
||||
}
|
||||
|
||||
lock := acquirePRLock(prLockKey(pr))
|
||||
defer releasePRLock(lock)
|
||||
|
||||
PRProcessor, err := AllocatePRProcessor(pr, configs)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
@@ -663,17 +687,23 @@ func (w *RequestProcessor) ProcessFunc(request *common.Request) (err error) {
|
||||
common.LogInfo("panic cought --- recovered")
|
||||
common.LogError(string(debug.Stack()))
|
||||
}
|
||||
w.recursive--
|
||||
}()
|
||||
|
||||
w.recursive++
|
||||
if w.recursive > 3 {
|
||||
common.LogError("Recursion limit reached... something is wrong with this PR?")
|
||||
return nil
|
||||
}
|
||||
|
||||
var pr *models.PullRequest
|
||||
if req, ok := request.Data.(*common.PullRequestWebhookEvent); ok {
|
||||
// Skip pull_request_sync events triggered by the bot's own pushes to
|
||||
// prjgit branches. Those would re-run AssignReviewers immediately
|
||||
// after the bot itself just set them, producing spurious add/remove
|
||||
// cycles. Human-triggered syncs have a different sender and are still
|
||||
// processed normally.
|
||||
if request.Type == common.RequestType_PRSync && CurrentUser != nil &&
|
||||
req.Sender.Username == CurrentUser.UserName {
|
||||
common.LogDebug("Skipping self-triggered pull_request_sync from", req.Sender.Username,
|
||||
"on", req.Pull_Request.Base.Repo.Owner.Username+"/"+req.Pull_Request.Base.Repo.Name,
|
||||
"#", req.Pull_Request.Number)
|
||||
return nil
|
||||
}
|
||||
|
||||
pr, err = Gitea.GetPullRequest(req.Pull_Request.Base.Repo.Owner.Username, req.Pull_Request.Base.Repo.Name, req.Pull_Request.Number)
|
||||
if err != nil {
|
||||
common.LogError("Cannot find PR for issue:", req.Pull_Request.Base.Repo.Owner.Username, req.Pull_Request.Base.Repo.Name, req.Pull_Request.Number)
|
||||
@@ -710,8 +740,16 @@ func (w *RequestProcessor) ProcessFunc(request *common.Request) (err error) {
|
||||
common.LogError("*** Cannot find config for org:", pr.Base.Repo.Owner.UserName)
|
||||
}
|
||||
if err = ProcesPullRequest(pr, configs); err == updatePrjGitError_requeue {
|
||||
time.Sleep(time.Second * 5)
|
||||
return w.ProcessFunc(request)
|
||||
// Retry after a delay in a background goroutine so the event loop is
|
||||
// not blocked while we wait. The per-PR lock inside ProcesPullRequest
|
||||
// ensures no other processing races with the retry.
|
||||
go func() {
|
||||
time.Sleep(time.Second * 5)
|
||||
if err := ProcesPullRequest(pr, configs); err != nil {
|
||||
common.LogError("requeue retry failed:", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -989,18 +989,6 @@ func TestProcessFunc(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Recursion limit", func(t *testing.T) {
|
||||
reqProc.recursive = 3
|
||||
err := reqProc.ProcessFunc(&common.Request{})
|
||||
if err != nil {
|
||||
t.Errorf("Expected nil error on recursion limit, got %v", err)
|
||||
}
|
||||
if reqProc.recursive != 3 {
|
||||
t.Errorf("Expected recursive to be 3, got %d", reqProc.recursive)
|
||||
}
|
||||
reqProc.recursive = 0 // Reset
|
||||
})
|
||||
|
||||
t.Run("Invalid data format", func(t *testing.T) {
|
||||
err := reqProc.ProcessFunc(&common.Request{Data: nil})
|
||||
if err == nil || !strings.Contains(err.Error(), "Invalid data format") {
|
||||
|
||||
Reference in New Issue
Block a user