.
This commit is contained in:
parent
001fdd4ebf
commit
6470387e47
19
obs-status-service/README.md
Normal file
19
obs-status-service/README.md
Normal file
@ -0,0 +1,19 @@
|
||||
OBS Status Service
|
||||
==================
|
||||
|
||||
Reports build status of OBS service as an easily to produce SVG
|
||||
|
||||
|
||||
Areas of Responsibility
|
||||
-----------------------
|
||||
|
||||
* Monitors RabbitMQ interface for notification of OBS package and project status
|
||||
* Produces SVG output based on GET request
|
||||
* Cache results (sqlite) and periodically update results from OBS (in case of messages are missing)
|
||||
|
||||
|
||||
Target Usage
|
||||
------------
|
||||
|
||||
* README.md of package git or project git
|
||||
* comment section of a Gitea PR
|
39
obs-status-service/go.mod
Normal file
39
obs-status-service/go.mod
Normal file
@ -0,0 +1,39 @@
|
||||
module src.opensuse.org/obs-status-service
|
||||
|
||||
go 1.22.3
|
||||
|
||||
replace src.opensuse.org/autogits/common => ../bots-common
|
||||
|
||||
require (
|
||||
github.com/mattn/go-sqlite3 v1.14.22
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
src.opensuse.org/autogits/common v0.0.0-00010101000000-000000000000
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-openapi/analysis v0.23.0 // indirect
|
||||
github.com/go-openapi/errors v0.22.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/jsonreference v0.21.0 // indirect
|
||||
github.com/go-openapi/loads v0.22.0 // indirect
|
||||
github.com/go-openapi/runtime v0.28.0 // indirect
|
||||
github.com/go-openapi/spec v0.21.0 // indirect
|
||||
github.com/go-openapi/strfmt v0.23.0 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
github.com/go-openapi/validate v0.24.0 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/oklog/ulid v1.3.1 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
go.mongodb.org/mongo-driver v1.14.0 // indirect
|
||||
go.opentelemetry.io/otel v1.24.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.24.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.24.0 // indirect
|
||||
golang.org/x/sync v0.7.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
81
obs-status-service/go.sum
Normal file
81
obs-status-service/go.sum
Normal file
@ -0,0 +1,81 @@
|
||||
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
|
||||
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-openapi/analysis v0.23.0 h1:aGday7OWupfMs+LbmLZG4k0MYXIANxcuBTYUC03zFCU=
|
||||
github.com/go-openapi/analysis v0.23.0/go.mod h1:9mz9ZWaSlV8TvjQHLl2mUW2PbZtemkE8yA5v22ohupo=
|
||||
github.com/go-openapi/errors v0.22.0 h1:c4xY/OLxUBSTiepAg3j/MHuAv5mJhnf53LLMWFB+u/w=
|
||||
github.com/go-openapi/errors v0.22.0/go.mod h1:J3DmZScxCDufmIMsdOuDHxJbdOGC0xtUynjIx092vXE=
|
||||
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
|
||||
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
|
||||
github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ=
|
||||
github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4=
|
||||
github.com/go-openapi/loads v0.22.0 h1:ECPGd4jX1U6NApCGG1We+uEozOAvXvJSF4nnwHZ8Aco=
|
||||
github.com/go-openapi/loads v0.22.0/go.mod h1:yLsaTCS92mnSAZX5WWoxszLj0u+Ojl+Zs5Stn1oF+rs=
|
||||
github.com/go-openapi/runtime v0.28.0 h1:gpPPmWSNGo214l6n8hzdXYhPuJcGtziTOgUpvsFWGIQ=
|
||||
github.com/go-openapi/runtime v0.28.0/go.mod h1:QN7OzcS+XuYmkQLw05akXk0jRH/eZ3kb18+1KwW9gyc=
|
||||
github.com/go-openapi/spec v0.21.0 h1:LTVzPc3p/RzRnkQqLRndbAzjY0d0BCL72A6j3CdL9ZY=
|
||||
github.com/go-openapi/spec v0.21.0/go.mod h1:78u6VdPw81XU44qEWGhtr982gJ5BWg2c0I5XwVMotYk=
|
||||
github.com/go-openapi/strfmt v0.23.0 h1:nlUS6BCqcnAk0pyhi9Y+kdDVZdZMHfEKQiS4HaMgO/c=
|
||||
github.com/go-openapi/strfmt v0.23.0/go.mod h1:NrtIpfKtWIygRkKVsxh7XQMDQW5HKQl6S5ik2elW+K4=
|
||||
github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE=
|
||||
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
|
||||
github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58=
|
||||
github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
|
||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80=
|
||||
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
|
||||
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
|
||||
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
|
||||
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
|
||||
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
|
||||
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
|
||||
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
|
||||
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
|
||||
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
88
obs-status-service/main.go
Normal file
88
obs-status-service/main.go
Normal file
@ -0,0 +1,88 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"src.opensuse.org/autogits/common"
|
||||
)
|
||||
|
||||
const (
|
||||
ListenAddr = "[::1]:8003"
|
||||
AppName = "obs-status-service"
|
||||
)
|
||||
|
||||
type BuildStatusCacheItem struct {
|
||||
CacheTime time.Time
|
||||
Result []*common.BuildResult
|
||||
}
|
||||
|
||||
var obs *common.ObsClient
|
||||
var buildStatusCache map[string]BuildStatusCacheItem
|
||||
|
||||
func CacheBuildStatus(prj, pkg string) ([]common.BuildResult, error) {
|
||||
list, err := obs.BuildStatus(prj, pkg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func PackageBuildStatus(prj, pkg string) (common.ObsBuildStatusDetail, error) {
|
||||
for _, r := range list.Result {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func PackageStatusSvg(
|
||||
|
||||
func PackageStatusSummarySvg(buildStatus common.ObsBuildStatusDetail) []byte {
|
||||
fillColor := "orange"
|
||||
textColor := "grey"
|
||||
if buildStatus.Finished {
|
||||
textColor = "black"
|
||||
|
||||
if buildStatus.Success {
|
||||
fillColor = "green"
|
||||
} else {
|
||||
fillColor = "red"
|
||||
}
|
||||
}
|
||||
|
||||
return []byte(
|
||||
`<svg version="1.1"
|
||||
width="200" height="20"
|
||||
xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="100%" height="100%" fill="` + fillColor + `" />
|
||||
<text x="20" y="25" font-size="60" text-anchor="middle" fill="` + textColor + `">` + buildStatus.Code + `</text>
|
||||
</svg>`)
|
||||
}
|
||||
|
||||
func main() {
|
||||
common.RequireObsSecretToken()
|
||||
obsHost := os.Getenv("OBS_HOSTNAME")
|
||||
if len(obsHost) == 0 {
|
||||
log.Fatal("OBS_HOSTNAME env required.")
|
||||
}
|
||||
if obs, err := common.NewObsClient(obsHost); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
http.HandleFunc("GET /{ObsProject}", func(res http.ResponseWriter, req *http.Request) {
|
||||
res.WriteHeader(http.StatusBadRequest)
|
||||
})
|
||||
http.HandleFunc("GET /{ObsProject}/{Package}", func(res http.ResponseWriter, req *http.Request) {
|
||||
obsPrj := req.PathValue("ObsProject")
|
||||
obsPkg := req.PathValue("ObsPackage")
|
||||
|
||||
svg := PackageStatusSvg(PackageBuildStatus(obsPrj, obsPkg))
|
||||
|
||||
res.Header().Add("content-type", "image/svg+xml")
|
||||
res.Header().Add("size", fmt.Sprint(len(svg)))
|
||||
res.Write(svg)
|
||||
})
|
||||
|
||||
log.Fatal(http.ListenAndServe(ListenAddr, nil))
|
||||
}
|
184
obs-status-service/rabbitmq.go
Normal file
184
obs-status-service/rabbitmq.go
Normal file
@ -0,0 +1,184 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
rabbitmq "github.com/rabbitmq/amqp091-go"
|
||||
"src.opensuse.org/autogits/common"
|
||||
"src.opensuse.org/autogits/common/gitea-generated/models"
|
||||
)
|
||||
|
||||
type BuildNotification struct {
|
||||
BuildSuccess bool
|
||||
Project, Package, Repository, Arch, Release, Rev, Buildtype, Workerid string
|
||||
Starttime, Endtime, Readytime string
|
||||
}
|
||||
|
||||
var obsNotifications map[string]*BuildNotification // Project/Package/Repositry/Arch as key
|
||||
var notificationMutex sync.RWMutex
|
||||
var notificationChannels map[string][]chan *BuildNotification
|
||||
|
||||
func getProjectBuildStatus(project string) []*BuildNotification {
|
||||
notificationMutex.RLock()
|
||||
defer notificationMutex.RUnlock()
|
||||
|
||||
data := make([]*BuildNotification, 0, 4)
|
||||
for _, val := range obsNotifications {
|
||||
if val.Package == project {
|
||||
data = append(data, val)
|
||||
}
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
func addProjectWatcher(meta *common.ProjectMeta, pr *models.PullReview) {
|
||||
}
|
||||
|
||||
func addObsNotificationToCache(notification *BuildNotification) {
|
||||
key := strings.Join(
|
||||
[]string{
|
||||
notification.Project,
|
||||
notification.Package,
|
||||
notification.Repository,
|
||||
notification.Arch,
|
||||
},
|
||||
"/",
|
||||
)
|
||||
|
||||
notificationMutex.Lock()
|
||||
obsNotifications[key] = notification
|
||||
|
||||
chns, ok := notificationChannels[notification.Project]
|
||||
notificationMutex.Unlock()
|
||||
if ok {
|
||||
for _, ch := range chns {
|
||||
ch <- notification
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processObsMessage(msg *rabbitmq.Delivery) {
|
||||
key := strings.SplitN(msg.RoutingKey, ".", 4)
|
||||
if len(key) != 4 || len(key[3]) < 7 || key[3][:6] != "build_" {
|
||||
return
|
||||
}
|
||||
|
||||
buildSuccess := false
|
||||
switch key[3][6:] {
|
||||
case "success", "unchanged":
|
||||
buildSuccess = true
|
||||
case "fail":
|
||||
buildSuccess = false
|
||||
default:
|
||||
log.Printf("unknown build_ logging message: %s\n", msg.RoutingKey)
|
||||
return
|
||||
}
|
||||
|
||||
notification := &BuildNotification{
|
||||
BuildSuccess: buildSuccess,
|
||||
}
|
||||
err := json.Unmarshal(msg.Body, notification)
|
||||
if err != nil {
|
||||
log.Printf("Cannot unmarshall json object: %s\n", msg.Body)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("%v\n", notification)
|
||||
addObsNotificationToCache(notification)
|
||||
}
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", err, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessingObsMessages(host, username, password, queueName string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Print("recovering... reconnecting...\n")
|
||||
time.Sleep(5 * time.Second)
|
||||
go ProcessingObsMessages(host, username, password, queueName)
|
||||
}
|
||||
}()
|
||||
|
||||
if obsNotifications == nil {
|
||||
obsNotifications = make(map[string]*BuildNotification)
|
||||
// notificationChannels = make(map[string]chan *BuildNotification)
|
||||
}
|
||||
|
||||
auth := ""
|
||||
if len(username) > 0 && len(password) > 0 {
|
||||
auth = username + ":" + password + "@"
|
||||
}
|
||||
|
||||
connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
|
||||
ServerName: host,
|
||||
})
|
||||
failOnError(err, "Cannot connect to rabbit.opensuse.org")
|
||||
defer connection.Close()
|
||||
|
||||
ch, err := connection.Channel()
|
||||
failOnError(err, "Cannot create a channel")
|
||||
defer ch.Close()
|
||||
|
||||
err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
|
||||
failOnError(err, "Cannot find pubsub exchange")
|
||||
|
||||
var q rabbitmq.Queue
|
||||
if len(queueName) == 0 {
|
||||
q, err = ch.QueueDeclare("", false, true, true, false, nil)
|
||||
} else {
|
||||
q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
|
||||
if err != nil {
|
||||
log.Printf("queue not found .. trying to create it: %v\n", err)
|
||||
if ch.IsClosed() {
|
||||
ch, err = connection.Channel()
|
||||
failOnError(err, "Channel cannot be re-opened")
|
||||
}
|
||||
q, err = ch.QueueDeclare(queueName, true, false, true, false, nil)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("can't create persistent queue ... falling back to temporaty queue: %v\n", err)
|
||||
if ch.IsClosed() {
|
||||
ch, err = connection.Channel()
|
||||
failOnError(err, "Channel cannot be re-opened")
|
||||
}
|
||||
q, err = ch.QueueDeclare("", false, true, true, false, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
failOnError(err, "Cannot declare queue")
|
||||
log.Printf("queue: %s:%d", q.Name, q.Consumers)
|
||||
|
||||
err = ch.QueueBind(q.Name, "*.obs.package.*", "pubsub", false, nil)
|
||||
failOnError(err, "Cannot bind queue to exchange")
|
||||
|
||||
msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
|
||||
failOnError(err, "Cannot start consumer")
|
||||
log.Printf("queue: %s:%d", q.Name, q.Consumers)
|
||||
|
||||
for {
|
||||
msg, ok := <-msgs
|
||||
if !ok {
|
||||
log.Printf("channel/connection closed?\n")
|
||||
|
||||
if connection.IsClosed() {
|
||||
// reconnect
|
||||
log.Printf("reconnecting...")
|
||||
time.Sleep(5 * time.Second)
|
||||
go ProcessingObsMessages(host, username, password, queueName)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
processObsMessage(&msg)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user