SHA256
1
0

Compare commits

..

2 Commits

Author SHA256 Message Date
Andrii Nikitin
eb62492046 pr: add parameter exit-on-config-error
The parameter will help to deal with test fixture concurrency
2026-03-06 14:18:42 +01:00
ba20810c99 common: parser error fix resulting in deadlock 2026-03-05 13:25:51 +01:00
6 changed files with 264 additions and 62 deletions

View File

@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"slices"
"strings"
@@ -205,16 +204,21 @@ func ReadWorkflowConfig(gitea GiteaFileContentAndRepoFetcher, git_project string
func ResolveWorkflowConfigs(gitea GiteaFileContentAndRepoFetcher, config *ConfigFile) (AutogitConfigs, error) {
configs := make([]*AutogitConfig, 0, len(config.GitProjectNames))
var errs []error
for _, git_project := range config.GitProjectNames {
c, err := ReadWorkflowConfig(gitea, git_project)
if err != nil {
// can't sync, so ignore for now
log.Println(err)
errs = append(errs, err)
} else {
configs = append(configs, c)
}
}
if len(errs) > 0 {
return configs, errors.Join(errs...)
}
return configs, nil
}

View File

@@ -396,12 +396,17 @@ func (e *GitHandlerImpl) GitExecQuietOrPanic(cwd string, params ...string) {
}
type ChanIO struct {
ch chan byte
ch chan byte
done chan struct{}
}
func (c *ChanIO) Write(p []byte) (int, error) {
for _, b := range p {
c.ch <- b
select {
case c.ch <- b:
case <-c.done:
return 0, io.EOF
}
}
return len(p), nil
}
@@ -410,21 +415,32 @@ func (c *ChanIO) Write(p []byte) (int, error) {
func (c *ChanIO) Read(data []byte) (idx int, err error) {
var ok bool
data[idx], ok = <-c.ch
if !ok {
err = io.EOF
return
}
idx++
for len(c.ch) > 0 && idx < len(data) {
data[idx], ok = <-c.ch
select {
case data[idx], ok = <-c.ch:
if !ok {
err = io.EOF
return
}
idx++
case <-c.done:
err = io.EOF
return
}
for len(c.ch) > 0 && idx < len(data) {
select {
case data[idx], ok = <-c.ch:
if !ok {
err = io.EOF
return
}
idx++
case <-c.done:
err = io.EOF
return
default:
return
}
}
return
@@ -471,7 +487,14 @@ func parseGitMsg(data <-chan byte) (GitMsg, error) {
var size int
pos := 0
for c := <-data; c != ' '; c = <-data {
for {
c, ok := <-data
if !ok {
return GitMsg{}, io.EOF
}
if c == ' ' {
break
}
if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') {
id[pos] = c
pos++
@@ -483,7 +506,15 @@ func parseGitMsg(data <-chan byte) (GitMsg, error) {
pos = 0
var c byte
for c = <-data; c != ' ' && c != '\x00'; c = <-data {
for {
var ok bool
c, ok = <-data
if !ok {
return GitMsg{}, io.EOF
}
if c == ' ' || c == '\x00' {
break
}
if c >= 'a' && c <= 'z' {
msgType[pos] = c
pos++
@@ -509,7 +540,14 @@ func parseGitMsg(data <-chan byte) (GitMsg, error) {
return GitMsg{}, fmt.Errorf("Invalid object type: '%s'", string(msgType))
}
for c = <-data; c != '\000'; c = <-data {
for {
c, ok := <-data
if !ok {
return GitMsg{}, io.EOF
}
if c == '\x00' {
break
}
if c >= '0' && c <= '9' {
size = size*10 + (int(c) - '0')
} else {
@@ -528,18 +566,37 @@ func parseGitCommitHdr(oldHdr [2]string, data <-chan byte) ([2]string, int, erro
hdr := make([]byte, 0, 60)
val := make([]byte, 0, 1000)
c := <-data
c, ok := <-data
if !ok {
return [2]string{}, 0, io.EOF
}
size := 1
if c != '\n' { // end of header marker
for ; c != ' '; c = <-data {
for {
if c == ' ' {
break
}
hdr = append(hdr, c)
size++
var ok bool
c, ok = <-data
if !ok {
return [2]string{}, size, io.EOF
}
}
if size == 1 { // continuation header here
hdr = []byte(oldHdr[0])
val = append([]byte(oldHdr[1]), '\n')
}
for c := <-data; c != '\n'; c = <-data {
for {
var ok bool
c, ok = <-data
if !ok {
return [2]string{}, size, io.EOF
}
if c == '\n' {
break
}
val = append(val, c)
size++
}
@@ -552,7 +609,14 @@ func parseGitCommitHdr(oldHdr [2]string, data <-chan byte) ([2]string, int, erro
func parseGitCommitMsg(data <-chan byte, l int) (string, error) {
msg := make([]byte, 0, l)
for c := <-data; c != '\x00'; c = <-data {
for {
c, ok := <-data
if !ok {
return string(msg), io.EOF
}
if c == '\x00' {
break
}
msg = append(msg, c)
l--
}
@@ -578,7 +642,7 @@ func parseGitCommit(data <-chan byte) (GitCommit, error) {
var hdr [2]string
hdr, size, err := parseGitCommitHdr(hdr, data)
if err != nil {
return GitCommit{}, nil
return GitCommit{}, err
}
l -= size
@@ -599,14 +663,28 @@ func parseGitCommit(data <-chan byte) (GitCommit, error) {
func parseTreeEntry(data <-chan byte, hashLen int) (GitTreeEntry, error) {
var e GitTreeEntry
for c := <-data; c != ' '; c = <-data {
for {
c, ok := <-data
if !ok {
return e, io.EOF
}
if c == ' ' {
break
}
e.mode = e.mode*8 + int(c-'0')
e.size++
}
e.size++
name := make([]byte, 0, 128)
for c := <-data; c != '\x00'; c = <-data {
for {
c, ok := <-data
if !ok {
return e, io.EOF
}
if c == '\x00' {
break
}
name = append(name, c)
e.size++
}
@@ -617,7 +695,10 @@ func parseTreeEntry(data <-chan byte, hashLen int) (GitTreeEntry, error) {
hash := make([]byte, 0, hashLen*2)
for range hashLen {
c := <-data
c, ok := <-data
if !ok {
return e, io.EOF
}
hash = append(hash, hexBinToAscii[((c&0xF0)>>4)], hexBinToAscii[c&0xF])
}
e.hash = string(hash)
@@ -638,13 +719,16 @@ func parseGitTree(data <-chan byte) (GitTree, error) {
for parsedLen < hdr.size {
entry, err := parseTreeEntry(data, len(hdr.hash)/2)
if err != nil {
return GitTree{}, nil
return GitTree{}, err
}
t.items = append(t.items, entry)
parsedLen += entry.size
}
c := <-data // \0 read
c, ok := <-data // \0 read
if !ok {
return t, io.EOF
}
if c != '\x00' {
return t, fmt.Errorf("Unexpected character during git tree data read")
@@ -665,9 +749,16 @@ func parseGitBlob(data <-chan byte) ([]byte, error) {
d := make([]byte, hdr.size)
for l := 0; l < hdr.size; l++ {
d[l] = <-data
var ok bool
d[l], ok = <-data
if !ok {
return d, io.EOF
}
}
eob, ok := <-data
if !ok {
return d, io.EOF
}
eob := <-data
if eob != '\x00' {
return d, fmt.Errorf("invalid byte read in parseGitBlob")
}
@@ -679,16 +770,25 @@ func (e *GitHandlerImpl) GitParseCommits(cwd string, commitIDs []string) (parsed
var done sync.Mutex
done.Lock()
data_in, data_out := ChanIO{make(chan byte)}, ChanIO{make(chan byte)}
done_signal := make(chan struct{})
var once sync.Once
close_done := func() {
once.Do(func() {
close(done_signal)
})
}
data_in, data_out := ChanIO{make(chan byte), done_signal}, ChanIO{make(chan byte), done_signal}
parsedCommits = make([]GitCommit, 0, len(commitIDs))
go func() {
defer done.Unlock()
defer close_done()
defer close(data_out.ch)
for _, id := range commitIDs {
data_out.Write([]byte(id))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
c, e := parseGitCommit(data_in.ch)
if e != nil {
err = fmt.Errorf("Error parsing git commit: %w", e)
@@ -715,12 +815,14 @@ func (e *GitHandlerImpl) GitParseCommits(cwd string, commitIDs []string) (parsed
LogDebug("command run:", cmd.Args)
if e := cmd.Run(); e != nil {
LogError(e)
close_done()
close(data_in.ch)
close(data_out.ch)
return nil, e
}
done.Lock()
close_done()
close(data_in.ch)
return
}
@@ -729,15 +831,21 @@ func (e *GitHandlerImpl) GitCatFile(cwd, commitId, filename string) (data []byte
var done sync.Mutex
done.Lock()
data_in, data_out := ChanIO{make(chan byte)}, ChanIO{make(chan byte)}
done_signal := make(chan struct{})
var once sync.Once
close_done := func() {
once.Do(func() {
close(done_signal)
})
}
data_in, data_out := ChanIO{make(chan byte), done_signal}, ChanIO{make(chan byte), done_signal}
go func() {
defer done.Unlock()
defer close_done()
defer close(data_out.ch)
data_out.Write([]byte(commitId))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var c GitCommit
c, err = parseGitCommit(data_in.ch)
if err != nil {
@@ -745,11 +853,9 @@ func (e *GitHandlerImpl) GitCatFile(cwd, commitId, filename string) (data []byte
return
}
data_out.Write([]byte(c.Tree))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var tree GitTree
tree, err = parseGitTree(data_in.ch)
if err != nil {
LogError("Error parsing git tree:", err)
return
@@ -759,7 +865,7 @@ func (e *GitHandlerImpl) GitCatFile(cwd, commitId, filename string) (data []byte
if te.isBlob() && te.name == filename {
LogInfo("blob", te.hash)
data_out.Write([]byte(te.hash))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
data, err = parseGitBlob(data_in.ch)
return
}
@@ -784,11 +890,13 @@ func (e *GitHandlerImpl) GitCatFile(cwd, commitId, filename string) (data []byte
LogDebug("command run:", cmd.Args)
if e := cmd.Run(); e != nil {
LogError(e)
close_done()
close(data_in.ch)
close(data_out.ch)
return nil, e
}
done.Lock()
close_done()
close(data_in.ch)
return
}
@@ -798,16 +906,24 @@ func (e *GitHandlerImpl) GitDirectoryList(gitPath, commitId string) (directoryLi
directoryList = make(map[string]string)
done.Lock()
data_in, data_out := ChanIO{make(chan byte)}, ChanIO{make(chan byte)}
done_signal := make(chan struct{})
var once sync.Once
close_done := func() {
once.Do(func() {
close(done_signal)
})
}
data_in, data_out := ChanIO{make(chan byte), done_signal}, ChanIO{make(chan byte), done_signal}
LogDebug("Getting directory for:", commitId)
go func() {
defer done.Unlock()
defer close_done()
defer close(data_out.ch)
data_out.Write([]byte(commitId))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var c GitCommit
c, err = parseGitCommit(data_in.ch)
if err != nil {
@@ -823,7 +939,7 @@ func (e *GitHandlerImpl) GitDirectoryList(gitPath, commitId string) (directoryLi
delete(trees, p)
data_out.Write([]byte(tree))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var tree GitTree
tree, err = parseGitTree(data_in.ch)
@@ -857,12 +973,14 @@ func (e *GitHandlerImpl) GitDirectoryList(gitPath, commitId string) (directoryLi
LogDebug("command run:", cmd.Args)
if e := cmd.Run(); e != nil {
LogError(e)
close_done()
close(data_in.ch)
close(data_out.ch)
return directoryList, e
}
done.Lock()
close_done()
close(data_in.ch)
return directoryList, err
}
@@ -872,7 +990,14 @@ func (e *GitHandlerImpl) GitDirectoryContentList(gitPath, commitId string) (dire
directoryList = make(map[string]string)
done.Lock()
data_in, data_out := ChanIO{make(chan byte)}, ChanIO{make(chan byte)}
done_signal := make(chan struct{})
var once sync.Once
close_done := func() {
once.Do(func() {
close(done_signal)
})
}
data_in, data_out := ChanIO{make(chan byte), done_signal}, ChanIO{make(chan byte), done_signal}
LogDebug("Getting directory content for:", commitId)
@@ -881,7 +1006,7 @@ func (e *GitHandlerImpl) GitDirectoryContentList(gitPath, commitId string) (dire
defer close(data_out.ch)
data_out.Write([]byte(commitId))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var c GitCommit
c, err = parseGitCommit(data_in.ch)
if err != nil {
@@ -897,7 +1022,7 @@ func (e *GitHandlerImpl) GitDirectoryContentList(gitPath, commitId string) (dire
delete(trees, p)
data_out.Write([]byte(tree))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var tree GitTree
tree, err = parseGitTree(data_in.ch)
@@ -933,12 +1058,14 @@ func (e *GitHandlerImpl) GitDirectoryContentList(gitPath, commitId string) (dire
LogDebug("command run:", cmd.Args)
if e := cmd.Run(); e != nil {
LogError(e)
close_done()
close(data_in.ch)
close(data_out.ch)
return directoryList, e
}
done.Lock()
close_done()
close(data_in.ch)
return directoryList, err
}
@@ -948,16 +1075,24 @@ func (e *GitHandlerImpl) GitSubmoduleList(gitPath, commitId string) (submoduleLi
submoduleList = make(map[string]string)
done.Lock()
data_in, data_out := ChanIO{make(chan byte)}, ChanIO{make(chan byte)}
done_signal := make(chan struct{})
var once sync.Once
close_done := func() {
once.Do(func() {
close(done_signal)
})
}
data_in, data_out := ChanIO{make(chan byte), done_signal}, ChanIO{make(chan byte), done_signal}
LogDebug("Getting submodules for:", commitId)
go func() {
defer done.Unlock()
defer close_done()
defer close(data_out.ch)
data_out.Write([]byte(commitId))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var c GitCommit
c, err = parseGitCommit(data_in.ch)
if err != nil {
@@ -973,7 +1108,7 @@ func (e *GitHandlerImpl) GitSubmoduleList(gitPath, commitId string) (submoduleLi
delete(trees, p)
data_out.Write([]byte(tree))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
var tree GitTree
tree, err = parseGitTree(data_in.ch)
@@ -1010,17 +1145,26 @@ func (e *GitHandlerImpl) GitSubmoduleList(gitPath, commitId string) (submoduleLi
LogDebug("command run:", cmd.Args)
if e := cmd.Run(); e != nil {
LogError(e)
close_done()
close(data_in.ch)
close(data_out.ch)
return submoduleList, e
}
done.Lock()
close_done()
close(data_in.ch)
return submoduleList, err
}
func (e *GitHandlerImpl) GitSubmoduleCommitId(cwd, packageName, commitId string) (subCommitId string, valid bool) {
data_in, data_out := ChanIO{make(chan byte)}, ChanIO{make(chan byte)}
done_signal := make(chan struct{})
var once sync.Once
close_done := func() {
once.Do(func() {
close(done_signal)
})
}
data_in, data_out := ChanIO{make(chan byte), done_signal}, ChanIO{make(chan byte), done_signal}
var wg sync.WaitGroup
wg.Add(1)
@@ -1036,17 +1180,18 @@ func (e *GitHandlerImpl) GitSubmoduleCommitId(cwd, packageName, commitId string)
}()
defer wg.Done()
defer close_done()
defer close(data_out.ch)
data_out.Write([]byte(commitId))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
c, err := parseGitCommit(data_in.ch)
if err != nil {
LogError("Error parsing git commit:", err)
panic(err)
}
data_out.Write([]byte(c.Tree))
data_out.ch <- '\x00'
data_out.Write([]byte{0})
tree, err := parseGitTree(data_in.ch)
if err != nil {
@@ -1078,12 +1223,14 @@ func (e *GitHandlerImpl) GitSubmoduleCommitId(cwd, packageName, commitId string)
LogDebug("command run:", cmd.Args)
if e := cmd.Run(); e != nil {
LogError(e)
close_done()
close(data_in.ch)
close(data_out.ch)
return subCommitId, false
}
wg.Wait()
close_done()
close(data_in.ch)
return subCommitId, len(subCommitId) > 0
}

View File

@@ -28,6 +28,7 @@ import (
"slices"
"strings"
"testing"
"time"
)
func TestGitClone(t *testing.T) {
@@ -717,3 +718,44 @@ func TestGitDirectoryListRepro(t *testing.T) {
t.Errorf("Expected 'subdir' in directory list, got %v", dirs)
}
}
func TestGitDeadlockFix(t *testing.T) {
gitDir := t.TempDir()
testDir, _ := os.Getwd()
cmd := exec.Command("/usr/bin/bash", path.Join(testDir, "tsetup.sh"))
cmd.Dir = gitDir
_, err := cmd.CombinedOutput()
gh, err := AllocateGitWorkTree(gitDir, "Test", "test@example.com")
if err != nil {
t.Fatal(err)
}
h, err := gh.ReadExistingPath(".")
if err != nil {
t.Fatal(err)
}
defer h.Close()
// Use a blob ID to trigger error in GitParseCommits
// This ensures that the function returns error immediately and doesn't deadlock
blobId := "81aba862107f1e2f5312e165453955485f424612f313d6c2fb1b31fef9f82a14"
done := make(chan error)
go func() {
_, err := h.GitParseCommits("", []string{blobId})
done <- err
}()
select {
case err := <-done:
if err == nil {
t.Error("Expected error from GitParseCommits with blob ID, got nil")
} else {
// This is expected
t.Logf("Got expected error: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("GitParseCommits deadlocked! Fix is NOT working.")
}
}

View File

@@ -327,6 +327,7 @@ func main() {
interval := flag.Int64("interval", 10, "Notification polling interval in minutes (min 1 min)")
configFile := flag.String("config", "", "PrjGit listing config file")
logging := flag.String("logging", "info", "Logging level: [none, error, info, debug]")
exitOnConfigError := flag.Bool("exit-on-config-error", false, "Exit if any repository in configuration cannot be resolved")
flag.BoolVar(&common.IsDryRun, "dry", false, "Dry run, no effect. For debugging")
flag.Parse()
@@ -382,8 +383,10 @@ func main() {
giteaTransport := common.AllocateGiteaTransport(*giteaUrl)
configs, err := common.ResolveWorkflowConfigs(giteaTransport, configData)
if err != nil {
common.LogError("Cannot parse workflow configs:", err)
return
common.LogError("Failed to resolve some configuration repositories:", err)
if *exitOnConfigError {
return
}
}
reviewer, err := giteaTransport.GetCurrentUser()

View File

@@ -503,7 +503,10 @@ func updateConfiguration(configFilename string, orgs *[]string) {
os.Exit(4)
}
configs, _ := common.ResolveWorkflowConfigs(gitea, configFile)
configs, err := common.ResolveWorkflowConfigs(gitea, configFile)
if err != nil {
common.LogError("Failed to resolve some configuration repositories:", err)
}
configuredRepos = make(map[string][]*common.AutogitConfig)
*orgs = make([]string, 0, 1)
for _, c := range configs {

View File

@@ -58,6 +58,7 @@ func main() {
checkOnStart := flag.Bool("check-on-start", common.GetEnvOverrideBool(os.Getenv("AUTOGITS_CHECK_ON_START"), false), "Check all repositories for consistency on start, without delays")
checkIntervalHours := flag.Float64("check-interval", 5, "Check interval (+-random delay) for repositories for consitency, in hours")
flag.BoolVar(&ListPROnly, "list-prs-only", false, "Only lists PRs without acting on them")
exitOnConfigError := flag.Bool("exit-on-config-error", false, "Exit if any repository in configuration cannot be resolved")
flag.Int64Var(&PRID, "id", -1, "Process only the specific ID and ignore the rest. Use for debugging")
basePath := flag.String("repo-path", common.GetEnvOverrideString(os.Getenv("AUTOGITS_REPO_PATH"), ""), "Repository path. Default is temporary directory")
pr := flag.String("only-pr", "", "Only specific PR to process. For debugging")
@@ -97,8 +98,10 @@ func main() {
configs, err := common.ResolveWorkflowConfigs(Gitea, config)
if err != nil {
common.LogError("Cannot resolve config files:", err)
return
common.LogError("Failed to resolve some configuration repositories:", err)
if *exitOnConfigError {
return
}
}
for _, c := range configs {