123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598 |
- import json
- import os
- import re
- import shutil
- import subprocess
- import sys
- import timeit
- from copy import deepcopy
- from typing import Literal, NotRequired, Optional, TypedDict
- import requests
- import yaml
- from semver import Version
# Scratch directory for upstream clones: "<$TMP_DIR or /tmp>/ohmyzsh"
TMP_DIR = os.path.join(os.getenv("TMP_DIR", "/tmp"), "ohmyzsh")

# Path of the dependencies manifest, relative to the working directory
DEPS_YAML_FILE = ".github/dependencies.yml"

# Dry-run mode is on only when the DRY_RUN environment variable is exactly "1"
DRY_RUN = os.getenv("DRY_RUN", "0") == "1"
# Tag comparison helper: finds the first `MAJOR[.minor[.patch]]` run in a
# string, optionally preceded by `v`/`V`. Missing parts are captured as None.
BASEVERSION = re.compile(
    r"""[vV]?
        (?P<major>(0|[1-9])\d*)
        (\.
        (?P<minor>(0|[1-9])\d*)
        (\.
            (?P<patch>(0|[1-9])\d*)
        )?
        )?
    """,
    flags=re.VERBOSE,
)
def coerce(version: str) -> Optional[Version]:
    """
    Build a `Version` from the first `MAJOR[.minor[.patch]]` found in `version`.

    Returns `None` when `BASEVERSION` matches nothing in the string. Parts the
    pattern did not capture (for example the patch in `2.1`) default to 0.
    """
    found = BASEVERSION.search(version)
    if found is None:
        return None

    # The named groups line up with Version's `major`, `minor`, `patch` arguments
    parts = {}
    for component, text in found.groupdict().items():
        parts[component] = text if text is not None else 0

    return Version(**parts)  # pyright: ignore[reportArgumentType]
class CodeTimer:
    """
    Context manager that times the enclosed block and prints the result.

    On exit the elapsed time in milliseconds is stored in `took` and a
    "Code block '<name>' took: <ms> ms" line is printed to stdout.
    """

    def __init__(self, name=None):
        # Pre-format the label so the report reads "Code block 'name' took: ..."
        self.name = " '" + name + "'" if name else ""

    def __enter__(self):
        self.start = timeit.default_timer()
        # Return the timer so `with CodeTimer() as timer:` can read `timer.took`
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Scale the timer difference by 1000 to report milliseconds
        self.took = (timeit.default_timer() - self.start) * 1000.0
        print("Code block" + self.name + " took: " + str(self.took) + " ms")
- ### YAML representation
def str_presenter(dumper, data):
    """
    YAML string representer that emits multiline strings in block style (`|`).

    Strings that span a single line are represented with the default style.
    Ref: https://stackoverflow.com/a/33300001
    """
    str_tag = "tag:yaml.org,2002:str"
    spans_multiple_lines = len(data.splitlines()) > 1
    if not spans_multiple_lines:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style="|")
# Register the string presenter both through the module-level helper and on
# SafeRepresenter directly.
yaml.add_representer(data_type=str, representer=str_presenter)
yaml.representer.SafeRepresenter.add_representer(
    data_type=str, representer=str_presenter
)
- # Types
- class DependencyDict(TypedDict):
- repo: str
- branch: str
- version: str
- precopy: NotRequired[str]
- postcopy: NotRequired[str]
class DependencyYAML(TypedDict):
    """Top-level shape of the dependencies.yml document."""

    # Maps a dependency path (the key used in the YAML file) to its settings
    dependencies: dict[str, DependencyDict]
class UpdateStatusFalse(TypedDict):
    """Result of an update check that found nothing newer."""

    has_updates: Literal[False]
class UpdateStatusTrue(TypedDict):
    """Result of an update check that found a newer upstream version."""

    has_updates: Literal[True]
    version: str  # new version: a tag name or a full commit SHA
    compare_url: str  # link to the diff between the stored and new version
    head_ref: str  # commit SHA of the new version
    head_url: str  # link to the new version (release tag or commit page)
