diff --git a/common/obs_utils.go b/common/obs_utils.go index 696541c..2242e36 100644 --- a/common/obs_utils.go +++ b/common/obs_utils.go @@ -583,18 +583,23 @@ type PackageBuildStatus struct { Package string `xml:"package,attr"` Code string `xml:"code,attr"` Details string `xml:"details"` + + LastUpdate int64 } type BuildResult struct { - Project string `xml:"project,attr"` - Repository string `xml:"repository,attr"` - Arch string `xml:"arch,attr"` - Code string `xml:"code,attr"` - Dirty bool `xml:"dirty,attr"` - ScmSync string `xml:"scmsync"` - ScmInfo string `xml:"scminfo"` - Status []PackageBuildStatus `xml:"status"` - Binaries []BinaryList `xml:"binarylist"` + Project string `xml:"project,attr"` + Repository string `xml:"repository,attr"` + Arch string `xml:"arch,attr"` + + Code string `xml:"code,attr"` + Dirty bool `xml:"dirty,attr"` + ScmSync string `xml:"scmsync"` + ScmInfo string `xml:"scminfo"` + Status []PackageBuildStatus `xml:"status"` + Binaries []BinaryList `xml:"binarylist"` + + LastUpdate int64 } type Binary struct { @@ -614,6 +619,7 @@ type BuildResultList struct { Result []BuildResult `xml:"result"` isLastBuild bool + LastUpdate int64 } func (r *BuildResultList) GetPackageList() []string { @@ -636,6 +642,48 @@ func (r *BuildResultList) GetPackageList() []string { return pkgList } +func packageSort(A, B PackageBuildStatus) int { + return strings.Compare(A.Package, B.Package) +} + +func repoSort(A, B BuildResult) int { + eq := strings.Compare(A.Project, B.Project) + if eq == 0 { + eq = strings.Compare(A.Repository, B.Repository) + if eq == 0 { + eq = strings.Compare(A.Arch, B.Arch) + } + } + return eq +} + +func (r *BuildResultList) MergePackageState(now int64, pkgState *BuildResultList) { + for _, nr := range pkgState.Result { + idx, found := slices.BinarySearchFunc(r.Result, nr, repoSort) + // not found, new repo? + if !found { + nr.LastUpdate = now + r.Result = slices.Insert(r.Result, idx, nr) + continue + } + + // update current repo + repo := &r.Result[idx] + + // update all the packages in the repo + for _, p := range nr.Status { + p.LastUpdate = now + idx, found := slices.BinarySearchFunc(repo.Status, p, packageSort) + if !found { + repo.Status = slices.Insert(repo.Status, idx, p) + continue + } + + repo.Status[idx] = p + } + } +} + func (r *BuildResultList) BuildResultSummary() (success, finished bool) { if r == nil { return false, false @@ -903,5 +951,11 @@ func (c *ObsClient) BuildStatusWithState(project string, opts *BuildResultOption if ret != nil { ret.isLastBuild = opts.LastBuild } + + slices.SortFunc(ret.Result, repoSort) + for _, r := range ret.Result { + slices.SortFunc(r.Status, packageSort) + } + return ret, err } diff --git a/common/rabbitmq_gitea.go b/common/rabbitmq_gitea.go index 4df1d0f..8f14bbe 100644 --- a/common/rabbitmq_gitea.go +++ b/common/rabbitmq_gitea.go @@ -25,28 +25,30 @@ import ( "strings" ) -const RequestType_CreateBrachTag = "create" -const RequestType_DeleteBranchTag = "delete" -const RequestType_Fork = "fork" -const RequestType_Issue = "issues" -const RequestType_IssueAssign = "issue_assign" -const RequestType_IssueComment = "issue_comment" -const RequestType_IssueLabel = "issue_label" -const RequestType_IssueMilestone = "issue_milestone" -const RequestType_Push = "push" -const RequestType_Repository = "repository" -const RequestType_Release = "release" -const RequestType_PR = "pull_request" -const RequestType_PRAssign = "pull_request_assign" -const RequestType_PRLabel = "pull_request_label" -const RequestType_PRComment = "pull_request_comment" -const RequestType_PRMilestone = "pull_request_milestone" -const RequestType_PRSync = "pull_request_sync" -const RequestType_PRReviewAccepted = "pull_request_review_approved" -const RequestType_PRReviewRejected = "pull_request_review_rejected" -const RequestType_PRReviewRequest = "pull_request_review_request" -const RequestType_PRReviewComment = "pull_request_review_comment" -const RequestType_Wiki = "wiki" +const ( + RequestType_CreateBrachTag = "create" + RequestType_DeleteBranchTag = "delete" + RequestType_Fork = "fork" + RequestType_Issue = "issues" + RequestType_IssueAssign = "issue_assign" + RequestType_IssueComment = "issue_comment" + RequestType_IssueLabel = "issue_label" + RequestType_IssueMilestone = "issue_milestone" + RequestType_Push = "push" + RequestType_Repository = "repository" + RequestType_Release = "release" + RequestType_PR = "pull_request" + RequestType_PRAssign = "pull_request_assign" + RequestType_PRLabel = "pull_request_label" + RequestType_PRComment = "pull_request_comment" + RequestType_PRMilestone = "pull_request_milestone" + RequestType_PRSync = "pull_request_sync" + RequestType_PRReviewAccepted = "pull_request_review_approved" + RequestType_PRReviewRejected = "pull_request_review_rejected" + RequestType_PRReviewRequest = "pull_request_review_request" + RequestType_PRReviewComment = "pull_request_review_comment" + RequestType_Wiki = "wiki" +) type RequestProcessor interface { ProcessFunc(*Request) error diff --git a/common/rabbitmq_obs.go b/common/rabbitmq_obs.go index 69b5825..e5aca97 100644 --- a/common/rabbitmq_obs.go +++ b/common/rabbitmq_obs.go @@ -1,10 +1,99 @@ package common +import ( + "encoding/json" + "errors" + "fmt" + "slices" + "strings" +) + +const ( + ObsMessageType_PackageBuildFail = "package.build_fail" + ObsMessageType_PackageBuildSuccess = "package.build_success" + ObsMessageType_PackageBuildUnchanged = "package.build_unchanged" + + ObsMessageType_RepoBuildFinished = "repo.build_finished" + ObsMessageType_RepoBuildStarted = "repo.build_started" +) + +type BuildResultMsg struct { + Status string + Project string `json:"project"` + Package string `json:"package"` + Repo string `json:"repository"` + Arch string `json:"arch"` + + StartTime int32 `json:"starttime"` + EndTime int32 `json:"endtime"` + WorkerID string `json:"workerid"` + Version string `json:"versrel"` + Build string `json:"buildtype"` +} + +type RepoBuildMsg struct { + Status string + Project string `json:"project"` + Repo string `json:"repo"` + Arch string `json:"arch"` + BuildId string `json:"buildid"` +} + +var ObsRabbitMessageError_UnknownMessageType error = errors.New("Unknown message type") +var ObsRabbitMessageError_ParseError error = errors.New("JSON parsing error") + +func ParseObsRabbitMessaege(ObsMessageType string, data []byte) (interface{}, error) { + unmarshall := func(data []byte, v any) (interface{}, error) { + if err := json.Unmarshal(data, v); err != nil { + return nil, fmt.Errorf("%w: %s", ObsRabbitMessageError_ParseError, err) + } + return v, nil + } + + switch ObsMessageType { + case ObsMessageType_PackageBuildSuccess, ObsMessageType_PackageBuildUnchanged: + ret := &BuildResultMsg{Status: "succeeded"} + return unmarshall(data, ret) + case ObsMessageType_PackageBuildFail: + ret := &BuildResultMsg{Status: "failed"} + return unmarshall(data, ret) + case ObsMessageType_RepoBuildFinished: + ret := &RepoBuildMsg{Status: "finished"} + return unmarshall(data, ret) + case ObsMessageType_RepoBuildStarted: + ret := &RepoBuildMsg{Status: "building"} + return unmarshall(data, ret) + } + + return nil, fmt.Errorf("%w: %s", ObsRabbitMessageError_UnknownMessageType, ObsMessageType) +} + +type ObsMessageProcessor func(topic string, data []byte) error + type RabbitMQObsBuildStatusProcessor struct { + Handlers map[string]ObsMessageProcessor + c *RabbitConnection } +func (o *RabbitMQObsBuildStatusProcessor) routingKeyPrefix() string { + if strings.HasSuffix(o.c.RabbitURL.Hostname(), "opensuse.org") { + return "opensuse" + } + + return "suse" +} + func (o *RabbitMQObsBuildStatusProcessor) GenerateTopics() []string { + prefix := o.routingKeyPrefix() + msgs := make([]string, len(o.Handlers)) + idx := 0 + for k, _ := range o.Handlers { + msgs[idx] = prefix + ".obs." + k + idx++ + } + slices.Sort(msgs) + return msgs } func (o *RabbitMQObsBuildStatusProcessor) Connection() *RabbitConnection { @@ -16,5 +105,11 @@ func (o *RabbitMQObsBuildStatusProcessor) Connection() *RabbitConnection { } func (o *RabbitMQObsBuildStatusProcessor) ProcessRabbitMessage(msg RabbitMessage) error { -} + prefix := o.routingKeyPrefix() + ".obs." + topic := strings.TrimPrefix(msg.RoutingKey, prefix) + if h, ok := o.Handlers[topic]; ok { + return h(topic, msg.Body) + } + return fmt.Errorf("Unhandled message received: %s", msg.RoutingKey) +} diff --git a/common/utils.go b/common/utils.go index 08fb05b..88abf1d 100644 --- a/common/utils.go +++ b/common/utils.go @@ -173,4 +173,3 @@ func (d DevelProjects) GetDevelProject(pkg string) (string, error) { return "", DevelProjectNotFound } - diff --git a/obs-status-service/main.go b/obs-status-service/main.go index b0b65e0..781e485 100644 --- a/obs-status-service/main.go +++ b/obs-status-service/main.go @@ -115,7 +115,6 @@ func main() { Topic := flag.String("topic", "opensuse.obs", "RabbitMQ topic prefix") flag.Parse() - common.PanicOnError(common.RequireObsSecretToken()) var err error @@ -156,7 +155,7 @@ func main() { repo := req.PathValue("Repository") arch := req.PathValue("Arch") -// status := GetDetailedBuildStatus(prj, pkg, repo, arch) + // status := GetDetailedBuildStatus(prj, pkg, repo, arch) data, err := obs.BuildLog(prj, pkg, repo, arch) if err != nil { res.WriteHeader(http.StatusInternalServerError) @@ -164,7 +163,7 @@ func main() { return } defer data.Close() - + io.Copy(res, data) }) diff --git a/obs-status-service/rabbit.go b/obs-status-service/rabbit.go new file mode 100644 index 0000000..d3ca313 --- /dev/null +++ b/obs-status-service/rabbit.go @@ -0,0 +1,3 @@ +package main + + diff --git a/obs-status-service/status.go b/obs-status-service/status.go index 5d097e9..0cb1be7 100644 --- a/obs-status-service/status.go +++ b/obs-status-service/status.go @@ -1,8 +1,8 @@ package main import ( - "log" "slices" + "strings" "sync" "time" @@ -24,14 +24,67 @@ type StatusUpdateMsg struct { func GetCurrentStatus(project string) *common.BuildResultList { statusMutex.RLock() - defer statusMutex.RUnlock() if ret, found := CurrentStatus[project]; found { + statusMutex.RUnlock() return ret - } else { - go WatchObsProject(obs, project) + } + + res, err := obs.BuildStatus(project) + statusMutex.RUnlock() + statusMutex.Lock() + defer statusMutex.Unlock() + + if err != nil { + return res + } + CurrentStatus[project] = res + + now := time.Now().Unix() + CurrentStatus[project].LastUpdate = now + for _, r := range res.Result { + r.LastUpdate = now + for _, p := range r.Status { + p.LastUpdate = now + } + slices.SortFunc(r.Status, packageSort) + } + slices.SortFunc(res.Result, repoSort) + return res +} + + +func updatePrjPackage(prjState *common.BuildResultList, pkg string, now int64, pkgState *common.BuildResultList) { + for prjState. + Result[0].Status[0].Package +} + +func extractPackageBuildStatus(prjState *common.BuildResultList, pkg string) []*common.PackageBuildStatus { + +} + +func GetDetailedPackageBuildStatus(prj, pkg string) []*common.PackageBuildStatus { + statusMutex.RLock() + now := time.Now().Unix() + + cachedPrj, found := CurrentStatus[prj] + if found { + statusMutex.Unlock() + if now-cachedPrj.LastUpdate < 60 { + return extractPackageBuildStatus(cachedPrj, pkg) + } + } + + ret, err := obs.BuildStatus(prj, pkg) + if err != nil { return nil } + + statusMutex.Lock() + defer statusMutex.Unlock() + + updatePrjPackage(cachedPrj, pkg, now, ret) + return extractPackageBuildStatus(cachedPrj, pkg) } func GetDetailedBuildStatus(prj, pkg, repo, arch string) *common.PackageBuildStatus { @@ -72,30 +125,3 @@ func ProcessUpdates() { } } } - -func WatchObsProject(obs common.ObsStatusFetcherWithState, ObsProject string) { - old_state := "" - - mutex.Lock() - if pos, found := slices.BinarySearch(WatchedRepos, ObsProject); found { - mutex.Unlock() - return - } else { - WatchedRepos = slices.Insert(WatchedRepos, pos, ObsProject) - mutex.Unlock() - } - - LogDebug("+ watching", ObsProject) - opts := common.BuildResultOptions{} - for { - state, err := obs.BuildStatusWithState(ObsProject, &opts) - if err != nil { - log.Println(" *** Error fetching build for", ObsProject, err) - time.Sleep(time.Minute) - } else { - opts.OldState = state.State - LogDebug(" --> update", ObsProject, " => ", old_state) - StatusUpdateCh <- StatusUpdateMsg{ObsProject: ObsProject, Result: state} - } - } -}