This commit is contained in:
Adam Majer 2024-08-28 00:45:47 +02:00
parent 2be5ae7150
commit e125031384
14 changed files with 199 additions and 257 deletions

View File

@ -13,6 +13,7 @@ type AutogitConfig struct {
Workflows []string
Organization string
GitProjectName string
Branch string
}
func ReadWorkflowConfigs(reader io.Reader) ([]*AutogitConfig, error) {
@ -53,4 +54,3 @@ func ReadWorkflowConfigsFile(filename string) ([]*AutogitConfig, error) {
return ReadWorkflowConfigs(file)
}

View File

@ -4,8 +4,6 @@ import (
"crypto/tls"
"fmt"
"log"
"maps"
"net/http"
"net/url"
"strings"
"time"
@ -144,126 +142,58 @@ func connectAndProcessRabbitMQ(log *log.Logger, ch chan<- RabbitMessage, server
}
}
func ConnectToRabbitMQ(log *log.Logger, server url.URL, topics []string) chan RabbitMessage {
func connectToRabbitMQ(log *log.Logger, server url.URL, topics []string) chan RabbitMessage {
ch := make(chan RabbitMessage, 100)
go connectAndProcessRabbitMQ(log, ch, server, topics)
return ch
}
func ProcessRabbitMQEvents(listenDefs ListenDefinitions) {
func ProcessEvent(f RequestProcessor, h *RequestHandler) {
defer func(){
recover()
}()
f(h)
}
func ProcessRabbitMQEvents(listenDefs ListenDefinitions, orgs []string) error {
server, err := url.Parse(listenDefs.RabbitURL)
if err != nil {
log.Panicf("cannot parse server URL. Err: %#v", err)
}
topics := make([]string, 0, len(listenDefs.Handlers))
for k := range listenDefs.Handlers {
topics = append(topics, fmt.Sprintf("*.gitea.%s#", k))
topics := make([]string, 0, len(listenDefs.Handlers)*len(orgs))
for _, org := range orgs {
for k := range listenDefs.Handlers {
topics = append(topics, fmt.Sprintf("*suse.gitea.%s.%s#", org, k))
}
}
ch := ConnectToRabbitMQ(log.Default(), *server, topics)
ch := connectToRabbitMQ(log.Default(), *server, topics)
for {
msg, ok := <-ch
if !ok {
return
return nil
}
route := strings.Split(msg.RoutingKey, ".")
if len(route > 3) {
if handler, found := listenDefs.Handlers[route[2]]; found {
switch route[2] {
case RequestType_CreateBrachTag, RequestType_DeleteBranchTag:
case RequestType_Fork:
case RequestType_Issue:
case RequestType_IssueAssign:
case RequestType_IssueComment:
case RequestType_IssueLabel:
case RequestType_IssueMilestone:
case RequestType_Push:
case RequestType_Repository:
case RequestType_Release:
case RequestType_PR:
case RequestType_PRAssign:
case RequestType_PRLabel:
case RequestType_PRComment:
case RequestType_PRMilestone:
case RequestType_PRSync:
case RequestType_PRReviewAccepted:
case RequestType_PRReviewRejected:
case RequestType_PRReviewRequest:
case RequestType_Wiki:
if len(route) > 3 {
reqType := route[3]
org := route[2]
if handler, found := listenDefs.Handlers[org]; found {
h := CreateRequestHandler(listenDefs.GitAuthor, listenDefs.GitAuthor)
req, err := ParseRequestJSON(reqType, msg.Body)
if err != nil {
} else {
h.Request = req
ProcessEvent(handler, h)
}
handler
}
}
}
}
func StartServer(listenDefs ListenDefinitions, config []*AutogitConfig) {
}
func StartServerWithAddress(listenDefs ListenDefinitions, addr string) {
if listenDefs.Url != url.PathEscape(listenDefs.Url) {
log.Fatalf("Invalid Url fragment (%s) to listen on. Aborting", listenDefs.Url)
}
http.HandleFunc("/"+listenDefs.Url, func(res http.ResponseWriter, req *http.Request) {
h := CreateRequestHandler(listenDefs.GitAuthor, listenDefs.Url)
defer h.Close()
hdr := req.Header[GiteaRequestHeader]
if len(hdr) != 1 {
h.ErrLogger.Printf("Unsupported number of %s headers: %d: %#v\n", GiteaRequestHeader, len(hdr), hdr)
h.WriteError()
res.WriteHeader(http.StatusInternalServerError)
return
}
reqType := hdr[0]
if handler, ok := listenDefs.Handlers[reqType]; ok {
switch reqType {
case RequestType_Repository:
h.parseRepositoryRequest(req.Body)
case RequestType_Push:
h.parsePushRequest(req.Body)
case RequestType_PR:
h.parsePullRequest(req.Body)
case RequestType_PRSync:
h.parsePullRequestSync(req.Body)
default:
h.ErrLogger.Printf("Unhandled request type: %s\n", reqType)
res.WriteHeader(http.StatusInternalServerError)
return
}
if h.HasError() {
h.ErrLogger.Printf("error in parser %s: %v\n", reqType, h.Error)
res.WriteHeader(http.StatusInternalServerError)
return
}
if err := handler(h); err != nil {
h.ErrLogger.Printf("error in handler for %s: %v\n", reqType, err)
res.WriteHeader(http.StatusInternalServerError)
return
}
} else {
h.ErrLogger.Printf("Unsupported request type: %s\n", reqType)
res.WriteHeader(http.StatusInternalServerError)
return
}
if h.HasError() {
h.WriteError()
res.WriteHeader(http.StatusInternalServerError)
return
}
res.Header().Add("Content-Type", "application/json")
res.WriteHeader(http.StatusOK)
})
log.Fatal(http.ListenAndServe(addr, nil))
}

View File

@ -8,3 +8,8 @@ type CreateWebhookEvent struct {
Repository *Repository
Sender *User
}
func (c *CreateWebhookEvent) GetAction() string {
return c.Ref_type
}

View File

@ -5,3 +5,8 @@ type ForkWebhookEvent struct {
Repository *Repository
Sender *User
}
func (*ForkWebhookEvent) GetAction() string {
return "fork"
}

View File

@ -7,154 +7,77 @@ import (
"os"
)
type RequestType interface {
GetAction() string
}
type Request struct {
Type string
Data interface{}
PrjGit string
Data RequestType
}
func ParseRequestJSON(reqType string, data []byte) (interface{}, error) {
switch reqType {
case RequestType_CreateBrachTag, RequestType_DeleteBranchTag:
create := CreateWebhookEvent{}
if err := json.Unmarshal(data, &create); err != nil {
return nil, err
}
return create, nil
case RequestType_Fork:
fork := ForkWebhookEvent{}
if err := json.Unmarshal(data, &fork); err != nil {
return nil, err
}
return fork, nil
case RequestType_Push:
push := PushRequest{}
if err := json.Unmarshal(data, &push); err != nil {
return nil, err
}
return push, nil
case RequestType_Repository:
repoAction := RepositoryAction{}
if err := json.Unmarshal(data, &repoAction); err != nil {
return nil, err
}
return repoAction, nil
case RequestType_Release:
release := ReleaseWebhookEvent{}
if err := json.Unmarshal(data, &release); err != nil {
return nil, err
}
return release, nil
case RequestType_Issue:
issue := IssueWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_IssueAssign:
issue := IssueWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_IssueComment, RequestType_PRComment:
issue := IssueCommentWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_IssueLabel:
issue := IssueWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_IssueMilestone:
issue := IssueWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_PR:
pr := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &pr); err != nil {
return nil, err
}
return pr, nil
case RequestType_PRLabel:
pr := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &pr); err != nil {
return nil, err
}
return pr, nil
case RequestType_PRMilestone:
pr := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &pr); err != nil {
return nil, err
}
return pr, nil
case RequestType_PRAssign:
issue := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_PRReviewRequest:
issue := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &issue); err != nil {
return nil, err
}
return issue, nil
case RequestType_PRReviewAccepted, RequestType_PRReviewRejected:
pr := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &pr); err != nil {
return nil, err
}
return pr, nil
case RequestType_PRSync:
pr := PullRequestWebhookEvent{}
if err := json.Unmarshal(data, &pr); err != nil {
return nil, err
}
return pr, nil
case RequestType_Wiki:
wiki := WikiWebhookEvent{}
if err := json.Unmarshal(data, &wiki); err != nil {
return nil, err
}
return wiki, nil
func ParseRequestJSON(reqType string, data []byte) (req *Request, err error) {
req = &Request{
Type: reqType,
}
return nil, fmt.Errorf("Unknown webhook request type: %s", reqType)
}
switch reqType {
case RequestType_CreateBrachTag, RequestType_DeleteBranchTag:
req.Data = &CreateWebhookEvent{}
case RequestType_Fork:
req.Data = &ForkWebhookEvent{}
case RequestType_Push:
req.Data = &PushWebhookEvent{}
case RequestType_Repository:
req.Data = &RepositoryWebhookEvent{}
case RequestType_Release:
req.Data = &ReleaseWebhookEvent{}
case RequestType_Issue:
req.Data = &IssueWebhookEvent{}
case RequestType_IssueAssign:
req.Data = &IssueWebhookEvent{}
case RequestType_IssueComment, RequestType_PRComment:
req.Data = &IssueCommentWebhookEvent{}
case RequestType_IssueLabel:
req.Data = &IssueWebhookEvent{}
case RequestType_IssueMilestone:
req.Data = &IssueWebhookEvent{}
case RequestType_PR:
req.Data = &PullRequestWebhookEvent{}
case RequestType_PRLabel:
req.Data = &PullRequestWebhookEvent{}
case RequestType_PRMilestone:
req.Data = &PullRequestWebhookEvent{}
case RequestType_PRAssign:
req.Data = &PullRequestWebhookEvent{}
case RequestType_PRReviewRequest:
req.Data = &PullRequestWebhookEvent{}
case RequestType_PRReviewAccepted, RequestType_PRReviewRejected:
req.Data = &PullRequestWebhookEvent{}
case RequestType_PRSync:
req.Data = &PullRequestWebhookEvent{}
case RequestType_Wiki:
req.Data = &WikiWebhookEvent{}
default:
return nil, fmt.Errorf("Unknown webhook request type: %s", reqType)
}
if err := json.Unmarshal(data, req.Data); err != nil {
return nil, err
}
return req, nil
}
type RequestHandler struct {
Error error
GitCommiter string
GitPath string
Branch []string
Branch string
PrjGit string
StdLogger, ErrLogger *log.Logger
Request Request
Request *Request
}
func (r *RequestHandler) WriteError() {

View File

@ -40,3 +40,12 @@ type IssueCommentWebhookEvent struct {
Repository *Repository
Sender *User
}
func (i *IssueWebhookEvent) GetAction() string {
return i.Action
}
func (i *IssueCommentWebhookEvent) GetAction() string {
return i.Action
}

View File

@ -46,6 +46,10 @@ type PullRequestWebhookEvent struct {
Sender User
}
func (p *PullRequestWebhookEvent) GetAction() string {
return p.Action
}
func (h *RequestHandler) parsePullRequest(data io.Reader) *PullRequestWebhookEvent {
if h.HasError() {
return nil
@ -65,8 +69,7 @@ func (h *RequestHandler) parsePullRequest(data io.Reader) *PullRequestWebhookEve
h.ErrLogger.Printf("%#v\n", action)
return nil
}
h.PrjGit = action.Repository.Ssh_Url[:repoIdx+1] + DefaultGitPrj + ".git"
h.Data = &action
h.Request.Data = &action
// sanity checks on request
return &action

View File

@ -11,7 +11,7 @@ type Commit struct {
Message string
}
type PushRequest struct {
type PushWebhookEvent struct {
Total_Commits int
Head_Commit Commit
Commits []Commit
@ -21,12 +21,16 @@ type PushRequest struct {
Repository *Repository
}
func (h *RequestHandler) parsePushRequest(data io.Reader) *PushRequest {
func (*PushWebhookEvent) GetAction() string {
return "push"
}
func (h *RequestHandler) parsePushRequest(data io.Reader) *PushWebhookEvent {
if h.HasError() {
return nil
}
var action PushRequest
var action PushWebhookEvent
h.Error = json.NewDecoder(data).Decode(&action)
if h.HasError() {
@ -39,10 +43,9 @@ func (h *RequestHandler) parsePushRequest(data io.Reader) *PushRequest {
h.ErrLogger.Printf("Unexpected URL for SSH repository: '%s'\n", action.Repository.Name)
return nil
}
h.PrjGit = action.Repository.Ssh_Url[:repoIdx+1] + DefaultGitPrj + ".git"
h.StdLogger.Printf("Request push for repo: %s\n", action.Repository.Full_Name)
h.Data = action
h.Request.Data = &action
if len(action.Commits) < 1 || len(action.Head_Commit.Id) != 64 {
h.ErrLogger.Println("Request has no action .... skipping")
return nil

View File

@ -17,3 +17,7 @@ type ReleaseWebhookEvent struct {
Repository *Repository
Sender *User
}
func (r *ReleaseWebhookEvent) GetAction() string {
return r.Action
}

View File

@ -31,7 +31,7 @@ type User struct {
Username string
}
type RepositoryAction struct {
type RepositoryWebhookEvent struct {
Action string
Sender *User
@ -41,13 +41,17 @@ type RepositoryAction struct {
PrjGit string
}
func (r *RepositoryWebhookEvent) GetAction() string {
return r.Action
}
// TODO: sanity check values!!!!
func (h *RequestHandler) parseRepositoryRequest(dataReader io.Reader) *RepositoryAction {
func (h *RequestHandler) parseRepositoryRequest(dataReader io.Reader) *RepositoryWebhookEvent {
if h.HasError() {
return nil
}
var data RepositoryAction
var data RepositoryWebhookEvent
h.Error = json.NewDecoder(dataReader).Decode(&data)
if h.HasError() {
@ -69,6 +73,6 @@ func (h *RequestHandler) parseRepositoryRequest(dataReader io.Reader) *Repositor
h.ErrLogger.Println("Request has no data.... skipping")
}
h.Data = data
h.Request.Data = &data
return &data
}

View File

@ -8,3 +8,8 @@ type WikiWebhookEvent struct {
Repository *Repository
Sender User
}
func (w *WikiWebhookEvent) GetAction() string {
return w.Action
}

View File

@ -6,8 +6,14 @@ import (
"os"
)
const (
RabbitUserEnv = "AMQP_USERNAME"
RabbitPasswordEnv = "AMQP_PASSWORD"
)
var giteaToken string
var obsUser, obsPassword string
var rabbitUser, rabbitPassword string
func RequireGiteaSecretToken() error {
giteaToken = os.Getenv(GiteaTokenEnv)
@ -48,3 +54,12 @@ func RequireObsSecretToken() error {
return nil
}
func RequireRabbitSecrets() error {
rabbitUser = os.Getenv(RabbitUserEnv)
rabbitPassword = os.Getenv(RabbitPasswordEnv)
if len(rabbitUser) == 0 || len(rabbitPassword) == 0 {
return fmt.Errorf("Missing RabbitMQ env variables: %s, %s", RabbitUserEnv, RabbitPasswordEnv)
}
return nil
}

View File

@ -61,14 +61,14 @@ func parseRequestJSONOrg(reqType string, data []byte) (org *common.Organization,
org = fork.Forkee.Owner
case common.RequestType_Push:
push := common.PushRequest{}
push := common.PushWebhookEvent{}
if err = json.Unmarshal(data, &push); err != nil {
return
}
org = push.Repository.Owner
case common.RequestType_Repository:
repoAction := common.RepositoryAction{}
repoAction := common.RepositoryWebhookEvent{}
if err = json.Unmarshal(data, &repoAction); err != nil {
return
}

View File

@ -6,20 +6,27 @@ import (
"log"
"os"
"path/filepath"
"slices"
"src.opensuse.org/autogits/common"
)
type ConfigRepos struct {
prjgit string
branch string // "" == default branch
}
var configuredRepos map[string]ConfigRepos
func processRepositoryAction(h *common.RequestHandler) error {
action := h.Request.Data.(common.RepositoryAction)
action := h.Request.Data.(*common.RepositoryWebhookEvent)
if action.Repository.Name == common.DefaultGitPrj {
h.Log("repository event %s for %s. Ignoring", common.DefaultGitPrj, action.Action)
h.StdLogger.Printf("repository event %s for %s. Ignoring\n", common.DefaultGitPrj, action.Action)
return nil
}
h.CreateRepositoryIfNotExist(action.Organization, common.DefaultGitPrj)
h.GitExec("", "clone", "--depth", "1", action.PrjGit, common.DefaultGitPrj)
h.CreateRepositoryIfNotExist(*action.Organization, h.PrjGit)
h.GitExec("", "clone", "--depth", "1", action.PrjGit, h.PrjGit)
switch action.Action {
case "created":
@ -28,7 +35,7 @@ func processRepositoryAction(h *common.RequestHandler) error {
h.GitExec(common.DefaultGitPrj, "push")
case "deleted":
if stat, err := os.Stat(filepath.Join(h.GitPath, common.DefaultGitPrj, action.Repository.Name)); err != nil || !stat.IsDir() {
h.Log("delete event for %s -- not in project. Ignoring", action.Repository.Name)
h.StdLogger.Printf("delete event for %s -- not in project. Ignoring\n", action.Repository.Name)
return nil
}
h.GitExec(common.DefaultGitPrj, "rm", action.Repository.Name)
@ -42,7 +49,7 @@ func processRepositoryAction(h *common.RequestHandler) error {
}
func processPushAction(h *common.RequestHandler) error {
action := h.Request.Data.(common.PushRequest)
action := h.Request.Data.(*common.PushWebhookEvent)
if action.Repository.Name == common.DefaultGitPrj {
h.StdLogger.Printf("push to %s -- ignoringi\n", common.DefaultGitPrj)
@ -51,7 +58,8 @@ func processPushAction(h *common.RequestHandler) error {
h.GitExec("", "clone", "--depth", "1", h.PrjGit, common.DefaultGitPrj)
if stat, err := os.Stat(filepath.Join(h.GitPath, common.DefaultGitPrj, action.Repository.Name)); err != nil || !stat.IsDir() {
return fmt.Errorf("Pushed to package that is not part of the project. Ignoring. : %v", err)
h.StdLogger.Printf("Pushed to package that is not part of the project. Ignoring. : %v\n", err)
return nil
}
h.GitExec(common.DefaultGitPrj, "submodule", "update", "--init", "--depth", "1", "--checkout", action.Repository.Name)
id, _ := h.GitBranchHead(filepath.Join(common.DefaultGitPrj, action.Repository.Name), action.Repository.Default_Branch)
@ -69,24 +77,52 @@ func processPushAction(h *common.RequestHandler) error {
return nil
}
var debugMode bool
func main() {
workflowConfig := flag.String("config", "", "Repository and workflow definition file")
rabbitUrl := flag.String("url", "amqps://rabbit.opensuse.org", "URL for RabbitMQ instance")
flag.BoolVar(&debugMode, "debug", false, "Extra debugging information")
flag.Parse()
if len(*workflowConfig) == 0 {
log.Fatalln("No configuratio file specified. Aborting")
}
configs, err := common.ReadWorkflowConfigsFile(*workflowConfig)
if err != nil {
log.Fatalf("Error reading config file. err: %v", err)
}
configuredRepos = make(map[string]ConfigRepos)
orgs := make([]string, 0, 10)
for _, c := range configs {
if slices.Contains(c.Workflows, "push") {
if debugMode {
log.Printf(" + adding org: '%s', branch: '%s', prjgit: '%s'\n", c.Organization, c.Branch, c.GitProjectName)
}
configuredRepos[c.Organization] = ConfigRepos{
prjgit: c.GitProjectName,
branch: c.Branch,
}
orgs = append(orgs, c.Organization)
}
}
var defs common.ListenDefinitions
defs.Url = "prjgit-updater"
defs.GitAuthor = "GiteaBot - AutoDevel"
defs.RabbitURL = *rabbitUrl
defs.Handlers=make(map[string]common.RequestProcessor)
defs.Handlers = make(map[string]common.RequestProcessor)
defs.Handlers[common.RequestType_Push] = processPushAction
defs.Handlers[common.RequestType_Repository] = processRepositoryAction
common.RequireGiteaSecretToken()
common.StartServer(defs, configs)
if err := common.RequireGiteaSecretToken(); err != nil {
log.Fatal(err)
}
if err := common.RequireRabbitSecrets(); err != nil {
log.Fatal(err)
}
log.Fatal(common.ProcessRabbitMQEvents(defs, orgs))
}