class CommandRunner:
    """Runs external commands and turns non-zero exit codes into exceptions."""

    class Exception(Exception):
        """
        Raised by `run_or_fail` when a command exits with a non-zero status.

        Attributes:
            returncode: exit status of the failed command
            stage: caller-supplied label identifying the step that failed
            stdout: captured standard output, decoded as UTF-8
            stderr: captured standard error, decoded as UTF-8
        """

        def __init__(self, message, returncode, stage, stdout, stderr):
            super().__init__(message)
            self.returncode = returncode
            self.stage = stage
            self.stdout = stdout
            self.stderr = stderr

    @staticmethod
    def run_or_fail(command: list[str], stage: str, *args, **kwargs):
        """
        Run `command` with captured output and return the completed process.

        When `DRY_RUN` is set, `gh` commands are not executed: the command line
        is passed to `echo` instead. The caller's `command` list is never
        modified.

        Args:
            command: argument vector to execute
            stage: label used in the error message and stored on the exception
            *args, **kwargs: forwarded to `subprocess.run`

        Returns:
            The `subprocess.CompletedProcess` (stdout/stderr are bytes).

        Raises:
            CommandRunner.Exception: if the command exits with a non-zero status.
        """
        if DRY_RUN and command[:1] == ["gh"]:
            # Build a new list rather than inserting into the caller's one, so a
            # command list that is reused or logged afterwards stays intact.
            command = ["echo", *command]

        result = subprocess.run(command, *args, capture_output=True, **kwargs)

        if result.returncode != 0:
            raise CommandRunner.Exception(
                f"{stage} command failed with exit code {result.returncode}",
                returncode=result.returncode,
                stage=stage,
                # errors="replace": undecodable output must not hide the failure
                stdout=result.stdout.decode("utf-8", errors="replace"),
                stderr=result.stderr.decode("utf-8", errors="replace"),
            )

        return result
class DependencyStore:
    """Class-level cache of the parsed dependencies.yml contents."""

    store: DependencyYAML = {"dependencies": {}}

    @staticmethod
    def set(data: DependencyYAML):
        """Replace the cached data with `data`."""
        DependencyStore.store = data

    @staticmethod
    def update_dependency_version(path: str, version: str) -> DependencyYAML:
        """
        Return a deep copy of the cached data with `path` set to `version`.

        The cached store itself is not modified. Raises `ValueError` when the
        store has no entry for `path`.
        """
        with CodeTimer(f"store deepcopy: {path}"):
            snapshot = deepcopy(DependencyStore.store)

        entry = snapshot["dependencies"].get(path)
        if entry is None:
            raise ValueError(f"Dependency {path} {version} not found")

        # `entry` is the object held by the snapshot, so this edits it in place
        entry["version"] = version
        return snapshot

    @staticmethod
    def write_store(file: str, data: DependencyYAML):
        """Dump `data` as YAML to `file`, keeping the existing key order."""
        with open(file, "w") as handle:
            yaml.safe_dump(data, handle, sort_keys=False)
