autogits/gitea-events-rabbitmq-publisher/main.go
2024-08-26 14:09:23 +02:00

429 lines
10 KiB
Go

package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"src.opensuse.org/autogits/common"
)
const (
ListenAddrDef = "[::1]:8002"
AppName = "rabbitmq-forwarder"
)
var DebugMode bool
func connectToRabbitMQ() {
host := os.Getenv("RABBITMQ_HOST")
username := os.Getenv("RABBITMQ_USERNAME")
password := os.Getenv("RABBITMQ_PASSWORD")
if len(host) == 0 || len(username) == 0 || len(password) == 0 {
fmt.Println("Missing RABBITMQ_HOST, RABBITMQ_USERNAME, RABBITMQ_PASSWORD")
os.Exit(1)
}
go ConnectToExchangeForPublish(host, username, password)
}
var id int
func dumpUnhandledData(reqType string, data []byte) {
id++
os.WriteFile("/tmp/unhandled-json-"+reqType+"-"+fmt.Sprint(id)+".json", data, 0600)
}
func parseRequestJSON(reqType string, data []byte) (org *common.Organization, extraAction string, err error) {
extraAction = ""
switch reqType {
case "create", "delete":
create := common.CreateWebhookEvent{}
if err = json.Unmarshal(data, &create); err != nil {
return
}
org = create.Repository.Owner
case "fork":
fork := common.ForkWebhookEvent{}
if err = json.Unmarshal(data, &fork); err != nil {
return
}
org = fork.Forkee.Owner
case "push":
push := common.PushRequest{}
if err = json.Unmarshal(data, &push); err != nil {
return
}
org = push.Repository.Owner
case "repository":
repoAction := common.RepositoryAction{}
if err = json.Unmarshal(data, &repoAction); err != nil {
return
}
switch repoAction.Action {
case "created", "deleted":
break
default:
err = fmt.Errorf("Unknown repository webhook action type: %s", repoAction.Action)
return
}
org = repoAction.Organization
extraAction = repoAction.Action
case "release":
release := common.ReleaseWebhookEvent{}
if err = json.Unmarshal(data, &release); err != nil {
return
}
switch release.Action {
case "published", "updated", "deleted":
break
default:
err = fmt.Errorf("Unknwon Release webhook action type: %s", release.Action)
return
}
org = release.Repository.Owner
extraAction = release.Action
case "issues":
issue := common.IssueWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "opened", "closed", "reopened", "edited":
break
default:
err = fmt.Errorf("Unknown Issue webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "issue_assign":
issue := common.IssueWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "assigned", "unassigned":
break
default:
err = fmt.Errorf("Unknown Issue Assign webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "issue_comment", "pull_request_comment":
issue := common.IssueCommentWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "edited", "created", "deleted":
break
default:
err = fmt.Errorf("Unknown Issue/PR Comment webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "issue_label":
issue := common.IssueWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "label_updated", "label_cleared":
break
default:
err = fmt.Errorf("Unknown Issue Assign webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "issue_milestone":
issue := common.IssueWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "milestoned", "demilestoned":
break
default:
err = fmt.Errorf("Unknown Issue Assign webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "pull_request":
pr := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &pr); err != nil {
return
}
switch pr.Action {
case "opened", "closed", "reopened", "edited":
break
default:
err = fmt.Errorf("Unknown PR webhook action type: %s", pr.Action)
return
}
org = pr.Repository.Owner
extraAction = pr.Action
case "pull_request_label":
pr := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &pr); err != nil {
return
}
switch pr.Action {
case "label_updated", "label_cleared":
break
default:
err = fmt.Errorf("Unknown PR Label webhook action type: %s", pr.Action)
return
}
org = pr.Repository.Owner
extraAction = pr.Action
case "pull_request_milestone":
pr := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &pr); err != nil {
return
}
switch pr.Action {
case "milestoned", "demilestoned":
break
default:
err = fmt.Errorf("Unknown PR Milestone webhook action type: %s", pr.Action)
return
}
org = pr.Repository.Owner
extraAction = pr.Action
case "pull_request_assign":
issue := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "assigned", "unassigned":
break
default:
err = fmt.Errorf("Unknown PR Assign webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "pull_request_review_request":
issue := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &issue); err != nil {
return
}
switch issue.Action {
case "review_requested", "review_request_removed":
break
default:
err = fmt.Errorf("Unknown PR Review Request webhook action type: %s", issue.Action)
return
}
org = issue.Repository.Owner
extraAction = issue.Action
case "pull_request_review_rejected", "pull_request_review_approved":
pr := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &pr); err != nil {
return
}
switch pr.Action {
case "reviewed":
break
default:
err = fmt.Errorf("Unknown PR Review webhook action type: %s", pr.Action)
return
}
org = pr.Repository.Owner
extraAction = ""
case "pull_request_sync":
pr := common.PullRequestWebhookEvent{}
if err = json.Unmarshal(data, &pr); err != nil {
return
}
switch pr.Action {
case "synchronized":
break
default:
err = fmt.Errorf("Unknown PR Sync webhook action type: %s", pr.Action)
return
}
org = pr.Repository.Owner
extraAction = ""
case "wiki":
wiki := common.WikiWebhookEvent{}
if err = json.Unmarshal(data, &wiki); err != nil {
return
}
switch wiki.Action {
case "created", "edited", "renamed", "deleted":
break
default:
err = fmt.Errorf("Unknown Wiki webhook action type: %s", wiki.Action)
return
}
org = wiki.Repository.Owner
extraAction = wiki.Action
default:
// TODO: package webhook
err = fmt.Errorf("Unknown webhook request type: %s", reqType)
}
return
}
func main() {
var listenAddr string
flag.BoolVar(&DebugMode, "debug", false, "enables debugging messages")
flag.StringVar(&listenAddr, "listen", ListenAddrDef, "HTTP listen socket address for webhook events")
flag.Parse()
connectToRabbitMQ()
http.HandleFunc("POST /rabbitmq-forwarder", func(res http.ResponseWriter, req *http.Request) {
if len(req.Header.Get("Content-Type")) == 0 ||
req.Header["Content-Type"][0] != "application/json" ||
req.Method != "POST" {
res.WriteHeader(http.StatusInternalServerError)
return
}
hdr := req.Header[common.GiteaRequestHeader]
if len(hdr) != 1 {
res.WriteHeader(http.StatusInternalServerError)
log.Printf("Multiple Gitea headers received. %#v\n", hdr)
if DebugMode {
log.Println(req.Header)
}
return
}
reqType := hdr[0]
data, err := io.ReadAll(req.Body)
if err != nil {
errorStr := fmt.Sprintf("error reading hook info: %v", err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Printf(errorStr)
}
}
if !json.Valid(data) {
if DebugMode {
log.Println("send invalid json request")
}
res.WriteHeader(http.StatusBadRequest)
}
org, extraAction, err := parseRequestJSON(reqType, data)
if err != nil {
res.WriteHeader(http.StatusBadRequest)
log.Printf("error parsing webhook %s JSON. err: %v", reqType, err)
if DebugMode {
dumpUnhandledData(reqType, data)
}
return
}
if org == nil {
res.WriteHeader(http.StatusBadRequest)
log.Printf("no `org` for message... type: %s", reqType)
if DebugMode {
dumpUnhandledData(reqType, data)
}
}
err = PublishActionMessage(org.Username, reqType, extraAction, data)
if err != nil {
errorStr := fmt.Sprintf("hook (%s) processing error: %v\n", reqType, err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Println(errorStr)
}
}
res.WriteHeader(http.StatusOK)
})
http.HandleFunc("POST /rabbitmq-forwarder/{Org}", func(res http.ResponseWriter, req *http.Request) {
if len(req.Header.Get("Content-Type")) == 0 ||
req.Header["Content-Type"][0] != "application/json" ||
req.Method != "POST" {
res.WriteHeader(http.StatusInternalServerError)
return
}
hdr := req.Header[common.GiteaRequestHeader]
if len(hdr) != 1 {
res.WriteHeader(http.StatusInternalServerError)
log.Printf("Multiple Gitea headers received. %#v\n", hdr)
if DebugMode {
log.Println(req.Header)
}
return
}
reqType := hdr[0]
data, err := io.ReadAll(req.Body)
if err != nil {
errorStr := fmt.Sprintf("error reading hook info: %v", err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Printf(errorStr)
}
}
err = PublishMessage(req.PathValue("Org"), reqType, data)
if err != nil {
errorStr := fmt.Sprintf("hook (%s) processing error: %v\n", reqType, err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Println(errorStr)
}
} else {
res.WriteHeader(http.StatusOK)
}
})
log.Fatal(http.ListenAndServe(listenAddr, nil))
}