This commit is contained in:
Adam Majer 2024-08-15 10:47:59 +02:00
parent abe7fe9c40
commit 645f934db1
2 changed files with 33 additions and 16 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"flag"
"fmt"
"log"
"net/http"
@ -10,17 +11,19 @@ import (
)
const (
ListenAddr = "[::1]:8002"
RabbitForwarderPath = "rabbitmq-forwarder"
ListenAddr = "[::1]:8002"
AppName = "rabbitmq-forwarder"
)
var DebugMode bool
func connectToRabbitMQ() {
host := os.Getenv("RABBITMQ_HOST")
username := os.Getenv("RABBITMQ_USERNAME")
password := os.Getenv("RABBITMQ_PASSWORD")
if len(host) == 0 || len(username) == 0 || len(password) == 0 {
fmt.Printf("Missing RABBITMQ_HOST, RABBITMQ_USERNAME, RABBITMQ_PASSWORD")
fmt.Println("Missing RABBITMQ_HOST, RABBITMQ_USERNAME, RABBITMQ_PASSWORD")
os.Exit(1)
}
@ -28,7 +31,9 @@ func connectToRabbitMQ() {
}
func main() {
common.RequireGiteaSecretToken()
flag.BoolVar(&DebugMode, "debug", false, "enables debugging messages")
flag.Parse()
connectToRabbitMQ()
http.HandleFunc("POST /rabbitmq-forwarder/{Org}", func(res http.ResponseWriter, req *http.Request) {
@ -42,9 +47,11 @@ func main() {
hdr := req.Header[common.GiteaRequestHeader]
if len(hdr) != 1 {
// h.LogError("Unsupported number of %s headers: %d: %#v\n", GiteaRequestHeader, len(hdr), hdr)
// h.WriteError()
res.WriteHeader(http.StatusInternalServerError)
log.Printf("Multiple Gitea headers received. %#v\n", hdr)
if DebugMode {
log.Println(req.Header)
}
return
}
reqType := hdr[0]

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@ -15,7 +16,7 @@ import (
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", err, msg)
log.Panicf("%s: %s\n", err, msg)
}
}
@ -37,12 +38,17 @@ func PublicMessage(giteaWebhookType, giteaOrg string, msgBody io.Reader) error {
}
if !json.Valid(data) {
return fmt.Errorf("Invalid JSON in request")
return errors.New("Invalid JSON in request")
}
messageQueue <- Message{
select {
case messageQueue <- Message{
Topic: fmt.Sprintf("opensuse.gitea.%s.%s", giteaOrg, giteaWebhookType),
Body: data,
}:
default:
return errors.New("Enable to queue message. Possibly queue full.")
}
return nil
}
@ -81,16 +87,20 @@ func ConnectToExchangeForPublish(host, username, password string) {
for {
msg, ok := <-messageQueue
if !ok {
log.Printf("Shutdown ... \n")
log.Println("Shutdown ...")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if DebugMode {
log.Println(msg)
}
err = ch.PublishWithContext(ctx, "pubsub", msg.Topic, false, false, rabbitmq.Publishing{
ContentType: "application/json",
AppId: RabbitForwarderPath,
AppId: AppName,
Body: []byte(msg.Body),
Timestamp: time.Now(),
DeliveryMode: 2,
@ -101,17 +111,17 @@ func ConnectToExchangeForPublish(host, username, password string) {
if ch.IsClosed() || connection.IsClosed() {
select {
case messageQueue <- msg:
log.Printf("requed ...")
log.Println("requed ...")
default:
log.Printf("queue full... message lost")
log.Println("queue full... message lost")
}
// reconnect
log.Printf("reconnecting...")
log.Println("reconnecting...")
time.Sleep(5 * time.Second)
go ConnectToExchangeForPublish(host, username, password)
return
} else {
log.Printf("Error sending request. %v", err)
log.Printf("Error sending request. %v\n", err)
}
}
}