- class Dependency:
- def __init__(self, path: str, values: DependencyDict):
- self.path = path
- self.values = values
- self.name: str = ""
- self.desc: str = ""
- self.kind: str = ""
- match path.split("/"):
- case ["plugins", name]:
- self.name = name
- self.kind = "plugin"
- self.desc = f"{name} plugin"
- case ["themes", name]:
- self.name = name.replace(".zsh-theme", "")
- self.kind = "theme"
- self.desc = f"{self.name} theme"
- case _:
- self.name = self.desc = path
- def __str__(self):
- output: str = ""
- for key in DependencyDict.__dict__["__annotations__"].keys():
- if key not in self.values:
- output += f"{key}: None\n"
- continue
- value = self.values[key]
- if "\n" not in value:
- output += f"{key}: {value}\n"
- else:
- output += f"{key}:\n "
- output += value.replace("\n", "\n ", value.count("\n") - 1)
- return output
- def update_or_notify(self):
- # Print dependency settings
- print(f"Processing {self.desc}...", file=sys.stderr)
- print(self, file=sys.stderr)
- # Check for updates
- repo = self.values["repo"]
- remote_branch = self.values["branch"]
- version = self.values["version"]
- is_tag = version.startswith("tag:")
- try:
- with CodeTimer(f"update check: {repo}"):
- if is_tag:
- status = GitHub.check_newer_tag(repo, version.replace("tag:", ""))
- else:
- status = GitHub.check_updates(repo, remote_branch, version)
- if status["has_updates"] is True:
- short_sha = status["head_ref"][:8]
- new_version = status["version"] if is_tag else short_sha
- try:
- branch_name = f"update/{self.path}/{new_version}"
- # Create new branch
- branch = Git.checkout_or_create_branch(branch_name)
- # Update dependencies.yml file
- self.__update_yaml(
- f"tag:{new_version}" if is_tag else status["version"]
- )
- # Update dependency files
- self.__apply_upstream_changes()
- # Add all changes and commit
- has_new_commit = Git.add_and_commit(self.name, short_sha)
- if has_new_commit:
- # Push changes to remote
- Git.push(branch)
- # Create GitHub PR
- GitHub.create_pr(
- branch,
- f"feat({self.name}): update to version {new_version}",
- f"""## Description
- Update for **{self.desc}**: update to version [{new_version}]({status['head_url']}).
- Check out the [list of changes]({status['compare_url']}).
- """,
- )
- # Clean up repository
- Git.clean_repo()
- except (CommandRunner.Exception, shutil.Error) as e:
- # Handle exception on automatic update
- match type(e):
- case CommandRunner.Exception:
- # Print error message
- print(
- f"Error running {e.stage} command: {e.returncode}", # pyright: ignore[reportAttributeAccessIssue]
- file=sys.stderr,
- )
- print(e.stderr, file=sys.stderr) # pyright: ignore[reportAttributeAccessIssue]
- case shutil.Error:
- print(f"Error copying files: {e}", file=sys.stderr)
- try:
- Git.clean_repo()
- except CommandRunner.Exception as e:
- print(
- f"Error reverting repository to clean state: {e}",
- file=sys.stderr,
- )
- sys.exit(1)
- # Create a GitHub issue to notify maintainer
- title = f"{self.path}: update to {new_version}"
- body = f"""## Description
- There is a new version of `{self.name}` {self.kind} available.
- New version: [{new_version}]({status['head_url']})
- Check out the [list of changes]({status['compare_url']}).
- """
- print("Creating GitHub issue", file=sys.stderr)
- print(f"{title}\n\n{body}", file=sys.stderr)
- GitHub.create_issue(title, body)
- except Exception as e:
- print(e, file=sys.stderr)
- def __update_yaml(self, new_version: str) -> None:
- dep_yaml = DependencyStore.update_dependency_version(self.path, new_version)
- DependencyStore.write_store(DEPS_YAML_FILE, dep_yaml)
- def __apply_upstream_changes(self) -> None:
- # Patterns to ignore in copying files from upstream repo
- GLOBAL_IGNORE = [".git", ".github", ".gitignore"]
- path = os.path.abspath(self.path)
- precopy = self.values.get("precopy")
- postcopy = self.values.get("postcopy")
- repo = self.values["repo"]
- branch = self.values["branch"]
- remote_url = f"https://github.com/{repo}.git"
- repo_dir = os.path.join(TMP_DIR, repo)
- # Clone repository
- Git.clone(remote_url, branch, repo_dir, reclone=True)
- # Run precopy on tmp repo
- if precopy is not None:
- print("Running precopy script:", end="\n ", file=sys.stderr)
- print(
- precopy.replace("\n", "\n ", precopy.count("\n") - 1), file=sys.stderr
- )
- CommandRunner.run_or_fail(
- ["bash", "-c", precopy], cwd=repo_dir, stage="Precopy"
- )
- # Copy files from upstream repo
- print(f"Copying files from {repo_dir} to {path}", file=sys.stderr)
- shutil.copytree(
- repo_dir,
- path,
- dirs_exist_ok=True,
- ignore=shutil.ignore_patterns(*GLOBAL_IGNORE),
- )
- # Run postcopy on our repository
- if postcopy is not None:
- print("Running postcopy script:", end="\n ", file=sys.stderr)
- print(
- postcopy.replace("\n", "\n ", postcopy.count("\n") - 1),
- file=sys.stderr,
- )
- CommandRunner.run_or_fail(
- ["bash", "-c", postcopy], cwd=path, stage="Postcopy"
- )
class Git:
    """Wrappers around the `git` CLI, all executed through `CommandRunner`."""

    # Branch that `clean_repo` checks out; overwritten with the current branch
    # name each time `checkout_or_create_branch` runs.
    default_branch = "master"

    @staticmethod
    def clone(remote_url: str, branch: str, repo_dir: str, reclone=False):
        """
        Shallow-clone `branch` of `remote_url` into `repo_dir`.

        An existing `repo_dir` is left as-is unless `reclone` is true, in
        which case it is removed first and cloned again.
        """
        # A fresh copy was requested: drop whatever is there
        if reclone and os.path.exists(repo_dir):
            shutil.rmtree(repo_dir)

        # Nothing to do when the directory (still) exists
        if os.path.exists(repo_dir):
            return

        print(
            f"Cloning {remote_url} to {repo_dir} and checking out {branch}",
            file=sys.stderr,
        )
        clone_cmd = ["git", "clone", "--depth=1", "-b", branch, remote_url, repo_dir]
        CommandRunner.run_or_fail(clone_cmd, stage="Clone")

    @staticmethod
    def checkout_or_create_branch(branch_name: str):
        """
        Switch to `branch_name`, creating it if the checkout fails.

        The branch that was current before switching is remembered in
        `Git.default_branch`. Returns `branch_name`.
        """
        # Remember which branch we are on right now
        head = CommandRunner.run_or_fail(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"], stage="GetDefaultBranch"
        )
        Git.default_branch = head.stdout.decode("utf-8").strip()

        checkout = ["git", "checkout"]
        try:
            # First attempt: the branch already exists
            CommandRunner.run_or_fail([*checkout, branch_name], stage="CreateBranch")
        except CommandRunner.Exception:
            # Fallback: create it
            CommandRunner.run_or_fail(
                [*checkout, "-b", branch_name], stage="CreateBranch"
            )

        return branch_name

    @staticmethod
    def add_and_commit(scope: str, version: str) -> bool:
        """
        Stage everything and commit it as `feat(<scope>): update to <version>`.

        Returns `False` without committing when `git diff --exit-code`
        succeeds, and `True` after a commit was created. Any failure of the
        diff command is treated as "there are changes".
        """
        try:
            CommandRunner.run_or_fail(
                ["git", "diff", "--exit-code"], stage="CheckRepoClean"
            )
        except CommandRunner.Exception:
            # Non-zero exit: carry on and commit
            pass
        else:
            # Zero exit: nothing to commit
            return False

        # Stage all files
        CommandRunner.run_or_fail(["git", "add", "-A", "-v"], stage="AddFiles")

        # Commit identity comes from the environment
        user_name = os.environ.get("GIT_APP_NAME")
        user_email = os.environ.get("GIT_APP_EMAIL")
        identity = ["-c", f"user.name={user_name}", "-c", f"user.email={user_email}"]

        # Current environment with locale and git config lookups overridden
        commit_env = {
            **os.environ,
            "LANG": "C.UTF-8",
            "GIT_CONFIG_GLOBAL": "/dev/null",
            "GIT_CONFIG_NOSYSTEM": "1",
        }

        commit_cmd = [
            "git",
            *identity,
            "commit",
            "-m",
            f"feat({scope}): update to {version}",
        ]
        CommandRunner.run_or_fail(commit_cmd, stage="CreateCommit", env=commit_env)
        return True

    @staticmethod
    def push(branch: str):
        """Push `branch` to `origin` and set it as upstream."""
        push_cmd = ["git", "push", "-u", "origin", branch]
        CommandRunner.run_or_fail(push_cmd, stage="PushBranch")

    @staticmethod
    def clean_repo():
        """Hard-reset to HEAD, then check out `Git.default_branch`."""
        steps = (
            (["git", "reset", "--hard", "HEAD"], "ResetRepository"),
            (["git", "checkout", Git.default_branch], "CheckoutDefaultBranch"),
        )
        for git_cmd, stage_name in steps:
            CommandRunner.run_or_fail(git_cmd, stage=stage_name)
