Migrate the ProxySHA256 data into the PostgreSQL DB

As a result, the sha256 and the mimetype are now calculated locally.
Stephan Kulow 2022-11-07 19:40:19 +01:00
parent e1b32999f0
commit 3e1fbaa1c3
6 changed files with 269 additions and 107 deletions
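
In short, ProxySHA256 no longer asks the source.dyn.cloud.suse.de redirector for hashes; it is handed a DB connection, downloads the file once, hashes it and records sha256, size and mimetype in the new tables. A minimal sketch of the new wiring (OBS, DB and ProxySHA256 are this repo's classes; the API URL, package, filename, revision, md5 and size values below are made up for illustration):

    from lib.db import DB
    from lib.obs import OBS
    from lib.proxy_sha256 import ProxySHA256

    obs = OBS("https://api.opensuse.org")  # assumed API URL
    proxy = ProxySHA256(obs, DB())

    # downloads the file, hashes it, stores sha256/size/mimetype in lfs_oids
    # (plus lfs_oid_in_package and text_files) and returns the sha256 for LFS
    sha = proxy.get_or_put("openSUSE:Factory", "somepkg", "somepkg-1.0.tar.xz",
                           "some-rev", "some-file-md5", 12345)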


@@ -221,6 +221,45 @@ class DB:
             "ALTER TABLE revisions ALTER COLUMN api_url SET NOT NULL",
             "UPDATE scheme SET version=21",
         )
+        schemes[22] = (
+            """DROP TABLE IF EXISTS lfs_oids""",
+            """
+            CREATE TABLE lfs_oids (
+                id SERIAL PRIMARY KEY,
+                project VARCHAR(255) NOT NULL,
+                package VARCHAR(255) NOT NULL,
+                filename VARCHAR(255) NOT NULL,
+                rev VARCHAR(40) NOT NULL,
+                sha256 VARCHAR(70) NOT NULL,
+                size INTEGER NOT NULL,
+                mimetype VARCHAR(255) NOT NULL,
+                file_md5 VARCHAR(40) NOT NULL
+            )
+            """,
+            "CREATE UNIQUE INDEX ON lfs_oids (sha256,size)",
+            "CREATE INDEX ON revisions(package)",
+            """DROP TABLE IF EXISTS text_files""",
+            """
+            CREATE TABLE text_files (
+                id SERIAL PRIMARY KEY,
+                package VARCHAR(255) NOT NULL,
+                filename VARCHAR(255) NOT NULL
+            )
+            """,
+            "CREATE UNIQUE INDEX ON text_files (package,filename)",
+            """DROP TABLE IF EXISTS lfs_oid_in_package""",
+            """
+            CREATE TABLE lfs_oid_in_package (
+                id SERIAL PRIMARY KEY,
+                lfs_oid_id INTEGER NOT NULL,
+                package VARCHAR(255) NOT NULL,
+                filename VARCHAR(255) NOT NULL
+            )
+            """,
+            "CREATE INDEX ON text_files(package)",
+            "CREATE INDEX ON lfs_oid_in_package(package)",
+            "UPDATE scheme SET version=22",
+        )
         schema_version = self.schema_version()
         if (schema_version + 1) not in schemes:
             return
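
For orientation: lfs_oids stores one row per unique (sha256, size) blob together with its mimetype, lfs_oid_in_package maps package/filename pairs to those blobs, and text_files remembers files whose mimetype turned out to be text/*. A sketch of a lookup against this schema — essentially what load_hashes() further down does; the package name is made up and DB is this repo's wrapper:

    from lib.db import DB

    db = DB()
    with db.cursor() as cur:
        cur.execute(
            """SELECT lop.filename, o.sha256, o.size, o.mimetype
               FROM lfs_oid_in_package lop
               JOIN lfs_oids o ON o.id = lop.lfs_oid_id
               WHERE lop.package = %s""",
            ("somepkg",),
        )
        for filename, sha256, size, mimetype in cur.fetchall():
            print(filename, sha256, size, mimetype)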


@@ -17,7 +17,8 @@ class GitExporter:
         self.obs = OBS(api_url)
         self.project = project
         self.package = package
-        self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
+        self.db = DB()
+        self.proxy_sha256 = ProxySHA256(self.obs, self.db)
         self.git = Git(
             repodir / package,
             committer="Git OBS Bridge",
@@ -59,8 +60,7 @@
         return left_to_commit

     def export_as_git(self):
-        db = DB()
-        tree = TreeBuilder(db).build(self.project, self.package)
+        tree = TreeBuilder(self.db).build(self.project, self.package)
         flats = tree.as_flat_list()

         branch_state = {"factory": None, "devel": None}
@@ -75,9 +75,8 @@
         for flat in left_to_commit:
             if flat.commit.userid not in users:
-                users[flat.commit.userid] = User.find(db, flat.commit.userid)
+                users[flat.commit.userid] = User.find(self.db, flat.commit.userid)
             flat.user = users[flat.commit.userid]
-            logging.debug(f"USER {flat.user}")

             self.gc_cnt -= 1
             if self.gc_cnt <= 0 and self.gc_interval:
                 self.run_gc()
@@ -88,10 +87,14 @@
         self.gc_cnt = self.gc_interval
         self.git.gc()

+    def is_lfs_file(self, package, filename, size):
+        if not is_binary_or_large(filename, size):
+            return False
+        return not self.proxy_sha256.is_text(package, filename)
+
     def commit_file(self, flat, file, size, md5):
         # have such files been detected as text mimetype before?
-        is_text = self.proxy_sha256.is_text(flat.commit.package, file.name)
-        if not is_text and is_binary_or_large(file.name, size):
+        if self.is_lfs_file(flat.commit.package, file.name, size):
             file_sha256 = self.proxy_sha256.get_or_put(
                 flat.commit.project,
                 flat.commit.package,
@@ -100,8 +103,13 @@
                 md5,
                 size,
             )
-            self.git.add_lfs(file.name, file_sha256["sha256"], size)
-        else:
-            self.obs.change_url(flat.commit.api_url)
-            self.obs.download(
-                flat.commit.project,
+            # as it's newly registered, it might be a text file now, so double check
+            if not self.proxy_sha256.is_text(flat.commit.package, file.name):
+                self.git.add_lfs(file.name, file_sha256, size)
+                return
+        self.commit_non_lfs_file(flat, file, md5)
+
+    def commit_non_lfs_file(self, flat, file, md5):
+        self.obs.change_url(flat.commit.api_url)
+        self.obs.download(
+            flat.commit.project,

lib/hash.py (new file, 20 lines)

@@ -0,0 +1,20 @@
+import functools
+import hashlib
+
+
+def _hash(hash_alg, file_or_path):
+    h = hash_alg()
+
+    def __hash(f):
+        while chunk := f.read(1024 * 4):
+            h.update(chunk)
+
+    if hasattr(file_or_path, "read"):
+        __hash(file_or_path)
+    else:
+        with file_or_path.open("rb") as f:
+            __hash(f)
+
+    return h.hexdigest()
+
+
+md5 = functools.partial(_hash, hashlib.md5)

lib/lfs_oid.py (new file, 67 lines)

@@ -0,0 +1,67 @@
+import logging
+import sys
+
+from lib.db import DB
+
+
+# no need for this class yet, so just leave the migration code here
+class LFSOid:
+    def __init__(self) -> None:
+        pass
+
+
+if __name__ == "__main__":
+    """
+    Import the old data - it only makes sense on a DB with previously scanned revisions
+
+    curl -s https://stephan.kulow.org/git_lfs.csv.xz | xz -cd | PYTHONPATH=$PWD /usr/bin/python3 lib/lfs_oid.py
+    """
+    db = DB()
+    logging.basicConfig(level=logging.DEBUG)
+    with db.cursor() as cur:
+        while True:
+            line = sys.stdin.readline()
+            if not line:
+                break
+            (
+                project,
+                package,
+                filename,
+                rev,
+                sha256,
+                size,
+                mimetype,
+                md5,
+            ) = line.strip().split("\t")
+            cur.execute(
+                """INSERT INTO lfs_oids (project,package,filename,rev,sha256,size,mimetype,file_md5)
+                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING""",
+                (project, package, filename, rev, sha256, size, mimetype, md5),
+            )
+        cur.execute(
+            """
+            CREATE TEMPORARY TABLE lfs_oid_in_revision (
+                revision_id INTEGER,
+                lfs_oid_id INTEGER NOT NULL,
+                name VARCHAR(255) NOT NULL
+            )
+            """
+        )
+        cur.execute(
+            """INSERT INTO lfs_oid_in_revision (revision_id, lfs_oid_id, name)
+               SELECT revision_id,lfs_oids.id,files.name FROM lfs_oids JOIN files ON files.md5=lfs_oids.file_md5"""
+        )
+        cur.execute(
+            """INSERT INTO text_files (package,filename)
+               SELECT DISTINCT r.package, lfs_oid_in_revision.name FROM lfs_oids
+               JOIN lfs_oid_in_revision on lfs_oid_in_revision.lfs_oid_id=lfs_oids.id
+               JOIN revisions r ON r.id=lfs_oid_in_revision.revision_id
+               WHERE lfs_oids.mimetype like 'text/%' ON CONFLICT DO NOTHING"""
+        )
+        cur.execute(
+            """INSERT INTO lfs_oid_in_package (lfs_oid_id, package, filename)
+               SELECT DISTINCT lfs_oids.id,r.package, lfs_oid_in_revision.name FROM lfs_oids
+               JOIN lfs_oid_in_revision on lfs_oid_in_revision.lfs_oid_id=lfs_oids.id
+               JOIN revisions r ON r.id=lfs_oid_in_revision.revision_id"""
+        )
+    db.conn.commit()
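
A quick way to sanity-check the one-off import afterwards is to count what landed in the new tables; a sketch that only assumes lib.db.DB from this repo:

    from lib.db import DB

    db = DB()
    with db.cursor() as cur:
        for table in ("lfs_oids", "lfs_oid_in_package", "text_files"):
            cur.execute(f"SELECT COUNT(*) FROM {table}")  # fixed table names, not user input
            print(table, cur.fetchone()[0])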


@@ -9,7 +9,7 @@ from urllib.error import HTTPError
 import osc.core
-from lib.proxy_sha256 import md5
+from lib.hash import md5
 from lib.request import Request
 from lib.user import User


@@ -1,114 +1,142 @@
-import functools
 import hashlib
 import logging
-import urllib
+import os
+
+try:
+    import magic
+except:
+    print("Install python3-python-magic, not python3-magic")
+    raise
 
 import requests
 
-
-def _hash(hash_alg, file_or_path):
-    h = hash_alg()
-
-    def __hash(f):
-        while chunk := f.read(1024 * 4):
-            h.update(chunk)
-
-    if hasattr(file_or_path, "read"):
-        __hash(file_or_path)
-    else:
-        with file_or_path.open("rb") as f:
-            __hash(f)
-
-    return h.hexdigest()
-
-
-md5 = functools.partial(_hash, hashlib.md5)
-sha256 = functools.partial(_hash, hashlib.sha256)
+from lib.db import DB
+from lib.obs import OBS
 
 
 class ProxySHA256:
-    def __init__(self, obs, url=None, enabled=True):
+    def __init__(self, obs: OBS, db: DB):
         self.obs = obs
-        self.url = url if url else "http://source.dyn.cloud.suse.de"
-        self.enabled = enabled
+        self.db = db
         self.hashes = None
         self.texts = None
-
-    def load_package(self, package):
-        # _project is unreachable for the proxy - due to being a fake package
-        if package == "_project":
-            self.enabled = False
-            self.texts = set(["_config", "_service", "_staging_workflow"])
-            self.hashes = dict()
-            return
-        logging.debug("Retrieve all previously defined SHA256")
-        response = requests.get(
-            f"http://source.dyn.cloud.suse.de/package/{package}", timeout=50
-        )
-        if response.status_code == 200:
-            json = response.json()
-            self.hashes = json["shas"]
-            self.texts = set(json["texts"])
+        self.mime = None
 
     def get(self, package, name, file_md5):
-        key = f"{file_md5}-{name}"
         if self.hashes is None:
-            if self.enabled:
-                self.load_package(package)
-            else:
-                self.hashes = {}
-        return self.hashes.get(key, None)
+            self.load_hashes(package)
+        key = f"{file_md5}-{name}"
+        ret = self.hashes.get(key)
+        return ret
 
-    def _proxy_put(self, project, package, name, revision, file_md5, size):
-        quoted_name = urllib.parse.quote(name)
-        url = f"{self.obs.url}/public/source/{project}/{package}/{quoted_name}?rev={revision}"
-        response = requests.put(
-            self.url,
-            data={
-                "hash": file_md5,
-                "filename": name,
-                "url": url,
-                "package": package,
-            },
-            timeout=10,
-        )
-        if response.status_code != 200:
-            raise Exception(f"Redirector error on {self.url} for {url}")
-
-        key = (file_md5, name)
-        self.hashes[key] = {
-            "sha256": response.content.decode("utf-8"),
-            "fsize": size,
-        }
-        return self.hashes[key]
-
-    def _obs_put(self, project, package, name, revision, file_md5, size):
-        key = (file_md5, name)
-        self.hashes[key] = {
-            "sha256": sha256(self.obs._download(project, package, name, revision)),
-            "fsize": size,
-        }
-        return self.hashes[key]
+    def load_hashes(self, package):
+        with self.db.cursor() as cur:
+            cur.execute(
+                """SELECT lfs_oids.file_md5,lop.filename,lfs_oids.sha256,lfs_oids.size
+                   FROM lfs_oid_in_package lop
+                   JOIN lfs_oids ON lfs_oids.id=lop.lfs_oid_id
+                   WHERE lop.package=%s""",
+                (package,),
+            )
+            self.hashes = {
+                f"{row[0]}-{row[1]}": (row[2], row[3]) for row in cur.fetchall()
+            }
 
     def put(self, project, package, name, revision, file_md5, size):
-        if not self.enabled:
-            return self._obs_put(project, package, name, revision, file_md5, size)
-        return self._proxy_put(project, package, name, revision, file_md5, size)
+        if not self.mime:
+            self.mime = magic.Magic(mime=True)
+        mimetype = None
+        logging.debug(f"Add LFS for {project}/{package}/{name}")
+        fin = self.obs._download(project, package, name, revision)
+        sha = hashlib.sha256()
+        while True:
+            buffer = fin.read(10000)
+            if not buffer:
+                break
+            sha.update(buffer)
+            # only guess from the first 10K
+            if not mimetype:
+                mimetype = self.mime.from_buffer(buffer)
+        fin.close()
+        sha = sha.hexdigest()
+        with self.db.cursor() as cur:
+            # we UPDATE here so that RETURNING yields a row - conflicts are likely as we look up by filename/md5 but conflict on sha256
+            cur.execute(
+                """INSERT INTO lfs_oids (project,package,filename,rev,sha256,size,mimetype,file_md5)
+                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
+                   ON CONFLICT (sha256,size) DO UPDATE SET mimetype=EXCLUDED.mimetype
+                   RETURNING id""",
+                (
+                    project,
+                    package,
+                    name,
+                    revision,
+                    sha,
+                    size,
+                    mimetype,
+                    file_md5,
+                ),
+            )
+            row = cur.fetchone()
+            lfs_oid_id = row[0]
+            cur.execute(
+                """INSERT INTO lfs_oid_in_package (package,filename,lfs_oid_id)
+                   VALUES (%s,%s,%s)""",
+                (package, name, lfs_oid_id),
+            )
+            if mimetype.startswith("text/"):
+                cur.execute(
+                    "INSERT INTO text_files (package,filename) VALUES (%s,%s)",
+                    (package, name),
+                )
+        self.db.conn.commit()
+
+        if os.getenv("GITEA_REGISTER_SECRET"):
+            data = {
+                "secret": os.getenv("GITEA_REGISTER_SECRET"),
+                "project": project,
+                "package": package,
+                "filename": name,
+                "rev": revision,
+                "sha256": sha,
+                "size": size,
+            }
+            url = "http://gitea.opensuse.org:9999/register"
+            response = requests.post(
+                url,
+                data=data,
+                timeout=10,
+            )
+            logging.debug(f"Registered {response.status_code}")
+        else:
+            logging.info("Not registering LFS due to missing secret")
+
+        # reset
+        self.hashes = None
+        self.texts = None
+        return self.get(package, name, file_md5)
 
     def is_text(self, package, filename):
         if self.texts is None:
-            if self.enabled:
-                self.load_package(package)
-            else:
-                self.texts = set()
+            self.load_texts(package)
         return filename in self.texts
 
+    def load_texts(self, package):
+        self.texts = set()
+        with self.db.cursor() as cur:
+            cur.execute("SELECT filename from text_files where package=%s", (package,))
+            for row in cur.fetchall():
+                self.texts.add(row[0])
+
     def get_or_put(self, project, package, name, revision, file_md5, size):
         result = self.get(package, name, file_md5)
         if not result:
             result = self.put(project, package, name, revision, file_md5, size)
-        # Sanity check
-        if result["fsize"] != size:
-            raise Exception(f"Redirector has different size for {name}")
-        return result
+        sha256, db_size = result
+        assert db_size == size
+        return sha256