|
| 1 | +"""Archive writers for canonical Codex package directories.""" |
| 2 | + |
| 3 | +import shutil |
| 4 | +import subprocess |
| 5 | +import tarfile |
| 6 | +import tempfile |
| 7 | +import zipfile |
| 8 | +from pathlib import Path |
| 9 | + |
| 10 | + |
| 11 | +def write_archive(package_dir: Path, archive_path: Path, *, force: bool) -> None: |
| 12 | + if is_relative_to(archive_path, package_dir): |
| 13 | + raise RuntimeError( |
| 14 | + f"Archive output must be outside the package directory: {archive_path}" |
| 15 | + ) |
| 16 | + |
| 17 | + archive_path.parent.mkdir(parents=True, exist_ok=True) |
| 18 | + if archive_path.exists(): |
| 19 | + if not force: |
| 20 | + raise RuntimeError(f"Archive output already exists: {archive_path}") |
| 21 | + archive_path.unlink() |
| 22 | + |
| 23 | + archive_format = archive_format_for_path(archive_path) |
| 24 | + if archive_format == "tar.gz": |
| 25 | + write_tar_archive(package_dir, archive_path, mode="w:gz") |
| 26 | + elif archive_format == "tar.zst": |
| 27 | + write_tar_zst_archive(package_dir, archive_path) |
| 28 | + elif archive_format == "zip": |
| 29 | + write_zip_archive(package_dir, archive_path) |
| 30 | + else: |
| 31 | + raise AssertionError(f"unexpected archive format: {archive_format}") |
| 32 | + |
| 33 | + |
| 34 | +def is_relative_to(path: Path, parent: Path) -> bool: |
| 35 | + try: |
| 36 | + path.relative_to(parent) |
| 37 | + return True |
| 38 | + except ValueError: |
| 39 | + return False |
| 40 | + |
| 41 | + |
| 42 | +def archive_format_for_path(path: Path) -> str: |
| 43 | + suffixes = path.suffixes |
| 44 | + if suffixes[-2:] == [".tar", ".gz"] or path.suffix == ".tgz": |
| 45 | + return "tar.gz" |
| 46 | + if suffixes[-2:] == [".tar", ".zst"]: |
| 47 | + return "tar.zst" |
| 48 | + if path.suffix == ".zip": |
| 49 | + return "zip" |
| 50 | + raise RuntimeError( |
| 51 | + f"Unsupported archive suffix for {path}. Use .tar.gz, .tgz, .tar.zst, or .zip." |
| 52 | + ) |
| 53 | + |
| 54 | + |
| 55 | +def write_tar_archive(package_dir: Path, archive_path: Path, *, mode: str) -> None: |
| 56 | + with tarfile.open(archive_path, mode) as archive: |
| 57 | + for path in package_entries(package_dir): |
| 58 | + archive.add( |
| 59 | + path, |
| 60 | + arcname=path.relative_to(package_dir), |
| 61 | + recursive=False, |
| 62 | + ) |
| 63 | + |
| 64 | + |
| 65 | +def write_tar_zst_archive(package_dir: Path, archive_path: Path) -> None: |
| 66 | + zstd = shutil.which("zstd") |
| 67 | + if zstd is None: |
| 68 | + raise RuntimeError("zstd is required to write .tar.zst archives.") |
| 69 | + |
| 70 | + with tempfile.TemporaryDirectory(prefix="codex-package-archive-") as temp_dir_str: |
| 71 | + tar_path = Path(temp_dir_str) / "package.tar" |
| 72 | + write_tar_archive(package_dir, tar_path, mode="w") |
| 73 | + subprocess.check_call([zstd, "-T0", "-19", "-f", str(tar_path), "-o", str(archive_path)]) |
| 74 | + |
| 75 | + |
| 76 | +def write_zip_archive(package_dir: Path, archive_path: Path) -> None: |
| 77 | + with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: |
| 78 | + for path in package_entries(package_dir): |
| 79 | + relative_path = path.relative_to(package_dir) |
| 80 | + if path.is_dir(): |
| 81 | + archive.write(path, f"{relative_path}/") |
| 82 | + else: |
| 83 | + archive.write(path, relative_path) |
| 84 | + |
| 85 | + |
| 86 | +def package_entries(package_dir: Path) -> list[Path]: |
| 87 | + return sorted(package_dir.rglob("*"), key=lambda path: path.relative_to(package_dir).as_posix()) |
0 commit comments