package main import ( "crypto/tls" "encoding/json" "log" "os" "strings" "sync" "time" _ "github.com/mattn/go-sqlite3" rabbitmq "github.com/rabbitmq/amqp091-go" "src.opensuse.org/autogits/common" "src.opensuse.org/autogits/common/gitea-generated/models" ) type BuildNotification struct { BuildSuccess bool Project, Package, Repository, Arch, Release, Rev, Buildtype, Workerid string Starttime, Endtime, Readytime string } var out *os.File var obsNotifications map[string]*BuildNotification // Project/Package/Repositry/Arch as key var notificationMutex sync.RWMutex var notificationChannels map[string][]chan *BuildNotification func getProjectBuildStatus(project string) []*BuildNotification { notificationMutex.RLock() defer notificationMutex.RUnlock() data := make([]*BuildNotification, 0, 4) for _, val := range obsNotifications { if val.Package == project { data = append(data, val) } } return data } func addProjectWatcher(meta *common.ProjectMeta, pr *models.PullReview) { } func addObsNotificationToCache(notification *BuildNotification) { key := strings.Join( []string{ notification.Project, notification.Package, notification.Repository, notification.Arch, }, "/", ) notificationMutex.Lock() obsNotifications[key] = notification chns, ok := notificationChannels[notification.Project] notificationMutex.Unlock() if ok { for _, ch := range chns { ch <- notification } } } func processObsMessage(msg *rabbitmq.Delivery) { out.Write([]byte(msg.Timestamp.String())) out.Write([]byte("\n")) out.Write([]byte(msg.RoutingKey)) out.Write([]byte("\n")) out.Write(msg.Body) out.Write([]byte("\n--------------------------\n")) return key := strings.SplitN(msg.RoutingKey, ".", 4) if len(key) != 4 || len(key[3]) < 7 || key[3][:6] != "build_" { return } buildSuccess := false switch key[3][6:] { case "success", "unchanged": buildSuccess = true case "fail": buildSuccess = false default: log.Printf("unknown build_ logging message: %s\n", msg.RoutingKey) return } notification := &BuildNotification{ BuildSuccess: buildSuccess, } err := json.Unmarshal(msg.Body, notification) if err != nil { log.Printf("Cannot unmarshall json object: %s\n", msg.Body) return } log.Printf("%v\n", notification) addObsNotificationToCache(notification) } func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", err, msg) } } func ProcessingObsMessages(host, username, password, queueName string) { defer func() { if r := recover(); r != nil { log.Print("recovering... reconnecting...\n") time.Sleep(5 * time.Second) go ProcessingObsMessages(host, username, password, queueName) } }() var err error out, err = os.OpenFile("messages.txt", os.O_WRONLY | os.O_APPEND, 0644) if err != nil { log.Printf("Cannot open message.txt: %v", err) return } if obsNotifications == nil { obsNotifications = make(map[string]*BuildNotification) // notificationChannels = make(map[string]chan *BuildNotification) } auth := "" if len(username) > 0 && len(password) > 0 { auth = username + ":" + password + "@" } connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{ ServerName: host, }) failOnError(err, "Cannot connect to rabbit.opensuse.org") defer connection.Close() ch, err := connection.Channel() failOnError(err, "Cannot create a channel") defer ch.Close() err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil) failOnError(err, "Cannot find pubsub exchange") var q rabbitmq.Queue if len(queueName) == 0 { q, err = ch.QueueDeclare("", false, true, true, false, nil) } else { q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil) if err != nil { log.Printf("queue not found .. trying to create it: %v\n", err) if ch.IsClosed() { ch, err = connection.Channel() failOnError(err, "Channel cannot be re-opened") } q, err = ch.QueueDeclare(queueName, true, false, true, false, nil) if err != nil { log.Printf("can't create persistent queue ... falling back to temporaty queue: %v\n", err) if ch.IsClosed() { ch, err = connection.Channel() failOnError(err, "Channel cannot be re-opened") } q, err = ch.QueueDeclare("", false, true, true, false, nil) } } } failOnError(err, "Cannot declare queue") log.Printf("queue: %s:%d", q.Name, q.Consumers) err = ch.QueueBind(q.Name, "*.obs.*.*", "pubsub", false, nil) failOnError(err, "Cannot bind queue to exchange") msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil) failOnError(err, "Cannot start consumer") log.Printf("queue: %s:%d", q.Name, q.Consumers) for { msg, ok := <-msgs if !ok { log.Printf("channel/connection closed?\n") if connection.IsClosed() { // reconnect log.Printf("reconnecting...") time.Sleep(5 * time.Second) go ProcessingObsMessages(host, username, password, queueName) } return } processObsMessage(&msg) } }