autogits/gitea-events-rabbitmq-publisher/main.go

223 lines
5.3 KiB
Go
Raw Normal View History

2024-08-13 16:42:20 +02:00
package main
import (
2024-08-24 13:32:39 +02:00
"encoding/json"
2024-08-15 10:47:59 +02:00
"flag"
2024-08-14 17:43:56 +02:00
"fmt"
2024-08-24 13:32:39 +02:00
"io"
2024-08-13 16:42:20 +02:00
"log"
"net/http"
2024-08-14 17:43:56 +02:00
"os"
2024-08-13 16:42:20 +02:00
"src.opensuse.org/autogits/common"
)
const (
2024-08-19 12:05:43 +02:00
ListenAddrDef = "[::1]:8002"
AppName = "rabbitmq-forwarder"
2024-08-13 16:42:20 +02:00
)
2024-08-15 10:47:59 +02:00
var DebugMode bool
2024-08-14 17:43:56 +02:00
func connectToRabbitMQ() {
host := os.Getenv("RABBITMQ_HOST")
username := os.Getenv("RABBITMQ_USERNAME")
password := os.Getenv("RABBITMQ_PASSWORD")
if len(host) == 0 || len(username) == 0 || len(password) == 0 {
2024-08-15 10:47:59 +02:00
fmt.Println("Missing RABBITMQ_HOST, RABBITMQ_USERNAME, RABBITMQ_PASSWORD")
2024-08-14 17:43:56 +02:00
os.Exit(1)
}
2024-08-15 10:47:59 +02:00
2024-08-14 17:58:22 +02:00
go ConnectToExchangeForPublish(host, username, password)
2024-08-14 17:43:56 +02:00
}
2024-08-24 16:46:57 +02:00
var id int
2024-08-24 18:37:08 +02:00
2024-08-24 16:46:57 +02:00
func dumpUnhandledData(reqType string, data []byte) {
id++
2024-08-24 18:37:08 +02:00
os.WriteFile("/tmp/unhandled-json-"+reqType+"-"+fmt.Sprint(id)+".json", data, 0600)
2024-08-24 16:46:57 +02:00
}
2024-08-13 16:42:20 +02:00
func main() {
2024-08-19 12:05:43 +02:00
var listenAddr string
2024-08-15 10:47:59 +02:00
flag.BoolVar(&DebugMode, "debug", false, "enables debugging messages")
2024-08-19 12:05:43 +02:00
flag.StringVar(&listenAddr, "listen", ListenAddrDef, "HTTP listen socket address for webhook events")
2024-08-15 10:47:59 +02:00
flag.Parse()
2024-08-14 17:43:56 +02:00
connectToRabbitMQ()
2024-08-13 16:42:20 +02:00
2024-08-24 13:32:39 +02:00
connectionId := 1
http.HandleFunc("POST /test", func(res http.ResponseWriter, req *http.Request) {
if len(req.Header.Get("Content-Type")) == 0 ||
req.Header["Content-Type"][0] != "application/json" ||
req.Method != "POST" {
res.WriteHeader(http.StatusInternalServerError)
return
}
hdr := req.Header[common.GiteaRequestHeader]
if len(hdr) != 1 {
res.WriteHeader(http.StatusInternalServerError)
log.Printf("Multiple Gitea headers received. %#v\n", hdr)
if DebugMode {
log.Println(req.Header)
}
return
}
2024-08-24 16:46:57 +02:00
reqType := hdr[0]
2024-08-24 13:32:39 +02:00
data, err := io.ReadAll(req.Body)
if err != nil {
errorStr := fmt.Sprintf("error reading hook info: %v", err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Printf(errorStr)
}
}
if !json.Valid(data) {
if DebugMode {
log.Println("send invalid json request")
}
res.WriteHeader(http.StatusBadRequest)
}
2024-08-24 18:15:53 +02:00
org := common.Organization{}
extraAction := ""
2024-08-24 16:46:57 +02:00
switch reqType {
case "create", "delete":
2024-08-24 17:20:46 +02:00
create := common.CreateWebhookEvent{}
2024-08-24 16:46:57 +02:00
if err = json.Unmarshal(data, &create); err != nil {
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
dumpUnhandledData(reqType, data)
}
2024-08-24 17:20:46 +02:00
return
2024-08-24 16:46:57 +02:00
}
2024-08-24 18:15:53 +02:00
org = *create.Repository.Owner
2024-08-24 17:20:46 +02:00
case "fork":
fork := common.ForkWebhookEvent{}
if err = json.Unmarshal(data, &fork); err != nil {
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
dumpUnhandledData(reqType, data)
}
return
}
2024-08-24 18:15:53 +02:00
org = *fork.Forkee.Owner
2024-08-24 17:20:46 +02:00
case "push":
push := common.PushRequest{}
if err = json.Unmarshal(data, &push); err != nil {
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
dumpUnhandledData(reqType, data)
}
return
}
2024-08-24 18:15:53 +02:00
org = *push.Repository.Owner
case "repository":
repoAction := common.RepositoryAction{}
if err = json.Unmarshal(data, &repoAction); err != nil {
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
dumpUnhandledData(reqType, data)
}
return
}
2024-08-24 18:37:08 +02:00
switch repoAction.Action {
case "created", "deleted":
break
default:
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
dumpUnhandledData(reqType, data)
}
}
2024-08-24 18:15:53 +02:00
org = repoAction.Organization
extraAction = repoAction.Action
2024-08-24 18:37:08 +02:00
default:
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
dumpUnhandledData(reqType, data)
}
2024-08-24 16:46:57 +02:00
}
2024-08-24 18:37:08 +02:00
// err = PublishMessage(repo.Owner.Username, reqType, data)
2024-08-24 13:32:39 +02:00
// write to file for review
os.WriteFile("test_data."+fmt.Sprint(connectionId), data, 0644)
2024-08-24 16:46:57 +02:00
connectionId++
2024-08-24 18:15:53 +02:00
log.Println(id, org.Username, extraAction)
2024-08-24 13:32:39 +02:00
/*
2024-08-24 18:37:08 +02:00
err = PublishMessage(org.Name, reqType, extraAction, data)
2024-08-24 13:32:39 +02:00
if err != nil {
errorStr := fmt.Sprintf("hook (%s) processing error: %v\n", reqType, err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Println(errorStr)
}
} else {*/
res.WriteHeader(http.StatusOK)
// }
})
2024-08-14 17:43:56 +02:00
http.HandleFunc("POST /rabbitmq-forwarder/{Org}", func(res http.ResponseWriter, req *http.Request) {
2024-08-13 16:42:20 +02:00
if len(req.Header.Get("Content-Type")) == 0 ||
req.Header["Content-Type"][0] != "application/json" ||
req.Method != "POST" {
res.WriteHeader(http.StatusInternalServerError)
return
}
hdr := req.Header[common.GiteaRequestHeader]
if len(hdr) != 1 {
res.WriteHeader(http.StatusInternalServerError)
2024-08-15 10:47:59 +02:00
log.Printf("Multiple Gitea headers received. %#v\n", hdr)
if DebugMode {
log.Println(req.Header)
}
2024-08-13 16:42:20 +02:00
return
}
reqType := hdr[0]
2024-08-24 13:32:39 +02:00
data, err := io.ReadAll(req.Body)
if err != nil {
errorStr := fmt.Sprintf("error reading hook info: %v", err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Printf(errorStr)
}
}
err = PublishMessage(req.PathValue("Org"), reqType, data)
2024-08-13 16:42:20 +02:00
if err != nil {
2024-08-19 12:05:43 +02:00
errorStr := fmt.Sprintf("hook (%s) processing error: %v\n", reqType, err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
2024-08-13 16:42:20 +02:00
res.WriteHeader(http.StatusBadRequest)
2024-08-19 12:05:43 +02:00
if DebugMode {
log.Println(errorStr)
}
2024-08-13 16:42:20 +02:00
} else {
res.WriteHeader(http.StatusOK)
}
})
2024-08-19 12:05:43 +02:00
log.Fatal(http.ListenAndServe(listenAddr, nil))
2024-08-13 16:42:20 +02:00
}