185 lines
4.6 KiB
Go
185 lines
4.6 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"crypto/tls"
|
||
|
"encoding/json"
|
||
|
"log"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
_ "github.com/mattn/go-sqlite3"
|
||
|
rabbitmq "github.com/rabbitmq/amqp091-go"
|
||
|
"src.opensuse.org/autogits/common"
|
||
|
"src.opensuse.org/autogits/common/gitea-generated/models"
|
||
|
)
|
||
|
|
||
|
// BuildNotification is one build-result record kept in the notification
// cache.
//
// processObsMessage derives BuildSuccess from the routing key and then
// unmarshals the JSON message body into the struct. The fields carry no JSON
// tags, so decoding relies on encoding/json's case-insensitive matching of
// object keys against the field names.
type BuildNotification struct {
	// BuildSuccess is true for "build_success" and "build_unchanged" events
	// and false for "build_fail" events.
	BuildSuccess bool

	// Project, Package, Repository and Arch are joined with "/" to form the
	// cache key for this record.
	Project    string
	Package    string
	Repository string
	Arch       string

	// Further build attributes taken from the message body; they are stored
	// but not interpreted by the code in this file.
	Release   string
	Rev       string
	Buildtype string
	Workerid  string

	// Timestamps kept as the strings found in the message body.
	Starttime string
	Endtime   string
	Readytime string
}
|
||
|
|
||
|
var obsNotifications map[string]*BuildNotification // Project/Package/Repositry/Arch as key
|
||
|
var notificationMutex sync.RWMutex
|
||
|
var notificationChannels map[string][]chan *BuildNotification
|
||
|
|
||
|
func getProjectBuildStatus(project string) []*BuildNotification {
|
||
|
notificationMutex.RLock()
|
||
|
defer notificationMutex.RUnlock()
|
||
|
|
||
|
data := make([]*BuildNotification, 0, 4)
|
||
|
for _, val := range obsNotifications {
|
||
|
if val.Package == project {
|
||
|
data = append(data, val)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return data
|
||
|
}
|
||
|
|
||
|
func addProjectWatcher(meta *common.ProjectMeta, pr *models.PullReview) {
|
||
|
}
|
||
|
|
||
|
func addObsNotificationToCache(notification *BuildNotification) {
|
||
|
key := strings.Join(
|
||
|
[]string{
|
||
|
notification.Project,
|
||
|
notification.Package,
|
||
|
notification.Repository,
|
||
|
notification.Arch,
|
||
|
},
|
||
|
"/",
|
||
|
)
|
||
|
|
||
|
notificationMutex.Lock()
|
||
|
obsNotifications[key] = notification
|
||
|
|
||
|
chns, ok := notificationChannels[notification.Project]
|
||
|
notificationMutex.Unlock()
|
||
|
if ok {
|
||
|
for _, ch := range chns {
|
||
|
ch <- notification
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func processObsMessage(msg *rabbitmq.Delivery) {
|
||
|
key := strings.SplitN(msg.RoutingKey, ".", 4)
|
||
|
if len(key) != 4 || len(key[3]) < 7 || key[3][:6] != "build_" {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
buildSuccess := false
|
||
|
switch key[3][6:] {
|
||
|
case "success", "unchanged":
|
||
|
buildSuccess = true
|
||
|
case "fail":
|
||
|
buildSuccess = false
|
||
|
default:
|
||
|
log.Printf("unknown build_ logging message: %s\n", msg.RoutingKey)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
notification := &BuildNotification{
|
||
|
BuildSuccess: buildSuccess,
|
||
|
}
|
||
|
err := json.Unmarshal(msg.Body, notification)
|
||
|
if err != nil {
|
||
|
log.Printf("Cannot unmarshall json object: %s\n", msg.Body)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
log.Printf("%v\n", notification)
|
||
|
addObsNotificationToCache(notification)
|
||
|
}
|
||
|
|
||
|
// failOnError does nothing when err is nil. Otherwise it logs and panics via
// log.Panicf with the text "<err>: <msg>" (the error first, then the context
// message).
func failOnError(err error, msg string) {
	if err == nil {
		return
	}

	log.Panicf("%s: %s", err, msg)
}
|
||
|
|
||
|
func ProcessingObsMessages(host, username, password, queueName string) {
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
log.Print("recovering... reconnecting...\n")
|
||
|
time.Sleep(5 * time.Second)
|
||
|
go ProcessingObsMessages(host, username, password, queueName)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
if obsNotifications == nil {
|
||
|
obsNotifications = make(map[string]*BuildNotification)
|
||
|
// notificationChannels = make(map[string]chan *BuildNotification)
|
||
|
}
|
||
|
|
||
|
auth := ""
|
||
|
if len(username) > 0 && len(password) > 0 {
|
||
|
auth = username + ":" + password + "@"
|
||
|
}
|
||
|
|
||
|
connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
|
||
|
ServerName: host,
|
||
|
})
|
||
|
failOnError(err, "Cannot connect to rabbit.opensuse.org")
|
||
|
defer connection.Close()
|
||
|
|
||
|
ch, err := connection.Channel()
|
||
|
failOnError(err, "Cannot create a channel")
|
||
|
defer ch.Close()
|
||
|
|
||
|
err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
|
||
|
failOnError(err, "Cannot find pubsub exchange")
|
||
|
|
||
|
var q rabbitmq.Queue
|
||
|
if len(queueName) == 0 {
|
||
|
q, err = ch.QueueDeclare("", false, true, true, false, nil)
|
||
|
} else {
|
||
|
q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
|
||
|
if err != nil {
|
||
|
log.Printf("queue not found .. trying to create it: %v\n", err)
|
||
|
if ch.IsClosed() {
|
||
|
ch, err = connection.Channel()
|
||
|
failOnError(err, "Channel cannot be re-opened")
|
||
|
}
|
||
|
q, err = ch.QueueDeclare(queueName, true, false, true, false, nil)
|
||
|
|
||
|
if err != nil {
|
||
|
log.Printf("can't create persistent queue ... falling back to temporaty queue: %v\n", err)
|
||
|
if ch.IsClosed() {
|
||
|
ch, err = connection.Channel()
|
||
|
failOnError(err, "Channel cannot be re-opened")
|
||
|
}
|
||
|
q, err = ch.QueueDeclare("", false, true, true, false, nil)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
failOnError(err, "Cannot declare queue")
|
||
|
log.Printf("queue: %s:%d", q.Name, q.Consumers)
|
||
|
|
||
|
err = ch.QueueBind(q.Name, "*.obs.package.*", "pubsub", false, nil)
|
||
|
failOnError(err, "Cannot bind queue to exchange")
|
||
|
|
||
|
msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
|
||
|
failOnError(err, "Cannot start consumer")
|
||
|
log.Printf("queue: %s:%d", q.Name, q.Consumers)
|
||
|
|
||
|
for {
|
||
|
msg, ok := <-msgs
|
||
|
if !ok {
|
||
|
log.Printf("channel/connection closed?\n")
|
||
|
|
||
|
if connection.IsClosed() {
|
||
|
// reconnect
|
||
|
log.Printf("reconnecting...")
|
||
|
time.Sleep(5 * time.Second)
|
||
|
go ProcessingObsMessages(host, username, password, queueName)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
processObsMessage(&msg)
|
||
|
}
|
||
|
}
|