[v5,03/12] scripts: Add debrepo python script handling base-apt

Message ID 20230526070027.16890-4-ubely@ilbers.de
State RFC
Headers show
Series Improving base-apt usage PoC | expand

Commit Message

Uladzimir Bely May 26, 2023, 7 a.m. UTC
This is the main utility responsible for prefetching packages
into local `base-apt` repo from external Debian mirrors. It uses
python-apt module and requires some kind of minimal `rootfs` to work
(let's call it "debrepo context").

Once initialized with `--init --workdir=<path>`, it stores the initial
configuration in `repo.opts` file inside the context and uses it at
futher calls.

In future, the logic `debrepo` script implements could be directly
implemented inside bitbake classes.

Signed-off-by: Uladzimir Bely <ubely@ilbers.de>
---
 scripts/debrepo | 443 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 443 insertions(+)
 create mode 100755 scripts/debrepo

Patch

diff --git a/scripts/debrepo b/scripts/debrepo
new file mode 100755
index 00000000..2a5e5b51
--- /dev/null
+++ b/scripts/debrepo
@@ -0,0 +1,443 @@ 
+#!/usr/bin/env python3
+
+# This software is a part of ISAR.
+# Copyright (C) 2022 ilbers GmbH
+
+import os
+import sys
+import fcntl
+
+import shutil
+import subprocess
+import getopt
+import pickle
+import urllib.parse
+
+import apt_pkg
+import apt.progress.base
+
+
+class DebRepo(object):
+    def __init__(self, workdir, cmdline_opts):
+        self.workdir = workdir
+        self.optsfile = self.workdir + "/repo.opts"
+
+        # Set default values
+        self.distro = "debian"
+
+        self.repo = self.workdir + "/repo/apt" + "/" + self.distro
+        self.repodb = self.workdir + "/repo/db" + "/" + self.distro
+        self.mirror = "http://deb.debian.org/debian"
+        self.arch = "amd64"
+        self.codename = "bullseye"
+        self.keydir = "/etc/apt/trusted.gpg.d"
+        self.check_gpg = True
+        self.isaraptdir = ""
+
+        # Load stored opts
+        opts = self.load_opts()
+        print("stored opts: " + str(opts))
+
+        # Overwrite opts by cmdline_opts
+        for opt, arg in cmdline_opts.items():
+            opts[opt] = arg
+
+        print("all opts: " + str(opts))
+
+        # Replace by values passed in commandline
+        for opt, arg in opts.items():
+            if opt == "mirror":
+                self.mirror = arg
+            if opt == "arch":
+                self.arch = arg
+            if opt == "distro":
+                self.distro = arg
+            if opt == "codename":
+                self.codename = arg
+            if opt == "keydir":
+                self.keydir = arg
+            if opt == "isaraptdir":
+                self.isaraptdir = arg
+            if opt == "check_gpg":
+                self.check_gpg = arg
+
+        self.crossarch = self.arch
+        for opt, arg in opts.items():
+            if opt == "crossarch":
+                self.crossarch = arg
+
+        self.compatarch = ""
+        for opt, arg in opts.items():
+            if opt == "compatarch":
+                self.compatarch = arg
+
+        for opt, arg in opts.items():
+            if opt == "repodir":
+                self.repo = arg + "/" + self.distro
+            if opt == "repodbdir":
+                self.repodb = arg + "/" + self.distro
+
+        self.save_opts(opts)
+
+        print("workdir:     " + str(self.workdir))
+        print("repo:        " + str(self.repo))
+        print("repodb:      " + str(self.repodb))
+        print("mirror:      " + str(self.mirror))
+        print("arch:        " + str(self.arch))
+        print("crossarch:   " + str(self.crossarch))
+        if self.compatarch:
+            print("compatarch:  " + str(self.compatarch))
+        print("distro:      " + str(self.distro))
+        print("codename:    " + str(self.codename))
+        print("keydir:      " + str(self.keydir))
+        print("isaraptdir:  " + str(self.isaraptdir))
+        print("check_gpg:   " + str(self.check_gpg))
+
+        self.cache = None
+        self.depcache = None
+        self.sr = None
+
+    def create_rootfs(self, aptsrcsfile):
+        if not os.path.exists(self.workdir + "/var/lib/dpkg"):
+            os.makedirs(self.workdir + "/var/lib/dpkg")
+        with open(self.workdir + "/var/lib/dpkg" + "/status", "w"):
+            pass
+
+        if not os.path.exists(self.workdir + "/etc/apt/sources.list.d"):
+            os.makedirs(self.workdir + "/etc/apt/sources.list.d")
+        if os.path.exists(aptsrcsfile):
+            shutil.copy(aptsrcsfile, self.workdir + "/etc/apt/sources.list.d/bootstrap.list")
+
+        if self.isaraptdir:
+            with open(self.workdir + "/etc/apt/sources.list.d/isar-apt.list", "w") as f:
+                f.write("deb [trusted=yes] file://" + self.isaraptdir + " isar main\n")
+
+        if not os.path.exists(self.workdir + "/../apt_cache/" + self.distro + "-" + self.codename + "/archives/partial"):
+            os.makedirs(self.workdir + "/../apt_cache/" + self.distro + "-" + self.codename + "/archives/partial")
+
+        if not os.path.exists(self.workdir + "/tmp"):
+            os.makedirs(self.workdir + "/tmp")
+
+    def apt_config(self, init, crossbuild):
+        # Configure apt to work with empty directory
+        if not init and self.arch != self.crossarch:
+            apt_pkg.config["APT::Architectures::"] = self.crossarch
+            apt_pkg.config["APT::Architectures::"] = self.arch
+
+        if not init and self.compatarch:
+            apt_pkg.config["APT::Architectures::"] = self.compatarch
+
+        apt_pkg.config.set("APT::Architecture", self.arch)
+
+        apt_pkg.config.set("Dir", self.workdir)
+        apt_pkg.config.set("Dir::Cache", self.workdir + "/../apt_cache/" + self.distro + "-" + self.codename)
+        apt_pkg.config.set("Dir::State::status", self.workdir + "/var/lib/dpkg/status")
+
+        apt_pkg.config.set("APT::Install-Recommends", "0")
+        apt_pkg.config.set("APT::Install-Suggests", "0")
+
+        # Use host keys for authentification
+        # apt_pkg.config.set("Dir::Etc::Trusted", "/etc/apt/trusted.gpg")
+        # apt_pkg.config.set("Dir::Etc::TrustedParts", "/etc/apt/trusted.gpg.d")
+        apt_pkg.config.set("Dir::Etc::TrustedParts", self.keydir)
+
+        # Allow using repositories without keys
+        if not self.check_gpg:
+            apt_pkg.config.set("Acquire::AllowInsecureRepositories", "1")
+
+    def mark_essential(self):
+        for pkg in self.cache.packages:
+            if pkg.architecture == self.arch:
+                if pkg.essential:
+                    self.depcache.mark_install(pkg)
+
+    def mark_by_prio(self, priority):
+        for pkg in self.cache.packages:
+            if pkg.architecture == self.arch:
+                ver = self.depcache.get_candidate_ver(pkg)
+                if ver and ver.priority <= priority:
+                    self.depcache.mark_install(pkg)
+
+    def mark_pkg(self, pkgname, crossbuild):
+        if pkgname in self.cache:
+            pkg = self.cache[pkgname]
+
+            if not crossbuild or ':' in pkgname or len(pkg.version_list) == 0:
+                    if pkg.has_provides and not pkg.has_versions:
+                        print("pkgname is virtual package, selecting best provide")
+                        # Select first provide
+                        pkg_provide = pkg.provides_list[0][2]
+                        # Find better provide with higher version
+                        for provide in pkg.provides_list:
+                            if apt_pkg.version_compare(provide[2].ver_str, pkg_provide.ver_str) > 0:
+                                pkg_provide = provide[2]
+                        self.depcache.mark_install(pkg_provide.parent_pkg)
+                    else:
+                        self.depcache.mark_install(pkg)
+            else:
+                version = pkg.version_list[0]
+                if version.arch == "all":
+                    self.depcache.mark_install(pkg)
+                else:
+                    if version.multi_arch == version.MULTI_ARCH_FOREIGN:
+                        if (pkgname, self.arch) in self.cache:
+                            nativepkg = self.cache[pkgname, self.arch]
+                            self.depcache.mark_install(nativepkg)
+                    else:
+                        if (pkgname, self.crossarch) in self.cache:
+                            crosspkg = self.cache[pkgname, self.crossarch]
+                            self.depcache.mark_install(crosspkg)
+
+    def mark_list(self, pkglist, crossbuild):
+        if pkglist:
+            for pkgname in pkglist:
+                self.mark_pkg(pkgname, crossbuild)
+
+    def handle_deb(self, item):
+        lockfd = open(self.repo + '/../repo.lock', 'w')
+        fcntl.flock(lockfd,fcntl.LOCK_EX)
+        subprocess.run([
+            "reprepro",
+            "--dbdir", self.repodb,
+            "--outdir", self.repo,
+            "--confdir", self.repo + "/conf",
+            "-C", "main",
+            "includedeb",
+            self.codename,
+            item.destfile
+            ])
+        lockfd.close()
+
+    def handle_repo(self, fetcher):
+        lockfd = open(self.workdir + "/../apt_cache/" + self.distro + "-" + self.codename + ".lock", "w")
+        fcntl.flock(lockfd,fcntl.LOCK_EX)
+        fetcher.run()
+        lockfd.close()
+        for item in fetcher.items:
+            if item.status == item.STAT_ERROR:
+                print("Some error ocured: '%s'" % item.error_text)
+                pass
+            else:
+                self.handle_deb(item)
+
+    def get_filename(self, uri):
+        path = urllib.parse.urlparse(uri).path
+        unquoted_path = urllib.parse.unquote(path)
+        basename = os.path.basename(unquoted_path)
+        return basename
+
+    def download_file(self, uri):
+        filename = self.get_filename(uri)
+        subprocess.run([
+            "wget",
+            "-H",
+            "--timeout=30",
+            "--tries=3",
+            "-q",
+            uri,
+            "-O",
+            self.workdir + "/tmp/" + filename
+        ])
+
+    def handle_dsc(self, uri):
+        filename = self.get_filename(uri)
+        lockfd = open(self.repo + '/../repo.lock', 'w')
+        fcntl.flock(lockfd,fcntl.LOCK_EX)
+        subprocess.run([
+            "reprepro",
+            "--dbdir", self.repodb,
+            "--outdir", self.repo,
+            "--confdir", self.repo + "/conf",
+            "-C", "main",
+            "-S", "-", "-P" "source",
+            "--delete",
+            "includedsc",
+            self.codename,
+            os.path.realpath(self.workdir + "/tmp/" + filename)
+            ])
+        lockfd.close()
+
+    def handle_src_list(self, pkgs):
+        if pkgs:
+            for pkg in pkgs:
+                pkgname=pkg
+                pkgver=""
+                if '=' in pkg:
+                    pkgname = pkg.split("=")[0]
+                    pkgver = pkg.split("=")[1]
+
+                self.sr.restart()
+                while self.sr.lookup(pkgname):
+                    if pkgver and pkgver != self.sr.version:
+                        continue
+
+                    for sr_file in self.sr.files:
+                        print(self.sr.index.archive_uri(sr_file[2]))
+                        self.download_file(self.sr.index.archive_uri(sr_file[2]))
+
+                    dsc_uri = self.sr.index.archive_uri(self.sr.files[0][2])
+                    self.handle_dsc(dsc_uri)
+                    break
+
+    def apt_run(self, init, srcmode, pkgs, controlfile, crossbuild):
+        apt_pkg.init()
+
+        sources = apt_pkg.SourceList()
+        sources.read_main_list()
+
+        progress = apt.progress.text.AcquireProgress()
+
+        self.cache = apt_pkg.Cache()
+        self.cache.update(progress, sources)
+        self.cache = apt_pkg.Cache()
+
+        self.depcache = apt_pkg.DepCache(self.cache)
+        self.sr = apt_pkg.SourceRecords()
+
+        if init:
+            self.mark_essential()
+            # 1(required), 2(important), 3(standard), 4(optional), 5(extra)
+            self.mark_by_prio(1)
+
+        if srcmode:
+            self.handle_src_list(pkgs)
+        else:
+            self.mark_list(pkgs, crossbuild)
+
+        if controlfile:
+            fobj = open(controlfile, "r")
+
+            try:
+                tagfile = apt_pkg.TagFile(fobj)
+                while tagfile.step() == 1:
+                    deps = tagfile.section.get("Build-Depends", "")
+                    # Remove extra commas and spaces - apt_pkg.parse_depends doesnt like
+                    # lines like ", device-tree-compiler"
+                    deps = ', '.join([s.strip() for s in deps.split(',') if s.strip()])
+                    print("parsed deps: " + str(deps))
+                    for item in apt_pkg.parse_depends(deps, False):
+                        pkgname = item[0][0]
+                        self.mark_pkg(pkgname, crossbuild)
+
+            finally:
+                fobj.close()
+
+        if init or not srcmode:
+            fetcher = apt_pkg.Acquire(progress)
+            pm = apt_pkg.PackageManager(self.depcache)
+
+            recs = apt_pkg.PackageRecords(self.cache)
+            pm.get_archives(fetcher, sources, recs)
+
+            self.handle_repo(fetcher)
+
+    def load_opts(self):
+        params = {}
+        if os.path.isfile(self.optsfile):
+            with open(self.optsfile, 'rb') as file:
+                data = file.read()
+                if data:
+                    params = pickle.loads(data)
+
+        return params
+
+    def save_opts(self, opts):
+        file = open(self.optsfile, 'wb')
+        pickle.dump(opts, file)
+        file.close()
+
+
+class DebRepoArgs(object):
+    def __init__(self):
+        self.workdir = ""
+        self.init = False
+        self.srcmode = False
+        self.controlfile = ""
+        self.aptsrcsfile = ""
+        self.crossbuild = False
+
+        self.opts = {}
+        self.pkgs = []
+
+        try:
+            opts, args = getopt.getopt(sys.argv[1:], "", [
+                "init",
+                "srcmode",
+                "workdir=",
+                "repodir=",
+                "repodbdir=",
+                "mirror=",
+                "arch=",
+                "crossarch=",
+                "compatarch=",
+                "distro=",
+                "codename=",
+                "keydir=",
+                "isaraptdir=",
+                "no-check-gpg",
+                "controlfile=",
+                "aptsrcsfile=",
+                "crossbuild"
+                ])
+        except getopt.GetoptError as msg:
+            print("Error: " + str(msg))
+            sys.exit(1)
+
+        for opt, arg in opts:
+            if opt in ("--workdir"):
+                self.workdir = arg
+            if opt in ("--init"):
+                self.init = True
+            if opt in ("--srcmode"):
+                self.srcmode = True
+            if opt in ("--controlfile"):
+                self.controlfile = arg
+            if opt in ("--aptsrcsfile"):
+                self.aptsrcsfile = arg
+            if opt in ("--crossbuild"):
+                self.crossbuild = True
+
+            if opt in ("--repodir",
+                       "--repodbdir",
+                       "--mirror",
+                       "--arch",
+                       "--crossarch",
+                       "--compatarch",
+                       "--distro",
+                       "--codename",
+                       "--keydir",
+                       "--isaraptdir",
+                       "--controlfile",
+                       ):
+                self.opts[opt[2:]] = arg
+            if opt in ("--no-check-gpg"):
+                self.opts['check_gpg'] = False
+
+        if not self.workdir:
+            print("Error: workdir is not specified")
+            sys.exit(1)
+
+        self.pkgs = args
+
+
+def main():
+    args = DebRepoArgs()
+
+    if not (args.init or args.pkgs or args.controlfile):
+        print("Nothing to do")
+        sys.exit(0)
+
+    if not os.path.exists(args.workdir):
+        os.makedirs(args.workdir)
+
+    debrepo = DebRepo(args.workdir, args.opts)
+
+    if (args.init):
+        debrepo.create_rootfs(args.aptsrcsfile)
+
+    debrepo.apt_config(args.init, args.crossbuild)
+    debrepo.apt_run(args.init, args.srcmode, args.pkgs, args.controlfile, args.crossbuild)
+
+
+if __name__ == "__main__":
+    main()