diff --git a/lib/db.py b/lib/db.py index 5d817c7..57a5f37 100644 --- a/lib/db.py +++ b/lib/db.py @@ -221,6 +221,45 @@ class DB: "ALTER TABLE revisions ALTER COLUMN api_url SET NOT NULL", "UPDATE scheme SET version=21", ) + schemes[22] = ( + """DROP TABLE IF EXISTS lfs_oids""", + """ + CREATE TABLE lfs_oids ( + id SERIAL PRIMARY KEY, + project VARCHAR(255) NOT NULL, + package VARCHAR(255) NOT NULL, + filename VARCHAR(255) NOT NULL, + rev VARCHAR(40) NOT NULL, + sha256 VARCHAR(70) NOT NULL, + size INTEGER NOT NULL, + mimetype VARCHAR(255) NOT NULL, + file_md5 VARCHAR(40) NOT NULL + ) + """, + "CREATE UNIQUE INDEX ON lfs_oids (sha256,size)", + "CREATE INDEX ON revisions(package)", + """DROP TABLE IF EXISTS text_files""", + """ + CREATE TABLE text_files ( + id SERIAL PRIMARY KEY, + package VARCHAR(255) NOT NULL, + filename VARCHAR(255) NOT NULL + ) + """, + "CREATE UNIQUE INDEX ON text_files (package,filename)", + """DROP TABLE IF EXISTS lfs_oid_in_package""", + """ + CREATE TABLE lfs_oid_in_package ( + id SERIAL PRIMARY KEY, + lfs_oid_id INTEGER NOT NULL, + package VARCHAR(255) NOT NULL, + filename VARCHAR(255) NOT NULL + ) + """, + "CREATE INDEX ON text_files(package)", + "CREATE INDEX ON lfs_oid_in_package(package)", + "UPDATE scheme SET version=22", + ) schema_version = self.schema_version() if (schema_version + 1) not in schemes: return diff --git a/lib/git_exporter.py b/lib/git_exporter.py index 8736366..8c912f0 100644 --- a/lib/git_exporter.py +++ b/lib/git_exporter.py @@ -17,7 +17,8 @@ class GitExporter: self.obs = OBS(api_url) self.project = project self.package = package - self.proxy_sha256 = ProxySHA256(self.obs, enabled=True) + self.db = DB() + self.proxy_sha256 = ProxySHA256(self.obs, self.db) self.git = Git( repodir / package, committer="Git OBS Bridge", @@ -59,8 +60,7 @@ class GitExporter: return left_to_commit def export_as_git(self): - db = DB() - tree = TreeBuilder(db).build(self.project, self.package) + tree = TreeBuilder(self.db).build(self.project, self.package) flats = tree.as_flat_list() branch_state = {"factory": None, "devel": None} @@ -75,9 +75,8 @@ class GitExporter: for flat in left_to_commit: if flat.commit.userid not in users: - users[flat.commit.userid] = User.find(db, flat.commit.userid) + users[flat.commit.userid] = User.find(self.db, flat.commit.userid) flat.user = users[flat.commit.userid] - logging.debug(f"USER {flat.user}") self.gc_cnt -= 1 if self.gc_cnt <= 0 and self.gc_interval: self.run_gc() @@ -88,10 +87,14 @@ class GitExporter: self.gc_cnt = self.gc_interval self.git.gc() + def is_lfs_file(self, package, filename, size): + if not is_binary_or_large(filename, size): + return False + return not self.proxy_sha256.is_text(package, filename) + def commit_file(self, flat, file, size, md5): # have such files been detected as text mimetype before? - is_text = self.proxy_sha256.is_text(flat.commit.package, file.name) - if not is_text and is_binary_or_large(file.name, size): + if self.is_lfs_file(flat.commit.package, file.name, size): file_sha256 = self.proxy_sha256.get_or_put( flat.commit.project, flat.commit.package, @@ -100,19 +103,24 @@ class GitExporter: md5, size, ) - self.git.add_lfs(file.name, file_sha256["sha256"], size) - else: - self.obs.change_url(flat.commit.api_url) - self.obs.download( - flat.commit.project, - flat.commit.package, - file.name, - flat.commit.expanded_srcmd5, - self.git.path, - self.cachedir, - file_md5=md5, - ) - self.git.add(file) + # as it's newly registered, it might be a text file now, so double check + if not self.proxy_sha256.is_text(flat.commit.package, file.name): + self.git.add_lfs(file.name, file_sha256, size) + return + self.commit_non_lfs_file(flat, file, md5) + + def commit_non_lfs_file(self, flat, file, md5): + self.obs.change_url(flat.commit.api_url) + self.obs.download( + flat.commit.project, + flat.commit.package, + file.name, + flat.commit.expanded_srcmd5, + self.git.path, + self.cachedir, + file_md5=md5, + ) + self.git.add(file) def branch_fits_parent1(self, flat, branch_state): if branch_state[flat.branch] is None: diff --git a/lib/hash.py b/lib/hash.py new file mode 100644 index 0000000..93dab08 --- /dev/null +++ b/lib/hash.py @@ -0,0 +1,20 @@ +import functools +import hashlib + + +def _hash(hash_alg, file_or_path): + h = hash_alg() + + def __hash(f): + while chunk := f.read(1024 * 4): + h.update(chunk) + + if hasattr(file_or_path, "read"): + __hash(file_or_path) + else: + with file_or_path.open("rb") as f: + __hash(f) + return h.hexdigest() + + +md5 = functools.partial(_hash, hashlib.md5) diff --git a/lib/lfs_oid.py b/lib/lfs_oid.py new file mode 100644 index 0000000..e01d254 --- /dev/null +++ b/lib/lfs_oid.py @@ -0,0 +1,67 @@ +import logging +import sys + +from lib.db import DB + + +# no need for this class yet, so just leave the migration code here +class LFSOid: + def __init__(self) -> None: + pass + + +if __name__ == "__main__": + """ + Import the old data - it only makes sense on a DB with previously scanned revisions + curl -s https://stephan.kulow.org/git_lfs.csv.xz | xz -cd | PYTHONPATH=$PWD /usr/bin/python3 lib/lfs_oid.py + """ + db = DB() + logging.basicConfig(level=logging.DEBUG) + with db.cursor() as cur: + while True: + line = sys.stdin.readline() + if not line: + break + ( + project, + package, + filename, + rev, + sha256, + size, + mimetype, + md5, + ) = line.strip().split("\t") + cur.execute( + """INSERT INTO lfs_oids (project,package,filename,rev,sha256,size,mimetype,file_md5) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING""", + (project, package, filename, rev, sha256, size, mimetype, md5), + ) + + cur.execute( + """ + CREATE TEMPORARY TABLE lfs_oid_in_revision ( + revision_id INTEGER, + lfs_oid_id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL + ) + """ + ) + cur.execute( + """INSERT INTO lfs_oid_in_revision (revision_id, lfs_oid_id, name) + SELECT revision_id,lfs_oids.id,files.name FROM lfs_oids JOIN files ON files.md5=lfs_oids.file_md5""" + ) + cur.execute( + """INSERT INTO text_files (package,filename) + SELECT DISTINCT r.package, lfs_oid_in_revision.name FROM lfs_oids + JOIN lfs_oid_in_revision on lfs_oid_in_revision.lfs_oid_id=lfs_oids.id + JOIN revisions r ON r.id=lfs_oid_in_revision.revision_id + WHERE lfs_oids.mimetype like 'text/%' ON CONFLICT DO NOTHING""" + ) + cur.execute( + """INSERT INTO lfs_oid_in_package (lfs_oid_id, package, filename) + SELECT DISTINCT lfs_oids.id,r.package, lfs_oid_in_revision.name FROM lfs_oids + JOIN lfs_oid_in_revision on lfs_oid_in_revision.lfs_oid_id=lfs_oids.id + JOIN revisions r ON r.id=lfs_oid_in_revision.revision_id""" + ) + db.conn.commit() diff --git a/lib/obs.py b/lib/obs.py index 9291c20..3d149ae 100644 --- a/lib/obs.py +++ b/lib/obs.py @@ -9,7 +9,7 @@ from urllib.error import HTTPError import osc.core -from lib.proxy_sha256 import md5 +from lib.hash import md5 from lib.request import Request from lib.user import User diff --git a/lib/proxy_sha256.py b/lib/proxy_sha256.py index 475353b..be5da7d 100644 --- a/lib/proxy_sha256.py +++ b/lib/proxy_sha256.py @@ -1,114 +1,142 @@ -import functools import hashlib import logging -import urllib +import os + +try: + import magic +except: + print("Install python3-python-magic, not python3-magic") + raise import requests - -def _hash(hash_alg, file_or_path): - h = hash_alg() - - def __hash(f): - while chunk := f.read(1024 * 4): - h.update(chunk) - - if hasattr(file_or_path, "read"): - __hash(file_or_path) - else: - with file_or_path.open("rb") as f: - __hash(f) - return h.hexdigest() - - -md5 = functools.partial(_hash, hashlib.md5) -sha256 = functools.partial(_hash, hashlib.sha256) +from lib.db import DB +from lib.obs import OBS class ProxySHA256: - def __init__(self, obs, url=None, enabled=True): + def __init__(self, obs: OBS, db: DB): self.obs = obs - self.url = url if url else "http://source.dyn.cloud.suse.de" - self.enabled = enabled + self.db = db self.hashes = None self.texts = None - - def load_package(self, package): - # _project is unreachable for the proxy - due to being a fake package - if package == "_project": - self.enabled = False - self.texts = set(["_config", "_service", "_staging_workflow"]) - self.hashes = dict() - return - logging.debug("Retrieve all previously defined SHA256") - response = requests.get( - f"http://source.dyn.cloud.suse.de/package/{package}", timeout=50 - ) - if response.status_code == 200: - json = response.json() - self.hashes = json["shas"] - self.texts = set(json["texts"]) + self.mime = None def get(self, package, name, file_md5): - key = f"{file_md5}-{name}" if self.hashes is None: - if self.enabled: - self.load_package(package) - else: - self.hashes = {} - return self.hashes.get(key, None) + self.load_hashes(package) + key = f"{file_md5}-{name}" + ret = self.hashes.get(key) + return ret - def _proxy_put(self, project, package, name, revision, file_md5, size): - quoted_name = urllib.parse.quote(name) - url = f"{self.obs.url}/public/source/{project}/{package}/{quoted_name}?rev={revision}" - response = requests.put( - self.url, - data={ - "hash": file_md5, - "filename": name, - "url": url, - "package": package, - }, - timeout=10, - ) - if response.status_code != 200: - raise Exception(f"Redirector error on {self.url} for {url}") - - key = (file_md5, name) - self.hashes[key] = { - "sha256": response.content.decode("utf-8"), - "fsize": size, - } - return self.hashes[key] - - def _obs_put(self, project, package, name, revision, file_md5, size): - key = (file_md5, name) - self.hashes[key] = { - "sha256": sha256(self.obs._download(project, package, name, revision)), - "fsize": size, - } - return self.hashes[key] + def load_hashes(self, package): + with self.db.cursor() as cur: + cur.execute( + """SELECT lfs_oids.file_md5,lop.filename,lfs_oids.sha256,lfs_oids.size + FROM lfs_oid_in_package lop + JOIN lfs_oids ON lfs_oids.id=lop.lfs_oid_id + WHERE lop.package=%s""", + (package,), + ) + self.hashes = { + f"{row[0]}-{row[1]}": (row[2], row[3]) for row in cur.fetchall() + } def put(self, project, package, name, revision, file_md5, size): - if not self.enabled: - return self._obs_put(project, package, name, revision, file_md5, size) - return self._proxy_put(project, package, name, revision, file_md5, size) + + if not self.mime: + self.mime = magic.Magic(mime=True) + + mimetype = None + logging.debug(f"Add LFS for {project}/{package}/{name}") + fin = self.obs._download(project, package, name, revision) + sha = hashlib.sha256() + while True: + buffer = fin.read(10000) + if not buffer: + break + sha.update(buffer) + # only guess from the first 10K + if not mimetype: + mimetype = self.mime.from_buffer(buffer) + fin.close() + sha = sha.hexdigest() + with self.db.cursor() as cur: + # we UPDATE here so the return functions. conflicts are likely as we look for filename/md5 but conflict on sha256 + cur.execute( + """INSERT INTO lfs_oids (project,package,filename,rev,sha256,size,mimetype,file_md5) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT (sha256,size) DO UPDATE SET mimetype=EXCLUDED.mimetype + RETURNING id""", + ( + project, + package, + name, + revision, + sha, + size, + mimetype, + file_md5, + ), + ) + row = cur.fetchone() + lfs_oid_id = row[0] + cur.execute( + """INSERT INTO lfs_oid_in_package (package,filename,lfs_oid_id) + VALUES (%s,%s,%s)""", + (package, name, lfs_oid_id), + ) + if mimetype.startswith("text/"): + cur.execute( + "INSERT INTO text_files (package,filename) VALUES (%s,%s)", + (package, name), + ) + self.db.conn.commit() + + if os.getenv("GITEA_REGISTER_SECRET"): + data = { + "secret": os.getenv("GITEA_REGISTER_SECRET"), + "project": project, + "package": package, + "filename": name, + "rev": revision, + "sha256": sha, + "size": size, + } + + url = "http://gitea.opensuse.org:9999/register" + response = requests.post( + url, + data=data, + timeout=10, + ) + logging.debug(f"Registered {response.status_code}") + else: + logging.info("Not registering LFS due to missing secret") + + # reset + self.hashes = None + self.texts = None + return self.get(package, name, file_md5) def is_text(self, package, filename): if self.texts is None: - if self.enabled: - self.load_package(package) - else: - self.texts = set() + self.load_texts(package) return filename in self.texts + def load_texts(self, package): + self.texts = set() + with self.db.cursor() as cur: + cur.execute("SELECT filename from text_files where package=%s", (package,)) + for row in cur.fetchall(): + self.texts.add(row[0]) + def get_or_put(self, project, package, name, revision, file_md5, size): result = self.get(package, name, file_md5) if not result: result = self.put(project, package, name, revision, file_md5, size) - # Sanity check - if result["fsize"] != size: - raise Exception(f"Redirector has different size for {name}") + sha256, db_size = result + assert db_size == size - return result + return sha256