From 7dfbf4c9f2c836209b250e6fb27dc1ff1d5161c398a48eded715da51ffc7713c Mon Sep 17 00:00:00 2001 From: Adam Majer Date: Thu, 18 Jul 2024 16:43:27 +0200 Subject: [PATCH] . --- bots-common/gitea_utils.go | 64 +++++++++++++++++++++++++++++++++++-- obs-staging-bot/Makefile | 11 +------ obs-staging-bot/go.mod | 8 +++-- obs-staging-bot/go.sum | 2 ++ obs-staging-bot/main.go | 22 ++++++++++++- obs-staging-bot/rabbitmq.go | 43 ++++++++++++++++++------- 6 files changed, 123 insertions(+), 27 deletions(-) diff --git a/bots-common/gitea_utils.go b/bots-common/gitea_utils.go index b32fee6..b43a5ec 100644 --- a/bots-common/gitea_utils.go +++ b/bots-common/gitea_utils.go @@ -7,9 +7,12 @@ import ( "os" "path/filepath" "strings" + "time" transport "github.com/go-openapi/runtime/client" + "github.com/go-openapi/strfmt" apiclient "src.opensuse.org/autogits/common/gitea-generated/client" + "src.opensuse.org/autogits/common/gitea-generated/client/notification" "src.opensuse.org/autogits/common/gitea-generated/client/organization" "src.opensuse.org/autogits/common/gitea-generated/client/repository" "src.opensuse.org/autogits/common/gitea-generated/models" @@ -25,6 +28,63 @@ func (h *RequestHandler) allocateGiteaTransport() (*transport.Runtime, *apiclien return r, apiclient.New(r, nil) } +func (h *RequestHandler) GetNotifications(since *time.Time) ([]*models.NotificationThread, error){ + if h.HasError() { + return nil, h.Error + } + + bigLimit := int64(100000) + transport, client := h.allocateGiteaTransport() + + params := notification.NewNotifyGetListParams(). + WithDefaults(). + WithLimit(&bigLimit) + + if since != nil { + s := strfmt.DateTime(*since) + params.SetSince(&s) + } + + list, err := client.Notification.NotifyGetList(params, transport.DefaultAuthentication) + if err != nil { + h.Error = err + return nil, err + } + + if !list.IsSuccess() { + h.Error = fmt.Errorf("Cannot fetch notifications: %s", list.Error()) + return nil, h.Error + } + + return list.Payload, nil +} + +func (h *RequestHandler) SetNotificationRead(notificationId int) error { + if h.HasError() { + return h.Error + } + + transport, client := h.allocateGiteaTransport() + list, err := client.Notification.NotifyReadThread( + notification.NewNotifyReadThreadParams(). + WithDefaults(). + WithID(string(id)), + transport.DefaultAuthentication, + ) + + if err != nil { + h.Error = err + return err + } + + if !list.IsSuccess() { + h.Error = fmt.Errorf("Cannot update notifications: %d", notificationId) + return h.Error + } + + return nil +} + func (h *RequestHandler) CreateRepositoryIfNotExist(org Organization, repoName string) *models.Repository { if h.HasError() { return nil @@ -177,7 +237,7 @@ func (h *RequestHandler) GetAssociatedPrjGitPR(pr *PullRequestAction) *models.Pu transport, client := h.allocateGiteaTransport() - var page, maxSize int64; + var page, maxSize int64 page = 1 maxSize = 10000 state := "open" @@ -204,7 +264,7 @@ func (h *RequestHandler) GetAssociatedPrjGitPR(pr *PullRequestAction) *models.Pu prLine := fmt.Sprintf(PrPattern, pr.Repository.Owner.Username, pr.Repository.Name, pr.Number) h.Log("attemping to match line: '%s'", prLine) -// payload_processing: + // payload_processing: for _, pr := range prs.Payload { lines := strings.Split(pr.Body, "\n") diff --git a/obs-staging-bot/Makefile b/obs-staging-bot/Makefile index 99b8b0e..5bd3e7f 100644 --- a/obs-staging-bot/Makefile +++ b/obs-staging-bot/Makefile @@ -1,14 +1,5 @@ all: build -api.json: - curl -o api.json https://src.opensuse.org/swagger.v1.json - -generated/client/gitea_api_client.go: api.json - [ -d generated ] || mkdir generated - podman run --rm -v $$(pwd):/api ghcr.io/go-swagger/go-swagger generate client -f /api/api.json -t /api/generated - -api: generated/client/gitea_api_client.go - -build: api +build: go build -tags "libsqlite3 linux" diff --git a/obs-staging-bot/go.mod b/obs-staging-bot/go.mod index 0f56b33..f58ddb8 100644 --- a/obs-staging-bot/go.mod +++ b/obs-staging-bot/go.mod @@ -4,7 +4,11 @@ go 1.22.3 replace src.opensuse.org/autogits/common => ../bots-common -require src.opensuse.org/autogits/common v0.0.0-00010101000000-000000000000 +require ( + github.com/mattn/go-sqlite3 v1.14.22 + github.com/rabbitmq/amqp091-go v1.10.0 + src.opensuse.org/autogits/common v0.0.0-00010101000000-000000000000 +) require ( github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect @@ -23,11 +27,9 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect - github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/rabbitmq/amqp091-go v1.10.0 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect diff --git a/obs-staging-bot/go.sum b/obs-staging-bot/go.sum index 53cbf1b..86afe29 100644 --- a/obs-staging-bot/go.sum +++ b/obs-staging-bot/go.sum @@ -68,6 +68,8 @@ go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucg go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= diff --git a/obs-staging-bot/main.go b/obs-staging-bot/main.go index 7c60433..09a0d0a 100644 --- a/obs-staging-bot/main.go +++ b/obs-staging-bot/main.go @@ -27,6 +27,24 @@ func allocateRequestHandler() *common.RequestHandler { } } +func PollWorkNotifications() { + h := allocateRequestHandler() + data, err := h.GetNotifications(nil) + + if err != nil { + h.LogPlainError(err) + return + } + + if data != nil { + for _, notification := range data { + h.Log("[+] id: %d", notification.ID) + h.Log(" title: %s", notification.Subject.Title) + h.Log(" subj: %#v", notification.Subject) + } + } +} + func main() { var defs common.ListenDefinitions @@ -36,7 +54,9 @@ func main() { common.RequireGiteaSecretToken() common.RequireObsSecretToken() - go startProcessingObsMessages("rabbit.opensuse.org", "opensuse", "opensuse") +// go ProcessingObsMessages("rabbit.opensuse.org", "opensuse", "opensuse", "") + + PollWorkNotifications() stuck := make(chan int) <-stuck diff --git a/obs-staging-bot/rabbitmq.go b/obs-staging-bot/rabbitmq.go index 7942e9d..9a8a681 100644 --- a/obs-staging-bot/rabbitmq.go +++ b/obs-staging-bot/rabbitmq.go @@ -53,10 +53,15 @@ func processObsMessage(msg *rabbitmq.Delivery) { } -func buildHistoryStore() { -} +func ProcessingObsMessages(host, username, password, queueName string) { + defer func() { + if r := recover(); r != nil { + log.Print("recovering... reconnecting...\n") + time.Sleep(5 * time.Second) + go ProcessingObsMessages(host, username, password, queueName) + } + }() -func startProcessingObsMessages(host, username, password string) { auth := "" if len(username) > 0 && len(password) > 0 { auth = username + ":" + password + "@" @@ -75,12 +80,29 @@ func startProcessingObsMessages(host, username, password string) { err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil) failOnError(err, "Cannot find pubsub exchange") - q, err := ch.QueueDeclarePassive("obs-staging-bot", true, false, true, false, nil) - if err != nil { - log.Printf("queue not found .. trying to create it\n") - q, err = ch.QueueDeclare("obs-staging-bot", true, false, true, false, nil) + var q rabbitmq.Queue + if len(queueName) == 0 { + q, err = ch.QueueDeclare("", false, true, true, false, nil) + } else { + q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil) + if err != nil { + log.Printf("queue not found .. trying to create it: %v\n", err) + if ch.IsClosed() { + ch, err = connection.Channel() + failOnError(err, "Channel cannot be re-opened") + } + q, err = ch.QueueDeclare(queueName, true, false, true, false, nil) + + if err != nil { + log.Printf("can't create persistent queue ... falling back to temporaty queue: %v\n", err) + if ch.IsClosed() { + ch, err = connection.Channel() + failOnError(err, "Channel cannot be re-opened") + } + q, err = ch.QueueDeclare("", false, true, true, false, nil) + } + } } - //q, err := ch.QueueDeclare("obs-staging-bot", false, true, false, false, nil) failOnError(err, "Cannot declare queue") log.Printf("queue: %s:%d", q.Name, q.Consumers) @@ -91,17 +113,16 @@ func startProcessingObsMessages(host, username, password string) { failOnError(err, "Cannot start consumer") log.Printf("queue: %s:%d", q.Name, q.Consumers) - // TODO: add recovery for NotifyClose and related for { msg, ok := <-msgs if !ok { - log.Printf("channel closed?\n") + log.Printf("channel/connection closed?\n") if connection.IsClosed() { // reconnect log.Printf("reconnecting...") time.Sleep(5 * time.Second) - go startProcessingObsMessages(host, username, password) + go ProcessingObsMessages(host, username, password, queueName) } return }