Compare commits

..

1 Commits

Author SHA1 Message Date
Stephan Kulow
716db10adf Scan old imports 2022-11-24 10:24:14 +01:00
17 changed files with 204 additions and 1701 deletions

View File

@ -9,5 +9,5 @@ test:
update-packages: update-packages:
f=$$(mktemp) ;\ f=$$(mktemp) ;\
osc api /source/openSUSE:Factory?view=info | grep -v lsrcmd5 | grep srcmd5= | sed -e 's,.*package=",,; s,".*,,' | grep -v : > $$f ;\ osc api /source/openSUSE:Factory?view=info | grep -v lsrcmd5 | grep srcmd5= | sed -e 's,.*package=",,; s,".*,,' | grep -v : > $$f ;\
echo _project >> $$f;\ echo _project >> $$f ;\
mv $$f packages mv $$f packages

View File

@ -1,18 +1,5 @@
Installation sudo zypper in python3-psycopg2
------------ sudo su - postgres
# `createdb -O <LOCAL_USER> imported_git`
sudo zypper in python3-psycopg
sudo su - postgres
createdb -O <LOCAL_USER> imported_git
To reset the database, drop table scheme To reset the database, drop table scheme
Gitea parameters
----------------
* `GITEA_HOST` - default: src.opensuse.org
* `GITEA_USER` - Used to generate SSH links for push. Default: gitea
* `GITEA_ORG` - target organization to push to
* `GITEA_DEFAULT_BRANCH` - default branch

View File