class GitHub:
    """GitHub access: update checks over the REST API, issues and PRs via `gh`."""

    # Seconds to wait on the GitHub API before giving up, so a stalled
    # connection cannot hang the whole run
    REQUEST_TIMEOUT = 30

    @staticmethod
    def _api_error(response) -> Exception:
        """Build the exception raised for a non-200 GitHub API response."""
        # Use the raw text: the body of a failed response may not be valid JSON
        return Exception(
            f"GitHub API request failed with status code {response.status_code}: {response.text}"
        )

    @staticmethod
    def check_newer_tag(repo, current_tag) -> UpdateStatusFalse | UpdateStatusTrue:
        """
        Check whether `repo` has a tag with a greater semver than `current_tag`.

        Tags that `coerce` cannot interpret are ignored.

        Raises:
            ValueError: if `current_tag` cannot be coerced to a version, or if
                the repository has tags but none of them can.
            Exception: if the GitHub API does not answer with status 200.
        """
        # Validate the stored tag first; no request is needed to reject it
        current_version = coerce(current_tag)
        if current_version is None:
            raise ValueError(
                f"Stored {current_tag} from {repo} does not follow semver"
            )

        # GET /repos/:owner/:repo/git/refs/tags
        url = f"https://api.github.com/repos/{repo}/git/refs/tags"
        response = requests.get(url, timeout=GitHub.REQUEST_TIMEOUT)

        if response.status_code != 200:
            raise GitHub._api_error(response)

        data = response.json()
        if not data:
            return {
                "has_updates": False,
            }

        latest_ref = None
        latest_version: Optional[Version] = None

        for ref in data:
            # GitHub returns plain git refs; strip the prefix to get the tag
            tag_version = coerce(ref["ref"].removeprefix("refs/tags/"))
            if tag_version is None:
                # skip every tag that is not semver-compliant
                continue

            if latest_version is None or tag_version.compare(latest_version) > 0:
                # a "greater" semver version becomes the new latest
                latest_version = tag_version
                latest_ref = ref

        # raise if no valid semver tag is found
        if latest_ref is None or latest_version is None:
            raise ValueError(f"No tags following semver found in {repo}")

        latest_tag = latest_ref["ref"].removeprefix("refs/tags/")

        if latest_version.compare(current_version) <= 0:
            return {
                "has_updates": False,
            }

        return {
            "has_updates": True,
            "version": latest_tag,
            "compare_url": f"https://github.com/{repo}/compare/{current_tag}...{latest_tag}",
            "head_ref": latest_ref["object"]["sha"],
            "head_url": f"https://github.com/{repo}/releases/tag/{latest_tag}",
        }

    @staticmethod
    def check_updates(repo, branch, version) -> UpdateStatusFalse | UpdateStatusTrue:
        """
        Check whether `branch` of `repo` has commits that `version` lacks.

        Uses the compare endpoint with `version` as base and `branch` as head;
        the last commit of the comparison is reported as the new version.

        Raises:
            Exception: if the GitHub API does not answer with status 200.
        """
        url = f"https://api.github.com/repos/{repo}/compare/{version}...{branch}"
        response = requests.get(url, timeout=GitHub.REQUEST_TIMEOUT)

        if response.status_code != 200:
            raise GitHub._api_error(response)

        data = response.json()
        commits = data["commits"]

        # A non-identical comparison with no commits has nothing to update to
        # (presumably the branch is behind the stored version).
        if data["status"] == "identical" or not commits:
            return {
                "has_updates": False,
            }

        head_commit = commits[-1]
        return {
            "has_updates": True,
            "version": head_commit["sha"],
            "compare_url": data["permalink_url"],
            "head_ref": head_commit["sha"],
            "head_url": head_commit["html_url"],
        }

    @staticmethod
    def create_issue(title: str, body: str) -> None:
        """Create a GitHub issue with the given title and body via `gh`."""
        cmd = ["gh", "issue", "create", "-t", title, "-b", body]
        CommandRunner.run_or_fail(cmd, stage="CreateIssue")

    @staticmethod
    def create_pr(branch: str, title: str, body: str) -> None:
        """
        Open a pull request from `branch` into `Git.default_branch` via `gh`.

        Does nothing when an open PR for `branch` already exists.
        """
        # first of all let's check if PR is already open
        check_cmd = [
            "gh",
            "pr",
            "list",
            "--state",
            "open",
            "--head",
            branch,
            "--json",
            "title",
        ]

        # returncode is 0 also if no PRs are found
        listing = (
            CommandRunner.run_or_fail(check_cmd, stage="CheckPullRequestOpen")
            .stdout.decode("utf-8")
            .strip()
        )
        try:
            open_prs = json.loads(listing)
        except json.JSONDecodeError:
            # In DRY_RUN mode `gh` is replaced by `echo`, so the output is the
            # echoed command line rather than JSON: treat it as "no open PR".
            print(
                "Could not parse `gh pr list` output, assuming no open PR",
                file=sys.stderr,
            )
            open_prs = []

        # we have PR in this case!
        if open_prs:
            return

        cmd = [
            "gh",
            "pr",
            "create",
            "-B",
            Git.default_branch,
            "-H",
            branch,
            "-t",
            title,
            "-b",
            body,
        ]
        CommandRunner.run_or_fail(cmd, stage="CreatePullRequest")
def main():
    """
    Load dependencies.yml and process every dependency listed in it.

    Raises:
        Exception: if the file does not contain a mapping with a
            `dependencies` key.
    """
    # Load the YAML file
    with open(DEPS_YAML_FILE, "r") as yaml_file:
        data: DependencyYAML = yaml.safe_load(yaml_file)

    # An empty or non-mapping document must fail with the format error below,
    # not with a TypeError from the `in` check
    if not isinstance(data, dict) or "dependencies" not in data:
        raise Exception("dependencies.yml not properly formatted")

    # Cache YAML version
    DependencyStore.set(data)

    # A `dependencies:` key with no entries is treated as an empty mapping
    dependencies = data["dependencies"] or {}
    for path, values in dependencies.items():
        Dependency(path, values).update_or_notify()


if __name__ == "__main__":
    main()
|