Migrate to prjgit based config

config now only has reference to org or prjgits and the rest
is defined in the "workflow.config" in the prjgit itself. This
allows the config to be updated in the project.
This commit is contained in:
2025-03-23 16:33:06 +01:00
parent 4a4113aad7
commit 940e5be2c1
7 changed files with 175 additions and 98 deletions

View File

@@ -58,16 +58,22 @@ func ReadConfigFile(filename string) (*ConfigFile, error) {
func ReadWorkflowConfig(gitea Gitea, git_project string) (*AutogitConfig, error) { func ReadWorkflowConfig(gitea Gitea, git_project string) (*AutogitConfig, error) {
hash := strings.Split(git_project, "#") hash := strings.Split(git_project, "#")
if len(hash) != 2 { branch := ""
return nil, fmt.Errorf("Missing branch information in projectgit: %s", git_project) if len(hash) > 1 {
branch = hash[1]
} }
a := strings.Split(hash[0], "/") a := strings.Split(hash[0], "/")
if len(a) != 2 { prjGitRepo := DefaultGitPrj
switch len(a) {
case 1:
case 2:
prjGitRepo = a[1]
default:
return nil, fmt.Errorf("Missing org/repo in projectgit: %s", git_project) return nil, fmt.Errorf("Missing org/repo in projectgit: %s", git_project)
} }
data, _, err := gitea.GetRepositoryFileContent(a[0], a[1], hash[1], "workflow.config") data, _, err := gitea.GetRepositoryFileContent(a[0], prjGitRepo, branch, "workflow.config")
if err != nil { if err != nil {
return nil, fmt.Errorf("Error fetching 'workflow.config': %w", err) return nil, fmt.Errorf("Error fetching 'workflow.config': %w", err)
} }
@@ -77,7 +83,14 @@ func ReadWorkflowConfig(gitea Gitea, git_project string) (*AutogitConfig, error)
return nil, fmt.Errorf("Error parsing config file: %w", err) return nil, fmt.Errorf("Error parsing config file: %w", err)
} }
config.GitProjectName = git_project config.GitProjectName = a[0] + "/" + prjGitRepo
if len(branch) > 0 {
config.GitProjectName = config.GitProjectName + "#" + branch
}
if len(config.Organization) < 1 {
config.Organization = a[0]
}
log.Println(config)
return &config, nil return &config, nil
} }
@@ -104,6 +117,18 @@ type AutogitConfig struct {
Reviewers []string // only used by `pr` workflow Reviewers []string // only used by `pr` workflow
} }
type AutogitConfigs []*AutogitConfig
func (configs AutogitConfigs) GetPrjGitConfig(org, repo, branch string) *AutogitConfig {
for _, c := range configs {
if c.Organization == org && c.Branch == branch {
return c
}
}
return nil
}
func ReadWorkflowConfigs(reader io.Reader) ([]*AutogitConfig, error) { func ReadWorkflowConfigs(reader io.Reader) ([]*AutogitConfig, error) {
data, err := io.ReadAll(reader) data, err := io.ReadAll(reader)
if err != nil { if err != nil {

View File

@@ -479,8 +479,11 @@ func (gitea *GiteaTransport) AddReviewComment(pr *models.PullRequest, state mode
} }
func (gitea *GiteaTransport) GetRepositoryFileContent(org, repo, hash, path string) ([]byte, string, error) { func (gitea *GiteaTransport) GetRepositoryFileContent(org, repo, hash, path string) ([]byte, string, error) {
content, err := gitea.client.Repository.RepoGetContents( params := repository.NewRepoGetContentsParams().WithOwner(org).WithRepo(repo).WithFilepath(path)
repository.NewRepoGetContentsParams().WithOwner(org).WithRepo(repo).WithFilepath(path).WithRef(&hash), if len(hash) > 0 {
params = params.WithRef(&hash)
}
content, err := gitea.client.Repository.RepoGetContents(params,
gitea.transport.DefaultAuthentication, gitea.transport.DefaultAuthentication,
) )

View File

@@ -59,27 +59,31 @@ type RequestProcessor interface {
} }
type ListenDefinitions struct { type ListenDefinitions struct {
RabbitURL string // amqps://user:password@host/queue RabbitURL *url.URL // amqps://user:password@host/queue
GitAuthor string GitAuthor string
Handlers map[string]RequestProcessor Handlers map[string]RequestProcessor
Orgs []string
topics []string
currentTopics []string
} }
type RabbitMessage rabbitmq.Delivery type RabbitMessage rabbitmq.Delivery
func processRabbitMQ(msgCh chan<- RabbitMessage, server url.URL, topics []string) error { func (l *ListenDefinitions) processRabbitMQ(msgCh chan<- RabbitMessage) error {
queueName := server.Path queueName := l.RabbitURL.Path
server.Path = "" l.RabbitURL.Path = ""
if len(queueName) > 0 && queueName[0] == '/' { if len(queueName) > 0 && queueName[0] == '/' {
queueName = queueName[1:] queueName = queueName[1:]
} }
connection, err := rabbitmq.DialTLS(server.String(), &tls.Config{ connection, err := rabbitmq.DialTLS(l.RabbitURL.String(), &tls.Config{
ServerName: server.Hostname(), ServerName: l.RabbitURL.Hostname(),
}) })
if err != nil { if err != nil {
return fmt.Errorf("Cannot connect to %s . Err: %w", server.Hostname(), err) return fmt.Errorf("Cannot connect to %s . Err: %w", l.RabbitURL.Hostname(), err)
} }
defer connection.Close() defer connection.Close()
@@ -124,7 +128,7 @@ func processRabbitMQ(msgCh chan<- RabbitMessage, server url.URL, topics []string
// log.Printf("queue: %s:%d", q.Name, q.Consumers) // log.Printf("queue: %s:%d", q.Name, q.Consumers)
log.Println(" -- listening to topics:") log.Println(" -- listening to topics:")
for _, topic := range topics { for _, topic := range l.topics {
err = ch.QueueBind(q.Name, topic, "pubsub", false, nil) err = ch.QueueBind(q.Name, topic, "pubsub", false, nil)
log.Println(" +", topic) log.Println(" +", topic)
if err != nil { if err != nil {
@@ -148,18 +152,18 @@ func processRabbitMQ(msgCh chan<- RabbitMessage, server url.URL, topics []string
} }
} }
func connectAndProcessRabbitMQ(log *log.Logger, ch chan<- RabbitMessage, server url.URL, topics []string) { func (l *ListenDefinitions) connectAndProcessRabbitMQ(log *log.Logger, ch chan<- RabbitMessage) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println(r) log.Println(r)
log.Println("'crash' RabbitMQ worker. Recovering... reconnecting...") log.Println("'crash' RabbitMQ worker. Recovering... reconnecting...")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
go connectAndProcessRabbitMQ(log, ch, server, topics) go l.connectAndProcessRabbitMQ(log, ch)
} }
}() }()
for { for {
err := processRabbitMQ(ch, server, topics) err := l.processRabbitMQ(ch)
if err != nil { if err != nil {
log.Printf("Error in RabbitMQ connection. %#v", err) log.Printf("Error in RabbitMQ connection. %#v", err)
log.Println("Reconnecting in 2 seconds...") log.Println("Reconnecting in 2 seconds...")
@@ -168,9 +172,9 @@ func connectAndProcessRabbitMQ(log *log.Logger, ch chan<- RabbitMessage, server
} }
} }
func connectToRabbitMQ(log *log.Logger, server url.URL, topics []string) chan RabbitMessage { func (l *ListenDefinitions) connectToRabbitMQ(log *log.Logger) chan RabbitMessage {
ch := make(chan RabbitMessage, 100) ch := make(chan RabbitMessage, 100)
go connectAndProcessRabbitMQ(log, ch, server, topics) go l.connectAndProcessRabbitMQ(log, ch)
return ch return ch
} }
@@ -192,33 +196,28 @@ func ProcessEvent(f RequestProcessor, request *Request) {
} }
func ProcessRabbitMQEvents(listenDefs ListenDefinitions, orgs []string) error { func (l *ListenDefinitions) ProcessRabbitMQEvents() error {
server, err := url.Parse(listenDefs.RabbitURL) log.Println("RabbitMQ connection:", l.RabbitURL.String())
if err != nil { l.topics = make([]string, 0, len(l.Handlers)*len(l.Orgs))
log.Panicf("cannot parse server URL. Err: %#v\n", err) log.Println(len(l.Handlers), len(l.Orgs))
}
log.Println("RabbitMQ connection:", *server) l.RabbitURL.User = url.UserPassword(rabbitUser, rabbitPassword)
topics := make([]string, 0, len(listenDefs.Handlers)*len(orgs))
log.Println(len(listenDefs.Handlers), len(orgs))
server.User = url.UserPassword(rabbitUser, rabbitPassword)
scope := "suse" scope := "suse"
if server.Hostname() == "rabbit.opensuse.org" { if l.RabbitURL.Hostname() == "rabbit.opensuse.org" {
scope = "opensuse" scope = "opensuse"
} }
for _, org := range orgs { for _, org := range l.Orgs {
for requestType, _ := range listenDefs.Handlers { for requestType, _ := range l.Handlers {
topics = append(topics, fmt.Sprintf("%s.src.%s.%s.#", scope, org, requestType)) l.topics = append(l.topics, fmt.Sprintf("%s.src.%s.%s.#", scope, org, requestType))
} }
} }
slices.Sort(topics) slices.Sort(l.topics)
topics = slices.Compact(topics) l.topics = slices.Compact(l.topics)
ch := connectToRabbitMQ(log.Default(), *server, topics) ch := l.connectToRabbitMQ(log.Default())
for { for {
msg, ok := <-ch msg, ok := <-ch
@@ -233,14 +232,14 @@ func ProcessRabbitMQEvents(listenDefs ListenDefinitions, orgs []string) error {
reqType := route[3] reqType := route[3]
org := route[2] org := route[2]
if !slices.Contains(orgs, org) { if !slices.Contains(l.Orgs, org) {
log.Println("Got even for unhandeled org:", org) log.Println("Got event for unhandeled org:", org)
continue continue
} }
log.Println("org:", org, "type:", reqType) log.Println("org:", org, "type:", reqType)
if handler, found := listenDefs.Handlers[reqType]; found { if handler, found := l.Handlers[reqType]; found {
/* h, err := CreateRequestHandler() /* h, err := CreateRequestHandler()
if err != nil { if err != nil {
log.Println("Cannot create request handler", err) log.Println("Cannot create request handler", err)
continue continue
@@ -252,7 +251,7 @@ func ProcessRabbitMQEvents(listenDefs ListenDefinitions, orgs []string) error {
continue continue
} else { } else {
log.Println("processing req", req.Type) log.Println("processing req", req.Type)
// h.Request = req // h.Request = req
ProcessEvent(handler, req) ProcessEvent(handler, req)
} }

View File

@@ -1,10 +1,13 @@
package main package main
import ( import (
"encoding/json"
"flag" "flag"
"log" "log"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings"
"time" "time"
"src.opensuse.org/autogits/common" "src.opensuse.org/autogits/common"
@@ -13,7 +16,21 @@ import (
var reviewer *models.User var reviewer *models.User
var groupName string var groupName string
var configs []*common.AutogitConfig var configs common.AutogitConfigs
type ReviewGroupMember struct {
Name string
}
func fetchReviewGroupConfig(gitea common.Gitea, org, repo, branch, groupName string) (reviewers []ReviewGroupMember, err error) {
data, _, err := gitea.GetRepositoryFileContent(org, repo, branch, groupName+".review.group")
if err != nil {
return nil, err
}
err = json.Unmarshal(data, &reviewers)
return
}
func processNotifications(notification *models.NotificationThread, gitea common.Gitea) { func processNotifications(notification *models.NotificationThread, gitea common.Gitea) {
rx := regexp.MustCompile(`^https://src\.(?:open)?suse\.(?:org|de)/api/v\d+/repos/(?<org>[a-zA-Z0-9]+)/(?<project>[_a-zA-Z0-9]+)/issues/(?<num>[0-9]+)$`) rx := regexp.MustCompile(`^https://src\.(?:open)?suse\.(?:org|de)/api/v\d+/repos/(?<org>[a-zA-Z0-9]+)/(?<project>[_a-zA-Z0-9]+)/issues/(?<num>[0-9]+)$`)
@@ -38,6 +55,8 @@ func processNotifications(notification *models.NotificationThread, gitea common.
return return
} }
config := configs.GetPrjGitConfig(org, repo, pr.Base.Name)
log.Println("PR state:", pr.State) log.Println("PR state:", pr.State)
if pr.State == "closed" { if pr.State == "closed" {
// dismiss the review // dismiss the review
@@ -52,7 +71,7 @@ func processNotifications(notification *models.NotificationThread, gitea common.
return return
} }
prs, err := common.FetchPRSet(gitea, org, repo, id) prs, err := common.FetchPRSet(gitea, org, repo, id, config)
if err != nil { if err != nil {
log.Printf("Cannot fetch PRSet for %s/%s/%d. Error: %v\n", org, repo, id, err) log.Printf("Cannot fetch PRSet for %s/%s/%d. Error: %v\n", org, repo, id, err)
return return
@@ -64,9 +83,29 @@ func processNotifications(notification *models.NotificationThread, gitea common.
return return
} }
fetchReviewGroupConfig(prjGitPR.Base.Repo, prjGitPR.Base.Sha, groupName) groupMembers, err := fetchReviewGroupConfig(gitea, prjGitPR.Base.Repo.Owner.UserName, prjGitPR.Base.Repo.Name, prjGitPR.Base.Sha, groupName)
if err != nil {
log.Println("Cannot fetch ReviewGroup definition:", groupName, err)
}
for _, review := range reviews { for _, review := range reviews {
user := ""
if !review.Stale &&
review.State == common.ReviewStateApproved &&
slices.ContainsFunc(groupMembers, func(g ReviewGroupMember) bool {
if g.Name == review.User.UserName {
user = g.Name
return true
}
return false
}) &&
strings.Contains(review.Body, "/"+groupName+" LGTM\n") {
gitea.AddReviewComment(pr, common.ReviewStateApproved, "Signed off by: "+user)
if err := gitea.SetNotificationRead(notification.ID); err != nil {
log.Println("Cannot set notification as read", err)
}
}
} }
} }
@@ -96,12 +135,14 @@ func main() {
log.Println(" group-review [OPTIONS] <review-group-name>") log.Println(" group-review [OPTIONS] <review-group-name>")
log.Println() log.Println()
flag.Usage() flag.Usage()
return
} }
groupName = args[0] groupName = args[0]
config, err := common.ReadConfigFile(*configFile) configData, err := common.ReadConfigFile(*configFile)
if err != nil { if err != nil {
log.Panicln("Failed to read config file") log.Println("Failed to read config file", err)
return
} }
if err := common.RequireGiteaSecretToken(); err != nil { if err := common.RequireGiteaSecretToken(); err != nil {
@@ -113,9 +154,11 @@ func main() {
} }
gitea := common.AllocateGiteaTransport(*giteaHost) gitea := common.AllocateGiteaTransport(*giteaHost)
configs = common.ResolveWorkflowConfigs(gitea, config) configs, err = common.ResolveWorkflowConfigs(gitea, configData)
if err != nil {
log.Panicln(err)
}
var err error
reviewer, err = gitea.GetCurrentUser() reviewer, err = gitea.GetCurrentUser()
if err != nil { if err != nil {
log.Panicln("Cannot fetch review user: %w", err) log.Panicln("Cannot fetch review user: %w", err)
@@ -125,10 +168,16 @@ func main() {
*interval = 1 *interval = 1
} }
log.Println(" ** processing group reviews for group:", reviewer.UserName) log.Println(" ** processing group reviews for group:", groupName)
log.Println(" ** username in Gitea:", reviewer.UserName)
log.Println(" ** polling internval:", *interval, "min") log.Println(" ** polling internval:", *interval, "min")
log.Println(" ** connecting to RabbitMQ:", *rabbitMqHost) log.Println(" ** connecting to RabbitMQ:", *rabbitMqHost)
if groupName != reviewer.UserName {
log.Println(" ***** Reviewer does not match group name. Aborting. *****")
return
}
for { for {
periodReviewCheck(gitea) periodReviewCheck(gitea)
time.Sleep(time.Duration(*interval * int64(time.Minute))) time.Sleep(time.Duration(*interval * int64(time.Minute)))

View File

@@ -1,14 +1,6 @@
[ [
{ "autogits/MyPrj",
"Workflows": ["direct"], "autogits/HiddenPrj",
"Organization": "autogits", "testing"
"GitProjectName": "MyPrj"
},
{
"Workflows": ["direct"],
"Organization": "autogits",
"GitProjectName": "HiddenPrj",
"Branch": "hidden"
}
] ]

View File

@@ -1,6 +1,8 @@
module src.opensuse.org/autogits/workflow-direct module src.opensuse.org/autogits/workflow-direct
go 1.22.3 go 1.23.1
toolchain go1.24.0
replace src.opensuse.org/autogits/common => ../bots-common replace src.opensuse.org/autogits/common => ../bots-common

View File

@@ -25,6 +25,7 @@ import (
"io/fs" "io/fs"
"log" "log"
"math/rand" "math/rand"
"net/url"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@@ -248,7 +249,7 @@ func verifyProjectState(git common.Git, org string, config *common.AutogitConfig
if data, err := os.ReadFile(path.Join(git.GetPath(), config.GitProjectName, common.PrjLinksFile)); err == nil { if data, err := os.ReadFile(path.Join(git.GetPath(), config.GitProjectName, common.PrjLinksFile)); err == nil {
pkgLinks, err = parseProjectLinks(data) pkgLinks, err = parseProjectLinks(data)
if err != nil { if err != nil {
log.Println("Cannot parse project links file: %s", err.Error()) log.Println("Cannot parse project links file:", err.Error())
pkgLinks = nil pkgLinks = nil
} else { } else {
ResolveLinks(org, pkgLinks, gitea) ResolveLinks(org, pkgLinks, gitea)
@@ -459,8 +460,35 @@ func consistencyCheckProcess() {
var DebugMode bool var DebugMode bool
func updateConfiguration(configFilename string, orgs *[]string) {
configFile, err := common.ReadConfigFile(configFilename)
if err != nil {
log.Fatal(err)
}
configs, _ := common.ResolveWorkflowConfigs(gitea, configFile)
configuredRepos = make(map[string][]*common.AutogitConfig)
*orgs = make([]string, 0, 1)
for _, c := range configs {
if slices.Contains(c.Workflows, "direct") {
if DebugMode {
log.Printf(" + adding org: '%s', branch: '%s', prjgit: '%s'\n", c.Organization, c.Branch, c.GitProjectName)
}
configs := configuredRepos[c.Organization]
if configs == nil {
configs = make([]*common.AutogitConfig, 0, 1)
}
configs = append(configs, c)
configuredRepos[c.Organization] = configs
*orgs = append(*orgs, c.Organization)
}
}
}
func main() { func main() {
workflowConfig := flag.String("config", "", "Repository and workflow definition file") configFilename := flag.String("config", "", "List of PrjGit")
giteaHost := flag.String("gitea", "src.opensuse.org", "Gitea instance") giteaHost := flag.String("gitea", "src.opensuse.org", "Gitea instance")
rabbitUrl := flag.String("url", "amqps://rabbit.opensuse.org", "URL for RabbitMQ instance") rabbitUrl := flag.String("url", "amqps://rabbit.opensuse.org", "URL for RabbitMQ instance")
flag.BoolVar(&DebugMode, "debug", false, "Extra debugging information") flag.BoolVar(&DebugMode, "debug", false, "Extra debugging information")
@@ -477,49 +505,28 @@ func main() {
checkInterval = time.Duration(*checkIntervalHours) * time.Hour checkInterval = time.Duration(*checkIntervalHours) * time.Hour
if len(*workflowConfig) == 0 {
log.Fatalln("No configuratio file specified. Aborting")
}
configs, err := common.ReadWorkflowConfigsFile(*workflowConfig)
if err != nil {
log.Fatal(err)
}
configuredRepos = make(map[string][]*common.AutogitConfig)
orgs := make([]string, 0, 1)
for _, c := range configs {
if slices.Contains(c.Workflows, "direct") {
if DebugMode {
log.Printf(" + adding org: '%s', branch: '%s', prjgit: '%s'\n", c.Organization, c.Branch, c.GitProjectName)
}
configs := configuredRepos[c.Organization]
if configs == nil {
configs = make([]*common.AutogitConfig, 0, 1)
}
configs = append(configs, c)
configuredRepos[c.Organization] = configs
orgs = append(orgs, c.Organization)
}
}
gitea = common.AllocateGiteaTransport(*giteaHost) gitea = common.AllocateGiteaTransport(*giteaHost)
CurrentUser, err := gitea.GetCurrentUser() CurrentUser, err := gitea.GetCurrentUser()
if err != nil { if err != nil {
log.Fatalln("Cannot fetch current user:", err) log.Fatalln("Cannot fetch current user:", err)
} }
log.Println("Current User:", CurrentUser.UserName) log.Println("Current User:", CurrentUser.UserName)
go consistencyCheckProcess()
var defs common.ListenDefinitions var defs common.ListenDefinitions
updateConfiguration(*configFilename, &defs.Orgs)
defs.GitAuthor = GitAuthor defs.GitAuthor = GitAuthor
defs.RabbitURL = *rabbitUrl defs.RabbitURL, err = url.Parse(*rabbitUrl)
if err != nil {
log.Panicf("cannot parse server URL. Err: %#v\n", err)
}
go consistencyCheckProcess()
log.Println("defs:", defs)
defs.Handlers = make(map[string]common.RequestProcessor) defs.Handlers = make(map[string]common.RequestProcessor)
defs.Handlers[common.RequestType_Push] = &PushActionProcessor{} defs.Handlers[common.RequestType_Push] = &PushActionProcessor{}
defs.Handlers[common.RequestType_Repository] = &RepositoryActionProcessor{} defs.Handlers[common.RequestType_Repository] = &RepositoryActionProcessor{}
log.Fatal(common.ProcessRabbitMQEvents(defs, orgs)) log.Fatal(defs.ProcessRabbitMQEvents())
} }