@ -7,6 +7,8 @@ import sys
import osc.core import osc.core
from lib.db import DB
from lib.db_revision import DBRevision
from lib.git_exporter import GitExporter from lib.git_exporter import GitExporter
from lib.importer import Importer from lib.importer import Importer
from lib.test_exporter import TestExporter from lib.test_exporter import TestExporter
@ -42,8 +44,8 @@ PROJECTS = [
] ]
def export_package(project, package, repodir, cachedir, gc): def export_package(package, repodir, cachedir, gc):
exporter = GitExporter(URL_OBS, project, package, repodir, cachedir) exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
exporter.set_gc_interval(gc) exporter.set_gc_interval(gc)
exporter.export_as_git() exporter.export_as_git()
@ -51,12 +53,6 @@ def export_package(project, package, repodir, cachedir, gc):
def main(): def main():
parser = argparse.ArgumentParser(description="OBS history importer into git") parser = argparse.ArgumentParser(description="OBS history importer into git")
parser.add_argument("packages", help="OBS package names", nargs="*") parser.add_argument("packages", help="OBS package names", nargs="*")
parser.add_argument(
"-p",
"--project",
default="openSUSE:Factory",
help="Project to import/export, default is openSUSE:Factory",
)
parser.add_argument( parser.add_argument(
"-r", "-r",
"--repodir", "--repodir",
@ -106,6 +102,56 @@ def main():
requests_log.setLevel(logging.DEBUG) requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True requests_log.propagate = True
def check_old_package(db: "DB", dir: pathlib.Path):
    """Look for imported openSUSE:Factory revisions matching an archived package.

    Reads ``dir / "MD5SUMS"`` (one ``<md5> <file name>`` entry per line),
    collects for every entry the ``revision_id`` values of the rows in
    ``files`` that carry the same md5 and name, and intersects those sets
    across all entries. Entries whose file name is ``ready`` are skipped.

    Args:
        db: database handle providing ``cursor()``.
        dir: directory that contains the ``MD5SUMS`` file.

    Returns:
        True when a revision of openSUSE:Factory contains every listed file;
        None when the checksum file is corrupt, lists no files, or nothing
        matches.
    """
    md5file = dir / "MD5SUMS"
    print(md5file)
    valid_revisions = None
    with open(md5file, "rb") as f:
        for raw_line in f:
            try:
                # Split on the first run of whitespace so that one or two
                # separating blanks both work and file names may hold spaces.
                fields = raw_line.decode("utf-8").split(None, 1)
            except UnicodeDecodeError:
                logging.error(f"Corrupt MD5 file: {md5file}")
                return None
            if not fields:
                # Blank line: nothing to check.
                continue
            if len(fields) != 2:
                # A lone token cannot be unpacked into checksum + name.
                logging.error(f"Corrupt MD5 file: {md5file}")
                return None
            md5, name = fields[0], fields[1].strip()
            if name == "ready":
                continue
            if len(md5) != 32:
                logging.error(f"Corrupt MD5 file: {md5file}")
                return None
            with db.cursor() as cur:
                cur.execute(
                    "SELECT revision_id FROM files WHERE md5=%s AND name=%s",
                    (md5, name),
                )
                nrevs = {row[0] for row in cur.fetchall()}
            if valid_revisions is None:
                valid_revisions = nrevs
            else:
                valid_revisions = valid_revisions & nrevs
            if not valid_revisions:
                # No revision holds all files seen so far; stop early.
                break

    if not valid_revisions:
        # None (no file entries at all) or empty (no common revision):
        # there is nothing to look up.
        return None

    with db.cursor() as cur:
        cur.execute(
            "SELECT * FROM revisions WHERE id = ANY(%s) AND project=%s",
            (list(valid_revisions), "openSUSE:Factory"),
        )
        for row in cur.fetchall():
            r = DBRevision(db, row)
            print("Valid", r, r.files_hash)
            # NOTE(review): indentation was lost in this view; the return is
            # assumed to sit inside the loop (first match wins) - confirm.
            return True
    return None
if False:
import os
db = DB()
basedir = pathlib.Path(
f"/mounts/work/SAVE/oldpackages/stable/{args.packages[0]}"
)
for subdir in sorted(os.listdir(basedir)):
if check_old_package(db, basedir / subdir):
break
if args.export: if args.export:
if len(args.packages) != 1: if len(args.packages) != 1:
print("Can only export one package") print("Can only export one package")
@ -116,13 +162,10 @@ def main():
if not args.cachedir: if not args.cachedir:
args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser() args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
importer = Importer(URL_OBS, args.project, args.packages) importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
importer.import_into_db() importer.import_into_db()
for package in args.packages: for package in args.packages:
if not importer.package_with_scmsync(package): export_package(package, args.repodir, args.cachedir, args.gc)
export_package(args.project, package, args.repodir, args.cachedir, args.gc)
else:
logging.debug(f"{args.project}/{package} has scmsync links - skipping export")
if __name__ == "__main__": if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,8 @@ def config(filename="database.ini", section="production"):
for param in params: for param in params:
db[param[0]] = param[1] db[param[0]] = param[1]
else: else:
raise Exception(f"Section {section} not found in the {filename} file") raise Exception(
"Section {0} not found in the {1} file".format(section, filename)
)
return db return db

View File

@ -1,6 +1,7 @@
import logging import logging
import psycopg import psycopg2
from psycopg2.extras import LoggingConnection
from lib.config import config from lib.config import config
@ -16,20 +17,22 @@ class DB:
# read the connection parameters # read the connection parameters
params = config(section=self.config_section) params = config(section=self.config_section)
# connect to the PostgreSQL server # connect to the PostgreSQL server
self.conn = psycopg.connect(conninfo=f"dbname={params['database']}") self.conn = psycopg2.connect(connection_factory=LoggingConnection, **params)
logging.getLogger("psycopg.pool").setLevel(logging.INFO) logger = logging.getLogger(__name__)
self.conn.initialize(logger)
except (Exception, psycopg.DatabaseError) as error: except (Exception, psycopg2.DatabaseError) as error:
print(error) print(error)
raise error raise error
def schema_version(self): def schema_version(self):
# create a cursor # create a cursor
with self.conn.cursor() as cur: with self.conn.cursor() as cur:
# execute a statement # execute a statement
try: try:
cur.execute("SELECT MAX(version) from scheme") cur.execute("SELECT MAX(version) from scheme")
except psycopg.errors.UndefinedTable: except psycopg2.errors.UndefinedTable as error:
cur.close() cur.close()
self.close() self.close()
self.connect() self.connect()
@ -143,9 +146,9 @@ class DB:
) )
schemes[10] = ( schemes[10] = (
"ALTER TABLE revisions ADD COLUMN request_id INTEGER", "ALTER TABLE revisions ADD COLUMN request_id INTEGER",
"""ALTER TABLE revisions """ALTER TABLE revisions
ADD CONSTRAINT request_id_foreign_key ADD CONSTRAINT request_id_foreign_key
FOREIGN KEY (request_id) FOREIGN KEY (request_id)
REFERENCES requests (id)""", REFERENCES requests (id)""",
"UPDATE scheme SET version=10", "UPDATE scheme SET version=10",
) )
@ -270,7 +273,7 @@ class DB:
cur.execute(command) cur.execute(command)
# commit the changes # commit the changes
self.conn.commit() self.conn.commit()
except (Exception, psycopg.DatabaseError) as error: except (Exception, psycopg2.DatabaseError) as error:
print(error) print(error)
self.close() self.close()
raise error raise error

View File

