This commit is contained in:
Adam Majer 2024-08-19 12:05:43 +02:00
parent bc0950bd00
commit a4c7e18230
2 changed files with 16 additions and 7 deletions

View File

@ -11,8 +11,8 @@ import (
) )
const ( const (
ListenAddr = "[::1]:8002" ListenAddrDef = "[::1]:8002"
AppName = "rabbitmq-forwarder" AppName = "rabbitmq-forwarder"
) )
var DebugMode bool var DebugMode bool
@ -31,7 +31,10 @@ func connectToRabbitMQ() {
} }
func main() { func main() {
var listenAddr string
flag.BoolVar(&DebugMode, "debug", false, "enables debugging messages") flag.BoolVar(&DebugMode, "debug", false, "enables debugging messages")
flag.StringVar(&listenAddr, "listen", ListenAddrDef, "HTTP listen socket address for webhook events")
flag.Parse() flag.Parse()
connectToRabbitMQ() connectToRabbitMQ()
@ -55,14 +58,20 @@ func main() {
return return
} }
reqType := hdr[0] reqType := hdr[0]
err := PublicMessage(reqType, req.PathValue("Org"), req.Body) err := PublishMessage(reqType, req.PathValue("Org"), req.Body)
if err != nil { if err != nil {
log.Printf("hook (%s) processing error: %v\n", reqType, err) errorStr := fmt.Sprintf("hook (%s) processing error: %v\n", reqType, err)
res.Header().Add("Content-Type", "plain/text")
res.Write([]byte(errorStr))
res.WriteHeader(http.StatusBadRequest) res.WriteHeader(http.StatusBadRequest)
if DebugMode {
log.Println(errorStr)
}
} else { } else {
res.WriteHeader(http.StatusOK) res.WriteHeader(http.StatusOK)
} }
}) })
log.Fatal(http.ListenAndServe(ListenAddr, nil)) log.Fatal(http.ListenAndServe(listenAddr, nil))
} }

View File

@ -26,7 +26,7 @@ type Message struct {
var messageQueue chan Message var messageQueue chan Message
func PublicMessage(giteaWebhookType, giteaOrg string, msgBody io.Reader) error { func PublishMessage(giteaWebhookType, giteaOrg string, msgBody io.Reader) error {
if messageQueue == nil { if messageQueue == nil {
return fmt.Errorf("Queue not initialized") return fmt.Errorf("Queue not initialized")
} }
@ -110,7 +110,7 @@ func ConnectToExchangeForPublish(host, username, password string) {
if ch.IsClosed() || connection.IsClosed() { if ch.IsClosed() || connection.IsClosed() {
select { select {
case messageQueue <- msg: case messageQueue <- msg:
log.Println("requed ...") log.Println("requeued ...")
default: default:
log.Println("queue full... message lost") log.Println("queue full... message lost")
} }