1 Commits

Author SHA256 Message Date
e8b6066bae wip 2025-07-27 23:11:42 +02:00
7 changed files with 245 additions and 67 deletions

View File

@@ -583,18 +583,23 @@ type PackageBuildStatus struct {
Package string `xml:"package,attr"`
Code string `xml:"code,attr"`
Details string `xml:"details"`
LastUpdate int64
}
type BuildResult struct {
Project string `xml:"project,attr"`
Repository string `xml:"repository,attr"`
Arch string `xml:"arch,attr"`
Code string `xml:"code,attr"`
Dirty bool `xml:"dirty,attr"`
ScmSync string `xml:"scmsync"`
ScmInfo string `xml:"scminfo"`
Status []PackageBuildStatus `xml:"status"`
Binaries []BinaryList `xml:"binarylist"`
Project string `xml:"project,attr"`
Repository string `xml:"repository,attr"`
Arch string `xml:"arch,attr"`
Code string `xml:"code,attr"`
Dirty bool `xml:"dirty,attr"`
ScmSync string `xml:"scmsync"`
ScmInfo string `xml:"scminfo"`
Status []PackageBuildStatus `xml:"status"`
Binaries []BinaryList `xml:"binarylist"`
LastUpdate int64
}
type Binary struct {
@@ -614,6 +619,7 @@ type BuildResultList struct {
Result []BuildResult `xml:"result"`
isLastBuild bool
LastUpdate int64
}
func (r *BuildResultList) GetPackageList() []string {
@@ -636,6 +642,48 @@ func (r *BuildResultList) GetPackageList() []string {
return pkgList
}
func packageSort(A, B PackageBuildStatus) int {
return strings.Compare(A.Package, B.Package)
}
func repoSort(A, B BuildResult) int {
eq := strings.Compare(A.Project, B.Project)
if eq == 0 {
eq = strings.Compare(A.Repository, B.Repository)
if eq == 0 {
eq = strings.Compare(A.Arch, B.Arch)
}
}
return eq
}
func (r *BuildResultList) MergePackageState(now int64, pkgState *BuildResultList) {
for _, nr := range pkgState.Result {
idx, found := slices.BinarySearchFunc(r.Result, nr, repoSort)
// not found, new repo?
if !found {
nr.LastUpdate = now
r.Result = slices.Insert(r.Result, idx, nr)
continue
}
// update current repo
repo := &r.Result[idx]
// update all the packages in the repo
for _, p := range nr.Status {
p.LastUpdate = now
idx, found := slices.BinarySearchFunc(repo.Status, p, packageSort)
if !found {
repo.Status = slices.Insert(repo.Status, idx, p)
continue
}
repo.Status[idx] = p
}
}
}
func (r *BuildResultList) BuildResultSummary() (success, finished bool) {
if r == nil {
return false, false
@@ -903,5 +951,11 @@ func (c *ObsClient) BuildStatusWithState(project string, opts *BuildResultOption
if ret != nil {
ret.isLastBuild = opts.LastBuild
}
slices.SortFunc(ret.Result, repoSort)
for _, r := range ret.Result {
slices.SortFunc(r.Status, packageSort)
}
return ret, err
}

View File

@@ -25,28 +25,30 @@ import (
"strings"
)
const RequestType_CreateBrachTag = "create"
const RequestType_DeleteBranchTag = "delete"
const RequestType_Fork = "fork"
const RequestType_Issue = "issues"
const RequestType_IssueAssign = "issue_assign"
const RequestType_IssueComment = "issue_comment"
const RequestType_IssueLabel = "issue_label"
const RequestType_IssueMilestone = "issue_milestone"
const RequestType_Push = "push"
const RequestType_Repository = "repository"
const RequestType_Release = "release"
const RequestType_PR = "pull_request"
const RequestType_PRAssign = "pull_request_assign"
const RequestType_PRLabel = "pull_request_label"
const RequestType_PRComment = "pull_request_comment"
const RequestType_PRMilestone = "pull_request_milestone"
const RequestType_PRSync = "pull_request_sync"
const RequestType_PRReviewAccepted = "pull_request_review_approved"
const RequestType_PRReviewRejected = "pull_request_review_rejected"
const RequestType_PRReviewRequest = "pull_request_review_request"
const RequestType_PRReviewComment = "pull_request_review_comment"
const RequestType_Wiki = "wiki"
const (
RequestType_CreateBrachTag = "create"
RequestType_DeleteBranchTag = "delete"
RequestType_Fork = "fork"
RequestType_Issue = "issues"
RequestType_IssueAssign = "issue_assign"
RequestType_IssueComment = "issue_comment"
RequestType_IssueLabel = "issue_label"
RequestType_IssueMilestone = "issue_milestone"
RequestType_Push = "push"
RequestType_Repository = "repository"
RequestType_Release = "release"
RequestType_PR = "pull_request"
RequestType_PRAssign = "pull_request_assign"
RequestType_PRLabel = "pull_request_label"
RequestType_PRComment = "pull_request_comment"
RequestType_PRMilestone = "pull_request_milestone"
RequestType_PRSync = "pull_request_sync"
RequestType_PRReviewAccepted = "pull_request_review_approved"
RequestType_PRReviewRejected = "pull_request_review_rejected"
RequestType_PRReviewRequest = "pull_request_review_request"
RequestType_PRReviewComment = "pull_request_review_comment"
RequestType_Wiki = "wiki"
)
type RequestProcessor interface {
ProcessFunc(*Request) error

View File

@@ -1,10 +1,99 @@
package common
import (
"encoding/json"
"errors"
"fmt"
"slices"
"strings"
)
const (
ObsMessageType_PackageBuildFail = "package.build_fail"
ObsMessageType_PackageBuildSuccess = "package.build_success"
ObsMessageType_PackageBuildUnchanged = "package.build_unchanged"
ObsMessageType_RepoBuildFinished = "repo.build_finished"
ObsMessageType_RepoBuildStarted = "repo.build_started"
)
type BuildResultMsg struct {
Status string
Project string `json:"project"`
Package string `json:"package"`
Repo string `json:"repository"`
Arch string `json:"arch"`
StartTime int32 `json:"starttime"`
EndTime int32 `json:"endtime"`
WorkerID string `json:"workerid"`
Version string `json:"versrel"`
Build string `json:"buildtype"`
}
type RepoBuildMsg struct {
Status string
Project string `json:"project"`
Repo string `json:"repo"`
Arch string `json:"arch"`
BuildId string `json:"buildid"`
}
var ObsRabbitMessageError_UnknownMessageType error = errors.New("Unknown message type")
var ObsRabbitMessageError_ParseError error = errors.New("JSON parsing error")
func ParseObsRabbitMessaege(ObsMessageType string, data []byte) (interface{}, error) {
unmarshall := func(data []byte, v any) (interface{}, error) {
if err := json.Unmarshal(data, v); err != nil {
return nil, fmt.Errorf("%w: %s", ObsRabbitMessageError_ParseError, err)
}
return v, nil
}
switch ObsMessageType {
case ObsMessageType_PackageBuildSuccess, ObsMessageType_PackageBuildUnchanged:
ret := &BuildResultMsg{Status: "succeeded"}
return unmarshall(data, ret)
case ObsMessageType_PackageBuildFail:
ret := &BuildResultMsg{Status: "failed"}
return unmarshall(data, ret)
case ObsMessageType_RepoBuildFinished:
ret := &RepoBuildMsg{Status: "finished"}
return unmarshall(data, ret)
case ObsMessageType_RepoBuildStarted:
ret := &RepoBuildMsg{Status: "building"}
return unmarshall(data, ret)
}
return nil, fmt.Errorf("%w: %s", ObsRabbitMessageError_UnknownMessageType, ObsMessageType)
}
type ObsMessageProcessor func(topic string, data []byte) error
type RabbitMQObsBuildStatusProcessor struct {
Handlers map[string]ObsMessageProcessor
c *RabbitConnection
}
func (o *RabbitMQObsBuildStatusProcessor) routingKeyPrefix() string {
if strings.HasSuffix(o.c.RabbitURL.Hostname(), "opensuse.org") {
return "opensuse"
}
return "suse"
}
func (o *RabbitMQObsBuildStatusProcessor) GenerateTopics() []string {
prefix := o.routingKeyPrefix()
msgs := make([]string, len(o.Handlers))
idx := 0
for k, _ := range o.Handlers {
msgs[idx] = prefix + ".obs." + k
idx++
}
slices.Sort(msgs)
return msgs
}
func (o *RabbitMQObsBuildStatusProcessor) Connection() *RabbitConnection {
@@ -16,5 +105,11 @@ func (o *RabbitMQObsBuildStatusProcessor) Connection() *RabbitConnection {
}
func (o *RabbitMQObsBuildStatusProcessor) ProcessRabbitMessage(msg RabbitMessage) error {
}
prefix := o.routingKeyPrefix() + ".obs."
topic := strings.TrimPrefix(msg.RoutingKey, prefix)
if h, ok := o.Handlers[topic]; ok {
return h(topic, msg.Body)
}
return fmt.Errorf("Unhandled message received: %s", msg.RoutingKey)
}

View File

@@ -173,4 +173,3 @@ func (d DevelProjects) GetDevelProject(pkg string) (string, error) {
return "", DevelProjectNotFound
}

View File

@@ -115,7 +115,6 @@ func main() {
Topic := flag.String("topic", "opensuse.obs", "RabbitMQ topic prefix")
flag.Parse()
common.PanicOnError(common.RequireObsSecretToken())
var err error
@@ -156,7 +155,7 @@ func main() {
repo := req.PathValue("Repository")
arch := req.PathValue("Arch")
// status := GetDetailedBuildStatus(prj, pkg, repo, arch)
// status := GetDetailedBuildStatus(prj, pkg, repo, arch)
data, err := obs.BuildLog(prj, pkg, repo, arch)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
@@ -164,7 +163,7 @@ func main() {
return
}
defer data.Close()
io.Copy(res, data)
})

View File

@@ -0,0 +1,3 @@
package main

View File

@@ -1,8 +1,8 @@
package main
import (
"log"
"slices"
"strings"
"sync"
"time"
@@ -24,14 +24,67 @@ type StatusUpdateMsg struct {
func GetCurrentStatus(project string) *common.BuildResultList {
statusMutex.RLock()
defer statusMutex.RUnlock()
if ret, found := CurrentStatus[project]; found {
statusMutex.RUnlock()
return ret
} else {
go WatchObsProject(obs, project)
}
res, err := obs.BuildStatus(project)
statusMutex.RUnlock()
statusMutex.Lock()
defer statusMutex.Unlock()
if err != nil {
return res
}
CurrentStatus[project] = res
now := time.Now().Unix()
CurrentStatus[project].LastUpdate = now
for _, r := range res.Result {
r.LastUpdate = now
for _, p := range r.Status {
p.LastUpdate = now
}
slices.SortFunc(r.Status, packageSort)
}
slices.SortFunc(res.Result, repoSort)
return res
}
func updatePrjPackage(prjState *common.BuildResultList, pkg string, now int64, pkgState *common.BuildResultList) {
for prjState.
Result[0].Status[0].Package
}
func extractPackageBuildStatus(prjState *common.BuildResultList, pkg string) []*common.PackageBuildStatus {
}
func GetDetailedPackageBuildStatus(prj, pkg string) []*common.PackageBuildStatus {
statusMutex.RLock()
now := time.Now().Unix()
cachedPrj, found := CurrentStatus[prj]
if found {
statusMutex.Unlock()
if now-cachedPrj.LastUpdate < 60 {
return extractPackageBuildStatus(cachedPrj, pkg)
}
}
ret, err := obs.BuildStatus(prj, pkg)
if err != nil {
return nil
}
statusMutex.Lock()
defer statusMutex.Unlock()
updatePrjPackage(cachedPrj, pkg, now, ret)
return extractPackageBuildStatus(cachedPrj, pkg)
}
func GetDetailedBuildStatus(prj, pkg, repo, arch string) *common.PackageBuildStatus {
@@ -72,30 +125,3 @@ func ProcessUpdates() {
}
}
}
func WatchObsProject(obs common.ObsStatusFetcherWithState, ObsProject string) {
old_state := ""
mutex.Lock()
if pos, found := slices.BinarySearch(WatchedRepos, ObsProject); found {
mutex.Unlock()
return
} else {
WatchedRepos = slices.Insert(WatchedRepos, pos, ObsProject)
mutex.Unlock()
}
LogDebug("+ watching", ObsProject)
opts := common.BuildResultOptions{}
for {
state, err := obs.BuildStatusWithState(ObsProject, &opts)
if err != nil {
log.Println(" *** Error fetching build for", ObsProject, err)
time.Sleep(time.Minute)
} else {
opts.OldState = state.State
LogDebug(" --> update", ObsProject, " => ", old_state)
StatusUpdateCh <- StatusUpdateMsg{ObsProject: ObsProject, Result: state}
}
}
}