@ -2,6 +2,7 @@ from __future__ import annotations
from hashlib import md5 from hashlib import md5
from pathlib import Path from pathlib import Path
from typing import Optional
from lib.db import DB from lib.db import DB
from lib.obs_revision import OBSRevision from lib.obs_revision import OBSRevision
@ -205,7 +206,7 @@ class DBRevision:
): ):
continue continue
cur.execute( cur.execute(
"""INSERT INTO files (name, md5, size, mtime, revision_id) """INSERT INTO files (name, md5, size, mtime, revision_id)
VALUES (%s,%s,%s,%s,%s)""", VALUES (%s,%s,%s,%s,%s)""",
( (
entry.get("name"), entry.get("name"),
@ -254,7 +255,7 @@ class DBRevision:
self._files.sort(key=lambda x: x["name"]) self._files.sort(key=lambda x: x["name"])
return self._files return self._files
def calc_delta(self, current_rev: DBRevision | None): def calc_delta(self, current_rev: Optional[DBRevision]):
"""Calculate the list of files to download and to delete. """Calculate the list of files to download and to delete.
Param current_rev is the revision that's currently checked out. Param current_rev is the revision that's currently checked out.
If it's None, the repository is empty. If it's None, the repository is empty.

View File

@ -4,6 +4,7 @@ import os
import pathlib import pathlib
import subprocess import subprocess
import pygit2
import requests import requests
from lib.binary import BINARY from lib.binary import BINARY
@ -19,6 +20,11 @@ class Git:
self.committer = committer self.committer = committer
self.committer_email = committer_email self.committer_email = committer_email
self.repo = None
def is_open(self):
    """Return True when ``self.repo`` holds a repository object (is not None)."""
    return self.repo is not None
def exists(self): def exists(self):
"""Check if the path is a valid git repository""" """Check if the path is a valid git repository"""
return (self.path / ".git").exists() return (self.path / ".git").exists()
@ -28,70 +34,36 @@ class Git:
self.path.mkdir(parents=True, exist_ok=True) self.path.mkdir(parents=True, exist_ok=True)
self.open() self.open()
def git_run(self, args, **kwargs):
    """Run a git command in ``self.path`` and return the ``CompletedProcess``.

    Args:
        args: list of arguments placed after ``git``.
        **kwargs: forwarded to ``subprocess.run``. An ``env`` mapping, when
            given, is used instead of a copy of ``os.environ``.

    Raises:
        subprocess.CalledProcessError: git exited non-zero (``check=True``).
    """
    if "env" in kwargs:
        # Work on a copy so the caller's mapping is never modified.
        envs = kwargs.pop("env").copy()
    else:
        envs = os.environ.copy()
    # NOTE(review): indentation was lost in this view; both overrides are
    # assumed to apply regardless of where the environment came from.
    envs["GIT_LFS_SKIP_SMUDGE"] = "1"
    envs["GIT_CONFIG_GLOBAL"] = "/dev/null"
    return subprocess.run(
        ["git"] + args,
        cwd=self.path,
        check=True,
        env=envs,
        **kwargs,
    )
def open(self): def open(self):
if not self.exists(): # Convert the path to string, to avoid some limitations in
self.git_run(["init", "--object-format=sha256", "-b", "factory"]) # older pygit2
self.git_run(["config", "lfs.allowincompletepush", "true"]) self.repo = pygit2.init_repository(str(self.path))
def is_dirty(self): def is_dirty(self):
"""Check if there is something to commit""" """Check if there is something to commit"""
status_str = self.git_run( assert self.is_open()
["status", "--porcelain=2"],
stdout=subprocess.PIPE, return self.repo.status()
).stdout.decode("utf-8")
return len(list(filter(None, status_str.split("\n")))) > 0
def branches(self): def branches(self):
br = ( return list(self.repo.branches)
self.git_run(
["for-each-ref", "--format=%(refname:short)", "refs/heads/"],
stdout=subprocess.PIPE,
)
.stdout.decode("utf-8")
.split()
)
if len(br) == 0:
br.append("factory") # unborn branch?
return br
def branch(self, branch, commit="HEAD"): def branch(self, branch, commit=None):
commit = ( if not commit:
self.git_run( commit = self.repo.head
["rev-parse", "--verify", "--end-of-options", commit + "^{commit}"], else:
stdout=subprocess.PIPE, commit = self.repo.get(commit)
) self.repo.branches.local.create(branch, commit)
.stdout.decode("utf-8")
.strip()
)
return self.git_run(["branch", branch, commit])
def checkout(self, branch): def checkout(self, branch):
"""Checkout into the branch HEAD""" """Checkout into the branch HEAD"""
new_branch = False new_branch = False
ref = f"refs/heads/{branch}"
if branch not in self.branches(): if branch not in self.branches():
self.git_run(["switch", "-q", "--orphan", branch]) self.repo.references["HEAD"].set_target(ref)
new_branch = True new_branch = True
else: else:
ref = f"refs/heads/{branch}" self.repo.checkout(ref)
if (self.path / ".git" / ref).exists():
self.git_run(["switch", "--no-guess", "-q", branch])
return new_branch return new_branch
def commit( def commit(
@ -115,79 +87,51 @@ class Git:
committer_time = committer_time if committer_time else user_time committer_time = committer_time if committer_time else user_time
if self.is_dirty(): if self.is_dirty():
self.git_run(["add", "--all", "."]) self.repo.index.add_all()
tree_id = ( self.repo.index.write()
self.git_run(["write-tree"], stdout=subprocess.PIPE) author = pygit2.Signature(user, user_email, int(user_time.timestamp()))
.stdout.decode("utf-8") committer = pygit2.Signature(
.strip() committer, committer_email, int(committer_time.timestamp())
) )
parent_array = [] tree = self.repo.index.write_tree()
if isinstance(parents, list): return self.repo.create_commit(
for parent in filter(None, parents): "HEAD", author, committer, message, tree, parents
parent_array = parent_array + ["-p", parent]
elif isinstance(parents, str):
parent_array = ["-p", parents]
commit_id = (
self.git_run(
["commit-tree"] + parent_array + [tree_id],
env={
"GIT_AUTHOR_NAME": user,
"GIT_AUTHOR_EMAIL": user_email,
"GIT_AUTHOR_DATE": f"{int(user_time.timestamp())} +0000",
"GIT_COMMITTER_NAME": committer,
"GIT_COMMITTER_EMAIL": committer_email,
"GIT_COMMITTER_DATE": f"{int(committer_time.timestamp())} +0000",
},
input=message.encode("utf-8"),
stdout=subprocess.PIPE,
)
.stdout.decode("utf-8")
.rstrip()
) )
self.git_run(["reset", "--soft", commit_id])
return commit_id
def branch_head(self, branch="HEAD"): def last_commit(self):
return ( try:
self.git_run( return self.repo.head.target
["rev-parse", "--verify", "--end-of-options", branch], except:
stdout=subprocess.PIPE, return None
)
.stdout.decode("utf-8") def branch_head(self, branch):
.strip() return self.repo.references["refs/heads/" + branch].target
)
def set_branch_head(self, branch, commit): def set_branch_head(self, branch, commit):
return self.git_run(["update-ref", f"refs/heads/{branch}", commit]) self.repo.references["refs/heads/" + branch].set_target(commit)
def gc(self): def gc(self):
logging.debug(f"Garbage recollect and repackage {self.path}") logging.debug(f"Garbage recollect and repackage {self.path}")
self.git_run( subprocess.run(
["gc", "--auto"], ["git", "gc", "--auto"],
cwd=self.path,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
) )
# def clean(self): def clean(self):
# for path, _ in self.repo.status().items(): for path, _ in self.repo.status().items():
# logging.debug(f"Cleaning {path}") logging.debug(f"Cleaning {path}")
# try: try:
# (self.path / path).unlink() (self.path / path).unlink()
# self.repo.index.remove(path) self.repo.index.remove(path)
# except Exception as e: except Exception as e:
# logging.warning(f"Error removing file {path}: {e}") logging.warning(f"Error removing file {path}: {e}")
def add(self, filename): def add(self, filename):
self.git_run(["add", ":(literal)" + str(filename)]) self.repo.index.add(filename)
def add_default_gitignore(self):
    """Create ``.gitignore`` with a single ``.osc`` entry if none exists.

    An existing ``.gitignore`` is left untouched. A newly written one is
    passed to ``self.add``.
    """
    gitignore = self.path / ".gitignore"
    if gitignore.exists():
        return
    with gitignore.open("w") as f:
        f.write(".osc\n")
    # NOTE(review): indentation was lost in this view; staging is assumed to
    # happen only when the file was just created - confirm.
    self.add(".gitignore")
def add_default_lfs_gitattributes(self, force=False): def add_default_lfs_gitattributes(self, force=False):
if not (self.path / ".gitattributes").exists() or force: if not (self.path / ".gitattributes").exists() or force:
@ -241,9 +185,9 @@ class Git:
return any(fnmatch.fnmatch(filename, line) for line in patterns) return any(fnmatch.fnmatch(filename, line) for line in patterns)
def remove(self, file: pathlib.Path): def remove(self, file: pathlib.Path):
self.git_run( self.repo.index.remove(file.name)
["rm", "-q", "-f", "--ignore-unmatch", ":(literal)" + file.name], (self.path / file).unlink()
)
patterns = self.get_specific_lfs_gitattributes() patterns = self.get_specific_lfs_gitattributes()
if file.name in patterns: if file.name in patterns:
patterns.remove(file.name) patterns.remove(file.name)
@ -252,27 +196,15 @@ class Git:
def add_gitea_remote(self, package): def add_gitea_remote(self, package):
repo_name = package.replace("+", "_") repo_name = package.replace("+", "_")
org_name = "rpm" org_name = "rpm"
gitea_user = "gitea"
gitea_host = "src.opensuse.org"
default_branch = "factory"
if os.getenv("GITEA_HOST"):
gitea_host = getenv("GITEA_HOST")
if os.getenv("GITEA_USER"):
gitea_user = getenv("GITEA_USER")
if os.getenv("GITEA_ORG"):
org_name = getenv("GITEA_ORG")
if os.getenv("GITEA_DEFAULT_BRANCH"):
default_branch = getenv("GITEA_DEFAULT_BRANCH")
if not os.getenv("GITEA_TOKEN"): if not os.getenv("GITEA_TOKEN"):
logging.warning("Not adding a remote due to missing $GITEA_TOKEN") logging.warning("Not adding a remote due to missing $GITEA_TOKEN")
return return
url = f"https://{gitea_host}/api/v1/org/{org_name}/repos" url = f"https://gitea.opensuse.org/api/v1/org/{org_name}/repos"
response = requests.post( response = requests.post(
url, url,
data={"name": repo_name, "object_format_name": "sha256", "default_branch": default_branch}, data={"name": repo_name},
headers={"Authorization": f"token {os.getenv('GITEA_TOKEN')}"}, headers={"Authorization": f"token {os.getenv('GITEA_TOKEN')}"},
timeout=10, timeout=10,
) )
@ -280,21 +212,16 @@ class Git:
# 201 Created # 201 Created
if response.status_code not in (201, 409): if response.status_code not in (201, 409):
print(response.data) print(response.data)
url = f"{gitea_user}@{gitea_host}:{org_name}/{repo_name}.git" url = f"gitea@gitea.opensuse.org:{org_name}/{repo_name}.git"
self.git_run( self.repo.remotes.create("origin", url)
["remote", "add", "origin", url],
)
def push(self, force=False): def push(self):
if "origin" not in self.git_run( remo = self.repo.remotes["origin"]
["remote"],
stdout=subprocess.PIPE,
).stdout.decode("utf-8"):
logging.warning("Not pushing to remote because no 'origin' configured")
return
cmd = ["push"] keypair = pygit2.KeypairFromAgent("gitea")
if force: callbacks = pygit2.RemoteCallbacks(credentials=keypair)
cmd.append("-f")
cmd += ["origin", "--all"] refspecs = ["refs/heads/factory"]
self.git_run(cmd) if "refs/heads/devel" in self.repo.references:
refspecs.append("refs/heads/devel")
remo.push(refspecs, callbacks=callbacks)

View File

@ -29,7 +29,7 @@ class GitExporter:
self.git.open() self.git.open()
else: else:
self.git.create() self.git.create()
# self.git.add_gitea_remote(package) self.git.add_gitea_remote(package)
self.state_file = os.path.join(self.git.path, ".git", "_flat_state.yaml") self.state_file = os.path.join(self.git.path, ".git", "_flat_state.yaml")
self.gc_interval = 200 self.gc_interval = 200
self.cachedir = cachedir self.cachedir = cachedir
@ -40,9 +40,9 @@ class GitExporter:
def check_repo_state(self, flats, branch_state): def check_repo_state(self, flats, branch_state):
state_data = dict() state_data = dict()
if os.path.exists(self.state_file): if os.path.exists(self.state_file):
with open(self.state_file) as f: with open(self.state_file, "r") as f:
state_data = yaml.safe_load(f) state_data = yaml.safe_load(f)
if not isinstance(state_data, dict): if type(state_data) != dict:
state_data = {} state_data = {}
left_to_commit = [] left_to_commit = []
for flat in reversed(flats): for flat in reversed(flats):
@ -86,12 +86,7 @@ class GitExporter:
logging.debug(f"Committing {flat}") logging.debug(f"Committing {flat}")
self.commit_flat(flat, branch_state) self.commit_flat(flat, branch_state)
# make sure that we create devel branch self.git.push()
if not branch_state["devel"]:
logging.debug("force creating devel")
self.git.set_branch_head("devel", self.git.branch_head("factory"))
self.git.push(force=True)
def run_gc(self): def run_gc(self):
self.gc_cnt = self.gc_interval self.gc_cnt = self.gc_interval
@ -155,7 +150,6 @@ class GitExporter:
# create file if not existant # create file if not existant
self.git.add_default_lfs_gitattributes(force=False) self.git.add_default_lfs_gitattributes(force=False)
self.git.add_default_gitignore()
to_download, to_delete = flat.commit.calc_delta(branch_state[flat.branch]) to_download, to_delete = flat.commit.calc_delta(branch_state[flat.branch])
for file in to_delete: for file in to_delete:

View File

@ -1,5 +1,5 @@
import concurrent.futures
import logging import logging
import pathlib
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from lib.db import DB from lib.db import DB
@ -26,15 +26,11 @@ class Importer:
# Import multiple Factory packages into the database # Import multiple Factory packages into the database
self.packages = packages self.packages = packages
self.project = project self.project = project
self.scmsync_cache = dict()
self.packages_with_scmsync = set()
self.db = DB() self.db = DB()
self.obs = OBS(api_url) self.obs = OBS(api_url)
assert not self.has_scmsync(project) assert project == "openSUSE:Factory"
self.refreshed_packages = set() self.refreshed_packages = set()
self.gone_packages_set = None
def import_request(self, number): def import_request(self, number):
self.obs.request(number).import_into_db(self.db) self.obs.request(number).import_into_db(self.db)
@ -109,7 +105,7 @@ class Importer:
with self.db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"""SELECT * FROM revisions WHERE id IN """SELECT * FROM revisions WHERE id IN
(SELECT revision_id from linked_revs WHERE linked_id=%s) (SELECT revision_id from linked_revs WHERE linked_id=%s)
AND commit_time <= %s ORDER BY commit_time""", AND commit_time <= %s ORDER BY commit_time""",
(prev.dbid, rev.commit_time), (prev.dbid, rev.commit_time),
) )
@ -142,7 +138,7 @@ class Importer:
fake_rev = linked.rev + rev.rev / 1000.0 fake_rev = linked.rev + rev.rev / 1000.0
comment = f"Updating link to change in {rev.project}/{rev.package} revision {int(rev.rev)}" comment = f"Updating link to change in {rev.project}/{rev.package} revision {int(rev.rev)}"
cur.execute( cur.execute(
"""INSERT INTO revisions (project,package,rev,unexpanded_srcmd5, """INSERT INTO revisions (project,package,rev,unexpanded_srcmd5,
commit_time, userid, comment, api_url) VALUES(%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id""", commit_time, userid, comment, api_url) VALUES(%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id""",
( (
linked.project, linked.project,
@ -165,12 +161,10 @@ class Importer:
(rev.dbid, linked.dbid), (rev.dbid, linked.dbid),
) )
def revisions_without_files(self, package): def revisions_without_files(self):
logging.debug(f"revisions_without_files({package})")
with self.db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"SELECT * FROM revisions WHERE package=%s AND broken=FALSE AND expanded_srcmd5 IS NULL", "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
(package,),
) )
return [DBRevision(self.db, row) for row in cur.fetchall()] return [DBRevision(self.db, row) for row in cur.fetchall()]
@ -184,11 +178,11 @@ class Importer:
linked_rev = cur.fetchone() linked_rev = cur.fetchone()
if linked_rev: if linked_rev:
linked_rev = linked_rev[0] linked_rev = linked_rev[0]
obs_dir_list = self.obs.list( list = self.obs.list(
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
) )
if obs_dir_list: if list:
rev.import_dir_list(obs_dir_list) rev.import_dir_list(list)
md5 = rev.calculate_files_hash() md5 = rev.calculate_files_hash()
with self.db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
@ -202,47 +196,53 @@ class Importer:
self.find_linked_revs() self.find_linked_revs()
self.find_fake_revisions() self.find_fake_revisions()
for package in self.packages: with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
for rev in self.revisions_without_files(package): fs = [
print(f"rev {rev} is without files") executor.submit(import_rev, self, rev)
self.import_rev(rev) for rev in self.revisions_without_files()
]
concurrent.futures.wait(fs)
def refresh_package(self, project, package): def refresh_package(self, project, package):
key = f"{project}/{package}" key = f"{project}/{package}"
if key in self.refreshed_packages: if key in self.refreshed_packages:
# refreshing once is good enough # refreshing once is good enough
return return
if self.package_gone(key):
return
logging.debug(f"Refresh {project}/{package}") logging.debug(f"Refresh {project}/{package}")
self.refreshed_packages.add(key) self.refreshed_packages.add(key)
if self.has_scmsync(project) or self.has_scmsync(key):
self.packages_with_scmsync.add(package)
logging.debug(f"{project}/{package} already in Git - skipping")
return
self.update_db_package(project, package) self.update_db_package(project, package)
self.fetch_all_linked_packages(project, package) self.fetch_all_linked_packages(project, package)
def import_into_db(self): def import_into_db(self):
for package in self.packages:
refresh_package(self, self.project, package)
self.db.conn.commit() with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
fs = [
executor.submit(refresh_package, self, self.project, package)
for package in self.packages
]
concurrent.futures.wait(fs)
for number in DBRevision.requests_to_fetch(self.db): self.db.conn.commit()
self.import_request(number)
self.db.conn.commit() fs = [
executor.submit(import_request, self, number)
for number in DBRevision.requests_to_fetch(self.db)
]
concurrent.futures.wait(fs)
with self.db.cursor() as cur: self.db.conn.commit()
cur.execute(
"""SELECT DISTINCT source_project,source_package FROM requests
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
(self.project, self.packages),
)
for project, package in cur.fetchall():
self.refresh_package(project, package)
with self.db.cursor() as cur:
cur.execute(
"""SELECT DISTINCT source_project,source_package FROM requests
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
(self.project, self.packages),
)
fs = [
executor.submit(refresh_package, self, project, package)
for project, package in cur.fetchall()
]
concurrent.futures.wait(fs)
self.db.conn.commit() self.db.conn.commit()
missing_users = User.missing_users(self.db) missing_users = User.missing_users(self.db)
@ -254,26 +254,3 @@ class Importer:
self.fill_file_lists() self.fill_file_lists()
self.db.conn.commit() self.db.conn.commit()
def package_gone(self, key):
    """Tell whether *key* is listed in ``gone-packages.txt``.

    The file is looked up two directory levels above this module file. It is
    read on first use, one entry per line with surrounding whitespace
    stripped, and kept in ``self.gone_packages_set``.

    Args:
        key: entry to look up (callers in this file pass "project/package").

    Returns:
        True when *key* is one of the listed entries.
    """
    # Compare against None: an empty set means "loaded, nothing listed" and
    # must not trigger another read of the file.
    if self.gone_packages_set is None:
        gone_file = pathlib.Path(__file__).parent.parent / "gone-packages.txt"
        with open(gone_file) as f:
            self.gone_packages_set = {line.strip() for line in f}
    return key in self.gone_packages_set
def has_scmsync(self, key):
    """Tell whether the meta of *key* contains a ``scmsync`` element.

    Results are memoized in ``self.scmsync_cache`` so ``self.obs._meta`` is
    asked at most once per key.

    Args:
        key: value handed to ``self.obs._meta`` (callers in this file pass a
            project name or "project/package").

    Returns:
        True when meta was returned and has a ``scmsync`` child, else False.
    """
    if key in self.scmsync_cache:
        return self.scmsync_cache[key]

    root = self.obs._meta(key)
    # A missing meta (None) counts as "no scmsync".
    scmsync_exists = root is not None and root.find("scmsync") is not None
    # NOTE(review): indentation was lost in this view; the result is assumed
    # to be cached for missing metas as well - confirm.
    self.scmsync_cache[key] = scmsync_exists
    return scmsync_exists
def package_with_scmsync(self, package):
    """Return True when *package* is in ``self.packages_with_scmsync``."""
    return package in self.packages_with_scmsync

View File

@ -68,7 +68,7 @@ class LFSOid:
row = cur.fetchone() row = cur.fetchone()
lfs_oid_id = row[0] lfs_oid_id = row[0]
cur.execute( cur.execute(
"""INSERT INTO lfs_oid_in_package (package,filename,lfs_oid_id) """INSERT INTO lfs_oid_in_package (package,filename,lfs_oid_id)
VALUES (%s,%s,%s)""", VALUES (%s,%s,%s)""",
(package, filename, lfs_oid_id), (package, filename, lfs_oid_id),
) )
@ -83,8 +83,7 @@ class LFSOid:
self.register() self.register()
def check(self): def check(self):
return True url = f"http://gitea.opensuse.org:9999/check/{self.sha256}/{self.size}"
url = f"http://localhost:9999/check/{self.sha256}/{self.size}"
response = requests.get( response = requests.get(
url, url,
timeout=10, timeout=10,
@ -128,13 +127,12 @@ class LFSOid:
"size": self.size, "size": self.size,
} }
url = "http://localhost:9999/register" url = "http://gitea.opensuse.org:9999/register"
response = requests.post( response = requests.post(
url, url,
json=data, json=data,
timeout=10, timeout=10,
) )
response.raise_for_status()
logging.info(f"Register LFS returned {response.status_code}") logging.info(f"Register LFS returned {response.status_code}")
@ -169,7 +167,7 @@ if __name__ == "__main__":
cur.execute( cur.execute(
""" """
CREATE TEMPORARY TABLE lfs_oid_in_revision ( CREATE TEMPORARY TABLE lfs_oid_in_revision (
revision_id INTEGER, revision_id INTEGER,
lfs_oid_id INTEGER NOT NULL, lfs_oid_id INTEGER NOT NULL,
name VARCHAR(255) NOT NULL name VARCHAR(255) NOT NULL
) )

View File

@ -73,11 +73,11 @@ class OBS:
logging.debug(f"GET {url}") logging.debug(f"GET {url}")
return ET.parse(osc.core.http_GET(url)).getroot() return ET.parse(osc.core.http_GET(url)).getroot()
def _meta(self, key, **params): def _meta(self, project, package, **params):
try: try:
root = self._xml(f"source/{key}/_meta", **params) root = self._xml(f"source/{project}/{package}/_meta", **params)
except HTTPError: except HTTPError:
logging.error(f"Project/Package [{key} {params}] has no meta") logging.error(f"Package [{project}/{package} {params}] has no meta")
return None return None
return root return root
@ -118,13 +118,13 @@ class OBS:
return root return root
def exists(self, project, package): def exists(self, project, package):
root = self._meta(f"{project}/{package}") root = self._meta(project, package)
if root is None: if root is None:
return False return False
return root.get("project") == project return root.get("project") == project
def devel_project(self, project, package): def devel_project(self, project, package):
root = self._meta(f"{project}/{package}") root = self._meta(project, package)
devel = root.find("devel") devel = root.find("devel")
if devel is None: if devel is None:
return None return None
@ -150,7 +150,7 @@ class OBS:
def _download(self, project, package, name, revision): def _download(self, project, package, name, revision):
url = osc.core.makeurl( url = osc.core.makeurl(
self.url, self.url,
["source", project, package, name], ["source", project, package, urllib.parse.quote(name)],
{"rev": revision, "expand": 1}, {"rev": revision, "expand": 1},
) )
return osc.core.http_GET(url) return osc.core.http_GET(url)
@ -165,6 +165,7 @@ class OBS:
cachedir: str, cachedir: str,
file_md5: str, file_md5: str,
) -> None: ) -> None:
cached_file = self._path_from_md5(name, cachedir, file_md5) cached_file = self._path_from_md5(name, cachedir, file_md5)
if not self.in_cache(name, cachedir, file_md5): if not self.in_cache(name, cachedir, file_md5):
with (dirpath / name).open("wb") as f: with (dirpath / name).open("wb") as f:

View File

@ -7,6 +7,8 @@ except:
print("Install python3-python-magic, not python3-magic") print("Install python3-python-magic, not python3-magic")
raise raise
import requests
from lib.db import DB from lib.db import DB
from lib.lfs_oid import LFSOid from lib.lfs_oid import LFSOid
from lib.obs import OBS from lib.obs import OBS
@ -41,6 +43,7 @@ class ProxySHA256:
} }
def put(self, project, package, name, revision, file_md5, size): def put(self, project, package, name, revision, file_md5, size):
if not self.mime: if not self.mime:
self.mime = magic.Magic(mime=True) self.mime = magic.Magic(mime=True)

View File

@ -1,3 +1,4 @@
from typing import Dict
from xmlrpc.client import Boolean from xmlrpc.client import Boolean
from lib.db_revision import DBRevision from lib.db_revision import DBRevision
@ -113,7 +114,7 @@ class TreeBuilder:
candidates.append(node) candidates.append(node)
if node.merged_into: if node.merged_into:
# we can't have candidates that are crossing previous merges # we can't have candidates that are crossing previous merges
# see https://src.opensuse.org/importers/git-importer/issues/14 # see https://gitea.opensuse.org/importers/git-importer/issues/14
candidates = [] candidates = []
node = node.parent node = node.parent
if candidates: if candidates:
@ -137,7 +138,7 @@ class TreeBuilder:
self.requests.add(node.revision.request_id) self.requests.add(node.revision.request_id)
class FindMergeWalker(AbstractWalker): class FindMergeWalker(AbstractWalker):
def __init__(self, builder: TreeBuilder, requests: dict) -> None: def __init__(self, builder: TreeBuilder, requests: Dict) -> None:
super().__init__() super().__init__()
self.source_revisions = dict() self.source_revisions = dict()
self.builder = builder self.builder = builder

View File

@ -1,59 +0,0 @@
#!/usr/bin/python3
import json
from pathlib import Path
import pika
import random
import time
MY_TASKS_DIR = Path(__file__).parent / "tasks"
def listen_events():
    """Consume OBS package-commit events and drop a task file per package.

    Connects to rabbit.opensuse.org, binds a private queue to the ``pubsub``
    exchange for the ``opensuse.obs.package.commit`` routing key and then
    blocks in ``start_consuming()``. For every commit to openSUSE:Factory an
    empty file named after the package is touched in ``MY_TASKS_DIR``.
    """
    connection = pika.BlockingConnection(
        pika.URLParameters("amqps://opensuse:opensuse@rabbit.opensuse.org")
    )
    channel = connection.channel()

    channel.exchange_declare(
        exchange="pubsub", exchange_type="topic", passive=True, durable=False
    )

    # Empty name + exclusive: let the broker pick a queue name for us.
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(
        exchange="pubsub", queue=queue_name, routing_key="opensuse.obs.package.commit"
    )

    print(" [*] Waiting for logs. To exit press CTRL+C")

    def callback(ch, method, properties, body):
        """Handle one delivery; ignore everything but Factory commits."""
        if method.routing_key not in ("opensuse.obs.package.commit",):
            return
        body = json.loads(body)
        if (
            "project" in body
            and "package" in body
            and body["project"] == "openSUSE:Factory"
        ):
            # A name with "/" would not be a plain file inside the tasks dir.
            if "/" in body["package"]:
                return

            # NOTE(review): indentation was lost in this view; touch and
            # print are assumed to run only for Factory events - confirm.
            (MY_TASKS_DIR / body["package"]).touch()
            print(" [x] %r:%r" % (method.routing_key, body["package"]))

    channel.basic_consume(queue_name, callback, auto_ack=True)

    channel.start_consuming()
def main():
    """Run ``listen_events()`` forever, reconnecting after a lost connection."""
    while True:
        try:
            listen_events()
        except (pika.exceptions.ConnectionClosed, pika.exceptions.AMQPHeartbeatTimeout):
            # Wait a random 10-100 seconds before connecting again.
            time.sleep(random.randint(10, 100))


if __name__ == "__main__":
    main()

1
tasks/.gitignore vendored
View File

@ -1 +0,0 @@
*

View File

@ -1,19 +0,0 @@
#!/bin/bash
#
# Task runner: runs git-importer.py for every marker file found in ./tasks,
# then waits (inotifywait) until a new marker file is created.

# Stop right away if the checkout is missing instead of looping elsewhere.
cd /space/dmueller/git-importer || exit 1
source credentials.sh

while true; do
    for i in "$PWD"/tasks/*; do
        if test -f "$i"; then
            # The marker file name is the package name.
            package=$(basename "$i")
            echo "$(date): Importing $package"
            if ! python3 ./git-importer.py -c repos/.cache "$package"; then
                # Keep the marker of a failed import for later inspection.
                mkdir -p "$PWD/failed-tasks"
                mv -f "$i" "$PWD/failed-tasks"
            fi
            # No-op after a failed import (already moved away).
            rm -f "$i"
        fi
    done
    inotifywait -q -e create "$PWD/tasks"
done