This also includes extensive tests for each of the currently supported
customizations. It should be generic enough to continue working as long
as the list of dicts includes a 'name' or 'user' field in the dict.
Otherwise support for a new dict key will need to be added to the
customizations_diff function.
(cherry picked from commit 850c490b6e
Related: rhbz#1718473
1060 lines
35 KiB
1060 lines
35 KiB
# Copyright (C) 2017-2019 Red Hat, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <>.
import gi
gi.require_version("Ggit", "1.0")
from gi.repository import Ggit as Git
from gi.repository import Gio
from gi.repository import GLib
import os
import pytoml as toml
import semantic_version as semver
from pylorax.api.projects import dep_evra
from pylorax.base import DataHolder
from pylorax.sysutils import joinpaths
class CommitTimeValError(Exception):
class RecipeFileError(Exception):
class RecipeError(Exception):
class Recipe(dict):
"""A Recipe of package and modules
This is a subclass of dict that enforces the constructor arguments
and adds a .filename property to return the recipe's filename,
and a .toml() function to return the recipe as a TOML string.
def __init__(self, name, description, version, modules, packages, groups, customizations=None):
# Check that version is empty or semver compatible
if version:
# Make sure modules, packages, and groups are listed by their case-insensitive names
if modules is not None:
modules = sorted(modules, key=lambda m: m["name"].lower())
if packages is not None:
packages = sorted(packages, key=lambda p: p["name"].lower())
if groups is not None:
groups = sorted(groups, key=lambda g: g["name"].lower())
dict.__init__(self, name=name,
# We don't want customizations=None to show up in the TOML so remove it
if customizations is None:
del self["customizations"]
def package_names(self):
"""Return the names of the packages"""
return map(lambda p: p["name"], self["packages"] or [])
def package_nver(self):
"""Return the names and versions of the packages"""
return [(p["name"], p["version"]) for p in self["packages"] or []]
def module_names(self):
"""Return the names of the modules"""
return map(lambda m: m["name"], self["modules"] or [])
def module_nver(self):
"""Return the names and versions of the modules"""
return [(m["name"], m["version"]) for m in self["modules"] or []]
def group_names(self):
"""Return the names of the groups. Groups do not have versions."""
return map(lambda g: g["name"], self["groups"] or [])
def filename(self):
"""Return the Recipe's filename
Replaces spaces in the name with '-' and appends .toml
return recipe_filename(self.get("name"))
def toml(self):
"""Return the Recipe in TOML format"""
return toml.dumps(self).encode("UTF-8")
def bump_version(self, old_version=None):
"""semver recipe version number bump
:param old_version: An optional old version number
:type old_version: str
:returns: The new version number or None
:rtype: str
:raises: ValueError
If neither have a version, 0.0.1 is returned
If there is no old version the new version is checked and returned
If there is no new version, but there is a old one, bump its patch level
If the old and new versions are the same, bump the patch level
If they are different, check and return the new version
new_version = self.get("version")
if not new_version and not old_version:
self["version"] = "0.0.1"
elif new_version and not old_version:
self["version"] = new_version
elif not new_version or new_version == old_version:
new_version = str(semver.Version(old_version).next_patch())
self["version"] = new_version
self["version"] = new_version
# Return the new version
return str(semver.Version(self["version"]))
def freeze(self, deps):
""" Return a new Recipe with full module and package NEVRA
:param deps: A list of dependency NEVRA to use to fill in the modules and packages
:type deps: list(
:returns: A new Recipe object
:rtype: Recipe
module_names = self.module_names
package_names = self.package_names
group_names = self.group_names
new_modules = []
new_packages = []
new_groups = []
for dep in deps:
if dep["name"] in package_names:
new_packages.append(RecipePackage(dep["name"], dep_evra(dep)))
elif dep["name"] in module_names:
new_modules.append(RecipeModule(dep["name"], dep_evra(dep)))
elif dep["name"] in group_names:
if "customizations" in self:
customizations = self["customizations"]
customizations = None
return Recipe(self["name"], self["description"], self["version"],
new_modules, new_packages, new_groups, customizations)
class RecipeModule(dict):
def __init__(self, name, version):
dict.__init__(self, name=name, version=version)
class RecipePackage(RecipeModule):
class RecipeGroup(dict):
def __init__(self, name):
dict.__init__(self, name=name)
def recipe_from_file(recipe_path):
"""Return a recipe file as a Recipe object
:param recipe_path: Path to the recipe fila
:type recipe_path: str
:returns: A Recipe object
:rtype: Recipe
with open(recipe_path, 'rb') as f:
return recipe_from_toml(
def recipe_from_toml(recipe_str):
"""Create a Recipe object from a toml string.
:param recipe_str: The Recipe TOML string
:type recipe_str: str
:returns: A Recipe object
:rtype: Recipe
:raises: TomlError
recipe_dict = toml.loads(recipe_str)
return recipe_from_dict(recipe_dict)
def recipe_from_dict(recipe_dict):
"""Create a Recipe object from a plain dict.
:param recipe_dict: A plain dict of the recipe
:type recipe_dict: dict
:returns: A Recipe object
:rtype: Recipe
:raises: RecipeError
# Make RecipeModule objects from the toml
# The TOML may not have modules or packages in it. Set them to None in this case
if recipe_dict.get("modules"):
modules = [RecipeModule(m.get("name"), m.get("version")) for m in recipe_dict["modules"]]
modules = []
if recipe_dict.get("packages"):
packages = [RecipePackage(p.get("name"), p.get("version")) for p in recipe_dict["packages"]]
packages = []
if recipe_dict.get("groups"):
groups = [RecipeGroup(g.get("name")) for g in recipe_dict["groups"]]
groups = []
name = recipe_dict["name"]
description = recipe_dict["description"]
version = recipe_dict.get("version", None)
customizations = recipe_dict.get("customizations", None)
# [customizations] was incorrectly documented at first, so we have to support using it
# as [[customizations]] by grabbing the first element.
if isinstance(customizations, list):
customizations = customizations[0]
except KeyError as e:
raise RecipeError("There was a problem parsing the recipe: %s" % str(e))
return Recipe(name, description, version, modules, packages, groups, customizations)
def gfile(path):
"""Convert a string path to GFile for use with Git"""
return Gio.file_new_for_path(path)
def recipe_filename(name):
"""Return the toml filename for a recipe
Replaces spaces with '-' and appends '.toml'
# XXX Raise and error if this is empty?
return name.replace(" ", "-") + ".toml"
def head_commit(repo, branch):
"""Get the branch's HEAD Commit Object
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:returns: Branch's head commit
:rtype: Git.Commit
:raises: Can raise errors from Ggit
branch_obj = repo.lookup_branch(branch, Git.BranchType.LOCAL)
commit_id = branch_obj.get_target()
return repo.lookup(commit_id, Git.Commit)
def prepare_commit(repo, branch, builder):
"""Prepare for a commit
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param builder: instance of TreeBuilder
:type builder: TreeBuilder
:returns: (Tree, Sig, Ref)
:rtype: tuple
:raises: Can raise errors from Ggit
tree_id = builder.write()
tree = repo.lookup(tree_id, Git.Tree)
sig = Git.Signature.new_now("bdcs-api-server", "user-email")
ref = "refs/heads/%s" % branch
return (tree, sig, ref)
def open_or_create_repo(path):
"""Open an existing repo, or create a new one
:param path: path to recipe directory
:type path: string
:returns: A repository object
:rtype: Git.Repository
:raises: Can raise errors from Ggit
A bare git repo will be created in the git directory of the specified path.
If a repo already exists it will be opened and returned instead of
creating a new one.
git_path = joinpaths(path, "git")
if os.path.exists(joinpaths(git_path, "HEAD")):
repo = Git.Repository.init_repository(gfile(git_path), True)
# Make an initial empty commit
sig = Git.Signature.new_now("bdcs-api-server", "user-email")
tree_id = repo.get_index().write_tree()
tree = repo.lookup(tree_id, Git.Tree)
repo.create_commit("HEAD", sig, sig, "UTF-8", "Initial Recipe repository commit", tree, [])
return repo
def write_commit(repo, branch, filename, message, content):
"""Make a new commit to a repository's branch
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: full path of the file to add
:type filename: str
:param message: The commit message
:type message: str
:param content: The data to write
:type content: str
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit
parent_commit = head_commit(repo, branch)
except GLib.GError:
# Branch doesn't exist, make a new one based on master
master_head = head_commit(repo, "master")
repo.create_branch(branch, master_head, 0)
parent_commit = head_commit(repo, branch)
parent_commit = head_commit(repo, branch)
blob_id = repo.create_blob_from_buffer(content)
# Use treebuilder to make a new entry for this filename and blob
parent_tree = parent_commit.get_tree()
builder = repo.create_tree_builder_from_tree(parent_tree)
builder.insert(filename, blob_id, Git.FileMode.BLOB)
(tree, sig, ref) = prepare_commit(repo, branch, builder)
return repo.create_commit(ref, sig, sig, "UTF-8", message, tree, [parent_commit])
def read_commit_spec(repo, spec):
"""Return the raw content of the blob specified by the spec
:param repo: Open repository
:type repo: Git.Repository
:param spec: Git revparse spec
:type spec: str
:returns: Contents of the commit
:rtype: str
:raises: Can raise errors from Ggit
eg. To read the README file from master the spec is "master:README"
commit_id = repo.revparse(spec).get_id()
blob = repo.lookup(commit_id, Git.Blob)
return blob.get_raw_content()
def read_commit(repo, branch, filename, commit=None):
"""Return the contents of a file on a specific branch or commit.
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: filename to read
:type filename: str
:param commit: Optional commit hash
:type commit: str
:returns: The commit id, and the contents of the commit
:rtype: tuple(str, str)
:raises: Can raise errors from Ggit
If no commit is passed the master:filename is returned, otherwise it will be
if not commit:
# Find the most recent commit for filename on the selected branch
commits = list_commits(repo, branch, filename, 1)
if not commits:
raise RecipeError("No commits for %s on the %s branch." % (filename, branch))
commit = commits[0].commit
return (commit, read_commit_spec(repo, "%s:%s" % (commit, filename)))
def read_recipe_commit(repo, branch, recipe_name, commit=None):
"""Read a recipe commit from git and return a Recipe object
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param recipe_name: Recipe name to read
:type recipe_name: str
:param commit: Optional commit hash
:type commit: str
:returns: A Recipe object
:rtype: Recipe
:raises: Can raise errors from Ggit
If no commit is passed the master:filename is returned, otherwise it will be
if not repo_file_exists(repo, branch, recipe_filename(recipe_name)):
raise RecipeFileError("Unknown blueprint")
(_, recipe_toml) = read_commit(repo, branch, recipe_filename(recipe_name), commit)
return recipe_from_toml(recipe_toml)
def read_recipe_and_id(repo, branch, recipe_name, commit=None):
"""Read a recipe commit and its id from git
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param recipe_name: Recipe name to read
:type recipe_name: str
:param commit: Optional commit hash
:type commit: str
:returns: The commit id, and a Recipe object
:rtype: tuple(str, Recipe)
:raises: Can raise errors from Ggit
If no commit is passed the master:filename is returned, otherwise it will be
(commit_id, recipe_toml) = read_commit(repo, branch, recipe_filename(recipe_name), commit)
return (commit_id, recipe_from_toml(recipe_toml))
def list_branch_files(repo, branch):
"""Return a sorted list of the files on the branch HEAD
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:returns: A sorted list of the filenames
:rtype: list(str)
:raises: Can raise errors from Ggit
commit = head_commit(repo, branch).get_id().to_string()
return list_commit_files(repo, commit)
def list_commit_files(repo, commit):
"""Return a sorted list of the files on a commit
:param repo: Open repository
:type repo: Git.Repository
:param commit: The commit hash to list
:type commit: str
:returns: A sorted list of the filenames
:rtype: list(str)
:raises: Can raise errors from Ggit
commit_id = Git.OId.new_from_string(commit)
commit_obj = repo.lookup(commit_id, Git.Commit)
tree = commit_obj.get_tree()
return sorted([tree.get(i).get_name() for i in range(0,tree.size())])
def delete_recipe(repo, branch, recipe_name):
"""Delete a recipe from a branch.
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param recipe_name: Recipe name to delete
:type recipe_name: str
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit
return delete_file(repo, branch, recipe_filename(recipe_name))
def delete_file(repo, branch, filename):
"""Delete a file from a branch.
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: filename to delete
:type filename: str
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit
parent_commit = head_commit(repo, branch)
parent_tree = parent_commit.get_tree()
builder = repo.create_tree_builder_from_tree(parent_tree)
(tree, sig, ref) = prepare_commit(repo, branch, builder)
message = "Recipe %s deleted" % filename
return repo.create_commit(ref, sig, sig, "UTF-8", message, tree, [parent_commit])
def revert_recipe(repo, branch, recipe_name, commit):
"""Revert the contents of a recipe to that of a previous commit
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param recipe_name: Recipe name to revert
:type recipe_name: str
:param commit: Commit hash
:type commit: str
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit
return revert_file(repo, branch, recipe_filename(recipe_name), commit)
def revert_file(repo, branch, filename, commit):
"""Revert the contents of a file to that of a previous commit
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: filename to revert
:type filename: str
:param commit: Commit hash
:type commit: str
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit
commit_id = Git.OId.new_from_string(commit)
commit_obj = repo.lookup(commit_id, Git.Commit)
revert_tree = commit_obj.get_tree()
entry = revert_tree.get_by_name(filename)
blob_id = entry.get_id()
parent_commit = head_commit(repo, branch)
# Use treebuilder to modify the tree
parent_tree = parent_commit.get_tree()
builder = repo.create_tree_builder_from_tree(parent_tree)
builder.insert(filename, blob_id, Git.FileMode.BLOB)
(tree, sig, ref) = prepare_commit(repo, branch, builder)
commit_hash = commit_id.to_string()
message = "%s reverted to commit %s" % (filename, commit_hash)
return repo.create_commit(ref, sig, sig, "UTF-8", message, tree, [parent_commit])
def commit_recipe(repo, branch, recipe):
"""Commit a recipe to a branch
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param recipe: Recipe to commit
:type recipe: Recipe
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit
old_recipe = read_recipe_commit(repo, branch, recipe["name"])
old_version = old_recipe["version"]
except Exception:
old_version = None
recipe_toml = recipe.toml()
message = "Recipe %s, version %s saved." % (recipe["name"], recipe["version"])
return write_commit(repo, branch, recipe.filename, message, recipe_toml)
def commit_recipe_file(repo, branch, filename):
"""Commit a recipe file to a branch
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: Path to the recipe file to commit
:type filename: str
:returns: OId of the new commit
:rtype: Git.OId
:raises: Can raise errors from Ggit or RecipeFileError
recipe = recipe_from_file(filename)
except IOError:
raise RecipeFileError
return commit_recipe(repo, branch, recipe)
def commit_recipe_directory(repo, branch, directory):
r"""Commit all \*.toml files from a directory, if they aren't already in git.
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param directory: The directory of \*.toml recipes to commit
:type directory: str
:returns: None
:raises: Can raise errors from Ggit or RecipeFileError
Files with Toml or RecipeFileErrors will be skipped, and the remainder will
be tried.
dir_files = set([e for e in os.listdir(directory) if e.endswith(".toml")])
branch_files = set(list_branch_files(repo, branch))
new_files = dir_files.difference(branch_files)
for f in new_files:
# Skip files with errors, but try the others
commit_recipe_file(repo, branch, joinpaths(directory, f))
except (RecipeFileError, toml.TomlError):
def tag_recipe_commit(repo, branch, recipe_name):
"""Tag a file's most recent commit
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param recipe_name: Recipe name to tag
:type recipe_name: str
:returns: Tag id or None if it failed.
:rtype: Git.OId
:raises: Can raise errors from Ggit
Uses tag_file_commit()
if not repo_file_exists(repo, branch, recipe_filename(recipe_name)):
raise RecipeFileError("Unknown blueprint")
return tag_file_commit(repo, branch, recipe_filename(recipe_name))
def tag_file_commit(repo, branch, filename):
"""Tag a file's most recent commit
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: Filename to tag
:type filename: str
:returns: Tag id or None if it failed.
:rtype: Git.OId
:raises: Can raise errors from Ggit
This uses git tags, of the form `refs/tags/<branch>/<filename>/r<revision>`
Only the most recent recipe commit can be tagged to prevent out of order tagging.
Revisions start at 1 and increment for each new commit that is tagged.
If the commit has already been tagged it will return false.
file_commits = list_commits(repo, branch, filename)
if not file_commits:
return None
# Find the most recently tagged version (may not be one) and add 1 to it.
for details in file_commits:
if details.revision is not None:
new_revision = details.revision + 1
new_revision = 1
name = "%s/%s/r%d" % (branch, filename, new_revision)
sig = Git.Signature.new_now("bdcs-api-server", "user-email")
commit_id = Git.OId.new_from_string(file_commits[0].commit)
commit = repo.lookup(commit_id, Git.Commit)
return repo.create_tag(name, commit, sig, name, Git.CreateFlags.NONE)
def find_commit_tag(repo, branch, filename, commit_id):
"""Find the tag that matches the commit_id
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: filename to revert
:type filename: str
:param commit_id: The commit id to check
:type commit_id: Git.OId
:returns: The tag or None if there isn't one
:rtype: str or None
There should be only 1 tag pointing to a commit, but there may not
be a tag at all.
The tag will look like: 'refs/tags/<branch>/<filename>/r<revision>'
pattern = "%s/%s/r*" % (branch, filename)
tags = [t for t in repo.list_tags_match(pattern) if is_commit_tag(repo, commit_id, t)]
if len(tags) != 1:
return None
return tags[0]
def is_commit_tag(repo, commit_id, tag):
"""Check to see if a tag points to a specific commit.
:param repo: Open repository
:type repo: Git.Repository
:param commit_id: The commit id to check
:type commit_id: Git.OId
:param tag: The tag to check
:type tag: str
:returns: True if the tag points to the commit, False otherwise
:rtype: bool
ref = repo.lookup_reference("refs/tags/" + tag)
tag_id = ref.get_target()
tag = repo.lookup(tag_id, Git.Tag)
target_id = tag.get_target_id()
return == 0
def get_revision_from_tag(tag):
"""Return the revision number from a tag
:param tag: The tag to exract the revision from
:type tag: str
:returns: The integer revision or None
:rtype: int or None
The revision is the part after the r in 'branch/filename/rXXX'
if tag is None:
return None
return int(tag.rsplit('r',2)[-1])
except (ValueError, IndexError):
return None
class CommitDetails(DataHolder):
def __init__(self, commit, timestamp, message, revision=None):
commit = commit,
timestamp = timestamp,
message = message,
revision = revision)
def list_commits(repo, branch, filename, limit=0):
"""List the commit history of a file on a branch.
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: filename to revert
:type filename: str
:param limit: Number of commits to return (0=all)
:type limit: int
:returns: A list of commit details
:rtype: list(CommitDetails)
:raises: Can raise errors from Ggit
revwalk =
branch_ref = "refs/heads/%s" % branch
commits = []
while True:
commit_id =
if not commit_id:
commit = repo.lookup(commit_id, Git.Commit)
parents = commit.get_parents()
# No parents? Must be the first commit.
if parents.get_size() == 0:
tree = commit.get_tree()
# Is the filename in this tree? If not, move on.
if not tree.get_by_name(filename):
# Is filename different in all of the parent commits?
parent_commits = map(parents.get, xrange(0, parents.get_size()))
is_diff = all(map(lambda pc: is_parent_diff(repo, filename, tree, pc), parent_commits))
# No changes from parents, skip it.
if not is_diff:
tag = find_commit_tag(repo, branch, filename, commit.get_id())
commits.append(get_commit_details(commit, get_revision_from_tag(tag)))
if limit and len(commits) > limit:
except CommitTimeValError:
# Skip any commits that have trouble converting the time
# TODO - log details about this failure
# These will be in reverse time sort order thanks to revwalk
return commits
def get_commit_details(commit, revision=None):
"""Return the details about a specific commit.
:param commit: The commit to get details from
:type commit: Git.Commit
:param revision: Optional commit revision
:type revision: int
:returns: Details about the commit
:rtype: CommitDetails
:raises: CommitTimeValError or Ggit exceptions
message = commit.get_message()
commit_str = commit.get_id().to_string()
sig = commit.get_committer()
datetime = sig.get_time()
# XXX What do we do with timezone?
_timezone = sig.get_time_zone()
timeval = GLib.TimeVal()
ok = datetime.to_timeval(timeval)
if not ok:
raise CommitTimeValError
time_str = timeval.to_iso8601()
return CommitDetails(commit_str, time_str, message, revision)
def is_parent_diff(repo, filename, tree, parent):
"""Check to see if the commit is different from its parents
:param repo: Open repository
:type repo: Git.Repository
:param filename: filename to revert
:type filename: str
:param tree: The commit's tree
:type tree: Git.Tree
:param parent: The commit's parent commit
:type parent: Git.Commit
:retuns: True if filename in the commit is different from its parents
:rtype: bool
diff_opts =
diff = Git.Diff.new_tree_to_tree(repo, parent.get_tree(), tree, diff_opts)
return diff.get_num_deltas() > 0
def find_field_value(field, value, lst):
"""Find a field matching value in the list of dicts.
:param field: field to search for
:type field: str
:param value: value to match in the field
:type value: str
:param lst: List of dict's with field
:type lst: list of dict
:returns: First dict with matching field:value, or None
:rtype: dict or None
Used to return a specific entry from a list that looks like this:
[{"name": "one", "attr": "green"}, ...]
find_field_value("name", "one", lst) will return the matching dict.
for d in lst:
if d.get(field) and d.get(field) == value:
return d
return None
def find_name(name, lst):
"""Find the dict matching the name in a list and return it.
:param name: Name to search for
:type name: str
:param lst: List of dict's with "name" field
:type lst: list of dict
:returns: First dict with matching name, or None
:rtype: dict or None
This is just a wrapper for find_field_value with field set to "name"
return find_field_value("name", name, lst)
def find_recipe_obj(path, recipe, default=None):
"""Find a recipe object
:param path: A list of dict field names
:type path: list of str
:param recipe: The recipe to search
:type recipe: Recipe
:param default: The value to return if it is not found
:type default: Any
Return the object found by applying the path to the dicts in the recipe, or
return the default if it doesn't exist.
eg. {"customizations": {"hostname": "foo", "users": [...]}}
find_recipe_obj(["customizations", "hostname"], recipe, "")
o = recipe
for p in path:
if not o.get(p):
return default
o = o.get(p)
except AttributeError:
return default
return o
def diff_lists(title, field, old_items, new_items):
"""Return the differences between two lists of dicts.
:param title: Title of the entry
:type title: str
:param field: Field to use as the key for comparisons
:type field: str
:param old_items: List of item dicts with "name" field
:type old_items: list(dict)
:param new_items: List of item dicts with "name" field
:type new_items: list(dict)
:returns: List of diff dicts with old/new entries
:rtype: list(dict)
diffs = []
old_fields= set(m[field] for m in old_items)
new_fields= set(m[field] for m in new_items)
added_items = new_fields.difference(old_fields)
added_items = sorted(added_items, key=lambda n: n.lower())
removed_items = old_fields.difference(new_fields)
removed_items = sorted(removed_items, key=lambda n: n.lower())
same_items = old_fields.intersection(new_fields)
same_items = sorted(same_items, key=lambda n: n.lower())
for v in added_items:
"new":{title:find_field_value(field, v, new_items)}})
for v in removed_items:
diffs.append({"old":{title:find_field_value(field, v, old_items)},
for v in same_items:
old_item = find_field_value(field, v, old_items)
new_item = find_field_value(field, v, new_items)
if old_item != new_item:
return diffs
def customizations_diff(old_recipe, new_recipe):
"""Diff the customizations sections from two versions of a recipe
diffs = []
old_keys = set(old_recipe.get("customizations", {}).keys())
new_keys = set(new_recipe.get("customizations", {}).keys())
added_keys = new_keys.difference(old_keys)
added_keys = sorted(added_keys, key=lambda n: n.lower())
removed_keys = old_keys.difference(new_keys)
removed_keys = sorted(removed_keys, key=lambda n: n.lower())
same_keys = old_keys.intersection(new_keys)
same_keys = sorted(same_keys, key=lambda n: n.lower())
for v in added_keys:
diffs.append({"old": None,
"new": {"Customizations."+v: new_recipe["customizations"][v]}})
for v in removed_keys:
diffs.append({"old": {"Customizations."+v: old_recipe["customizations"][v]},
"new": None})
for v in same_keys:
if new_recipe["customizations"][v] == old_recipe["customizations"][v]:
if type(new_recipe["customizations"][v]) == type([]):
# Lists of dicts need to use diff_lists
# sshkey uses 'user', user and group use 'name'
if "user" in new_recipe["customizations"][v][0]:
field_name = "user"
elif "name" in new_recipe["customizations"][v][0]:
field_name = "name"
raise RuntimeError("%s list has unrecognized key, not 'name' or 'user'" % "customizations."+v)
diffs.extend(diff_lists("Customizations."+v, field_name, old_recipe["customizations"][v], new_recipe["customizations"][v]))
diffs.append({"old": {"Customizations."+v: old_recipe["customizations"][v]},
"new": {"Customizations."+v: new_recipe["customizations"][v]}})
return diffs
def recipe_diff(old_recipe, new_recipe):
"""Diff two versions of a recipe
:param old_recipe: The old version of the recipe
:type old_recipe: Recipe
:param new_recipe: The new version of the recipe
:type new_recipe: Recipe
:returns: A list of diff dict entries with old/new
:rtype: list(dict)
diffs = []
# These cannot be added or removed, just different
for element in ["name", "description", "version"]:
if old_recipe[element] != new_recipe[element]:
# These lists always exist
diffs.extend(diff_lists("Module", "name", old_recipe["modules"], new_recipe["modules"]))
diffs.extend(diff_lists("Package", "name", old_recipe["packages"], new_recipe["packages"]))
diffs.extend(diff_lists("Group", "name", old_recipe["groups"], new_recipe["groups"]))
# The customizations section can contain a number of different types
diffs.extend(customizations_diff(old_recipe, new_recipe))
# repos contains keys that are lists (eg. [[repos.git]])
diffs.extend(diff_lists("Repos.git", "rpmname",
find_recipe_obj(["repos", "git"], old_recipe, []),
find_recipe_obj(["repos", "git"], new_recipe, [])))
return diffs
def repo_file_exists(repo, branch, filename):
"""Return True if the filename exists on the branch
:param repo: Open repository
:type repo: Git.Repository
:param branch: Branch name
:type branch: str
:param filename: Filename to check
:type filename: str
:returns: True if the filename exists on the HEAD of the branch, False otherwise.
:rtype: bool
commit = head_commit(repo, branch).get_id().to_string()
commit_id = Git.OId.new_from_string(commit)
commit_obj = repo.lookup(commit_id, Git.Commit)
tree = commit_obj.get_tree()
return tree.get_by_name(filename) is not None