Source code for boom

# Copyright (C) 2017 Red Hat, Inc., Bryn M. Reeves <bmr@redhat.com>
#
# boom/__init__.py - Boom package initialisation
#
# This file is part of the boom project.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""This module provides classes and functions for creating, displaying,
and manipulating boot loader entries complying with the Boot Loader
Specification.

The ``boom`` package contains global definitions, functions to configure
the Boom environment, logging infrastructure for the package and a
``Selection`` class used to select one or more ``OsProfile``,
``BootEntry``, or ``BootParams`` object according to specified selection
criteria.

Individual sub-modules provide interfaces to the various components of
Boom: operating system profiles, boot loader entries and boot
parameters, the boom CLI and procedural API and a simple reporting
module to produce tabular reports on Boom objects.

See the sub-module documentation for specific information on the
classes and interfaces provided, and the ``boom`` tool help output and
manual page for information on using the command line interface.
"""
from os.path import exists as path_exists, isabs, join as path_join
import logging

__version__ = "0.9"

#: The location of the system ``/boot`` directory.
DEFAULT_BOOT_PATH = "/boot"

#: The default path for Boom configuration files.
DEFAULT_BOOM_DIR = "boom"

#: The root directory for Boom configuration files.
DEFAULT_BOOM_PATH = path_join(DEFAULT_BOOT_PATH, DEFAULT_BOOM_DIR)

#: Kernel version string, in ``uname -r`` format.
FMT_VERSION = "version"
#: LVM2 root logical volume in ``vg/lv`` format.
FMT_LVM_ROOT_LV = "lvm_root_lv"
#: LVM2 kernel command line options
FMT_LVM_ROOT_OPTS = "lvm_root_opts"
#: BTRFS subvolume specification.
FMT_BTRFS_SUBVOLUME = "btrfs_subvolume"
#: BTRFS subvolume ID specification.
FMT_BTRFS_SUBVOL_ID = "btrfs_subvol_id"
#: BTRFS subvolume path specification.
FMT_BTRFS_SUBVOL_PATH = "btrfs_subvol_path"
#: BTRFS kernel command line options
FMT_BTRFS_ROOT_OPTS = "btrfs_root_opts"
#: Root device path.
FMT_ROOT_DEVICE = "root_device"
#: Root device options.
FMT_ROOT_OPTS = "root_opts"
#: Linux kernel image
FMT_KERNEL = "kernel"
#: Initramfs image
FMT_INITRAMFS = "initramfs"

#: List of all possible format keys.
FORMAT_KEYS = [
    FMT_VERSION,
    FMT_LVM_ROOT_LV, FMT_LVM_ROOT_OPTS,
    FMT_BTRFS_SUBVOL_ID, FMT_BTRFS_SUBVOL_PATH,
    FMT_BTRFS_SUBVOLUME, FMT_BTRFS_ROOT_OPTS,
    FMT_ROOT_DEVICE, FMT_ROOT_OPTS,
    FMT_KERNEL, FMT_INITRAMFS
]

_MACHINE_ID = "/etc/machine-id"
_DBUS_MACHINE_ID = "/var/lib/dbus/machine-id"

BOOM_LOG_DEBUG = logging.DEBUG
BOOM_LOG_INFO = logging.INFO
BOOM_LOG_WARN = logging.WARNING
BOOM_LOG_ERROR = logging.ERROR

_log_levels = (
    BOOM_LOG_DEBUG,
    BOOM_LOG_INFO,
    BOOM_LOG_WARN,
    BOOM_LOG_ERROR
)

_log = logging.getLogger(__name__)

_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error

# Boom debugging levels
BOOM_DEBUG_PROFILE = 1
BOOM_DEBUG_ENTRY = 2
BOOM_DEBUG_REPORT = 4
BOOM_DEBUG_COMMAND = 8
BOOM_DEBUG_ALL = (BOOM_DEBUG_PROFILE |
                  BOOM_DEBUG_ENTRY |
                  BOOM_DEBUG_REPORT |
                  BOOM_DEBUG_COMMAND)

__debug_mask = 0


[docs]class BoomError(Exception): """Base class of all Boom exceptions. """ pass
class BoomLogger(logging.Logger): """BoomLogger() Boom logging wrapper class: wrap the Logger.debug() method to allow filtering of submodule debug messages by log mask. This allows us to selectively control which messages are logged in the library without having to tamper with the Handler, Filter or Formatter configurations (which belong to the client application using the library). """ mask_bits = 0 def set_debug_mask(self, mask_bits): """Set the debug mask for this ``BoomLogger``. This should normally be set to the ``BOOM_DEBUG_*`` value corresponding to the ``boom`` sub-module that this instance of ``BoomLogger`` belongs to. :param mask_bits: The bits to set in this logger's mask. :returntype: None """ if mask_bits < 0 or mask_bits > BOOM_DEBUG_ALL: raise ValueError("Invalid BoomLogger mask bits: 0x%x" % (mask_bits & ~BOOM_DEBUG_ALL)) self.mask_bits = mask_bits def debug_masked(self, msg, *args, **kwargs): """Log a debug message if it passes the current debug mask. Log the specified message if it passes the current logger debug mask. :param msg: the message to be logged :returntype: None """ if self.mask_bits & get_debug_mask(): self.debug(msg, *args, **kwargs) logging.setLoggerClass(BoomLogger)
[docs]def get_debug_mask(): """Return the current debug mask for the ``boom`` package. :returns: The current debug mask value :returntype: int """ return __debug_mask
[docs]def set_debug_mask(mask): """Set the debug mask for the ``boom`` package. :param mask: the logical OR of the ``BOOM_DEBUG_*`` values to log. :returntype: None """ global __debug_mask if mask < 0 or mask > BOOM_DEBUG_ALL: raise ValueError("Invalid boom debug mask: %d" % mask) __debug_mask = mask
[docs]class BoomConfig(object): """Class representing boom persistent configuration values. """ # Initialise members from global defaults boot_path = DEFAULT_BOOT_PATH boom_path = DEFAULT_BOOM_PATH legacy_enable = False legacy_format = "grub1" legacy_sync = True
[docs] def __str__(self): """Return a string representation of this ``BoomConfig`` in boom.conf (INI) notation. """ cstr = "" cstr += '[defaults]\n' cstr += 'boot_path = "%s"\n' % self.boot_path cstr += 'boom_path = "%s"\n\n' % self.boom_path cstr += '[legacy]\n' cstr += 'enable = "%s"\n' % self.legacy_enable cstr += 'format = "%s"\n' % self.legacy_format cstr += 'sync = "%s"' % self.legacy_sync return cstr
[docs] def __repr__(self): """Return a string representation of this ``BoomConfig`` in BoomConfig initialiser notation. """ cstr = ('BoomConfig(boot_path="%s",boom_path="%s",' % (self.boot_path, self.boom_path)) cstr += ('enable_legacy="%s",legacy_format="%s",' % (self.legacy_enable, self.legacy_format)) cstr += 'legacy_sync="%s")' % self.legacy_sync return cstr
[docs] def __init__(self, boot_path=None, boom_path=None, legacy_enable=None, legacy_format=None, legacy_sync=None): """Initialise a new ``BoomConfig`` object with the supplied configuration values, or defaults for any unset arguments. :param boot_path: the path to the system /boot volume :param boom_path: the path to the boom configuration dir :param legacy_enable: enable legacy bootloader support :param legacy_format: the legacy bootlodaer format to write :param legacy_sync: the legacy sync mode """ self.boot_path = boot_path or self.boot_path self.boom_path = boom_path or self.boom_path self.legacy_enable = legacy_enable or self.legacy_enable self.legacy_format = legacy_format or self.legacy_format self.legacy_sync = legacy_sync or self.legacy_sync
__config = BoomConfig()
[docs]def set_boom_config(config): """Set the active configuration to the object ``config`` (which may be any class that includes the ``BoomConfig`` attributes). :param config: a configuration object :returns: None :raises: TypeError if ``config`` does not appear to have the correct attributes. """ global __config def has_value(obj, attr): return hasattr(obj, attr) and getattr(obj, attr) is not None if not has_value(config, "boot_path") or not has_value(config, "boom_path"): raise TypeError("config does not appear to be a BoomConfig object.") __config = config
[docs]def get_boom_config(): """Return the active ``BoomConfig`` object. :returntype: BoomConfig :returns: the active configuration object """ return __config
[docs]def get_boot_path(): """Return the currently configured boot file system path. :returns: the path to the /boot file system. :returntype: str """ return __config.boot_path
[docs]def get_boom_path(): """Return the currently configured boom configuration path. :returns: the path to the BOOT/boom directory. :returntype: str """ return __config.boom_path
[docs]def set_boot_path(boot_path): """Sets the location of the boot file system to ``boot_path``. The path defaults to the '/boot/' mount directory in the root file system: this may be overridden by calling this function with a different path. Calling ``set_boom_root_path()`` will re-set the value returned by ``get_boom_path()`` to the default boom configuration sub- directory within the new boot file system. The location of the boom configuration path may be configured separately by calling ``set_boom_root_path()`` after setting the boot path. :param boot_path: the path to the 'boom/' directory containing boom profiles and configuration. :returnsNone: ``None`` :raises: ValueError if ``boot_path`` does not exist. """ global __config if not isabs(boot_path): raise ValueError("boot_path must be an absolute path: %s" % boot_path) if not path_exists(boot_path): raise ValueError("Path '%s' does not exist" % boot_path) __config.boot_path = boot_path _log_debug("Set boot path to: %s" % boot_path) __config.boom_path = path_join(boot_path, DEFAULT_BOOM_DIR) _log_debug("Set boom path to: %s" % __config.boom_path)
[docs]def set_boom_path(boom_path): """Set the location of the boom configuration directory. Set the location of the boom configuration path stored in the active configuration to ``boom_path``. This defaults to the 'boom/' sub-directory in the boot file system specified by ``config.boot_path``: this may be overridden by calling this function with a different path. :param boom_path: the path to the 'boom/' directory containing boom profiles and configuration. :returns: ``None`` :raises: ValueError if ``boom_path`` does not exist. """ global __config if isabs(boom_path) and not path_exists(boom_path): raise ValueError("Boom path %s does not exist" % boom_path) elif not path_exists(path_join(__config.boot_path, boom_path)): raise ValueError("Boom path %s does not exist" % boom_path) if not isabs(boom_path): boom_path = path_join(__config.boot_bath, boom_path) if not path_exists(path_join(boom_path, "profiles")): raise ValueError("Path does not contain a valid boom configuration" ": %s" % path_join(boom_path, "profiles")) _log_debug("Set boom path to: %s" % DEFAULT_BOOM_DIR) __config.boom_path = boom_path
def _parse_btrfs_subvol(subvol): """Parse a BTRFS subvolume string. Parse a BTRFS subvolume specification into either a subvolume path string, or a string containing a subvolume identifier. :param subvol: The subvolume parameter to parse :returns: A string containing the subvolume path or ID :returntype: ``str`` :raises: ValueError if no valid subvolume was found """ if not subvol: return None subvol_id = None try: subvol_id = int(subvol) subvol = str(subvol_id) except ValueError: if not subvol.startswith('/'): raise ValueError("Unrecognised BTRFS subvolume: %s" % subvol) return subvol # # Selection criteria class #
[docs]class Selection(object): """Selection() Selection criteria for boom BootEntry, OsProfile and BootParams. Selection criteria specified as a simple boolean AND of all criteria with a non-None value. """ # BootEntry fields boot_id = None title = None version = None machine_id = None linux = None initrd = None efi = None options = None devicetree = None # BootParams fields root_device = None lvm_root_lv = None btrfs_subvol_path = None btrfs_subvol_id = None # OsProfile fields os_id = None os_name = None os_short_name = None os_version = None os_version_id = None os_uname_pattern = None os_kernel_pattern = None os_initramfs_pattern = None os_root_opts_lvm2 = None os_root_opts_btrfs = None os_options = None #: Selection criteria applying to BootEntry objects entry_attrs = [ "boot_id", "title", "version", "machine_id", "linux", "initrd", "efi", "options", "devicetree" ] #: Selection criteria applying to BootParams objects params_attrs = [ "root_device", "lvm_root_lv", "btrfs_subvol_path", "btrfs_subvol_id" ] #: Selection criteria applying to OsProfile objects profile_attrs = [ "os_id", "os_name", "os_short_name", "os_version", "os_version_id", "os_uname_pattern", "os_kernel_pattern", "os_initramfs_pattern", "os_root_opts_lvm2", "os_root_opts_btrfs", "os_options" ]
[docs] def __str__(self): """Format this ``Selection`` object as a human readable string. :returns: A human readable string representation of this Selection object :returntype: string """ all_attrs = self.entry_attrs + self.params_attrs + self.profile_attrs attrs = [attr for attr in all_attrs if self.__attr_has_value(attr)] strval = "" tail = ", " for attr in attrs: strval += "%s='%s'%s" % (attr, getattr(self, attr), tail) return strval.rstrip(tail)
[docs] def __repr__(self): """Format this ``Selection`` object as a machine readable string. The returned string may be passed to the Selection initialiser to duplicate the original Selection. :returns: A machine readable string representation of this Selection object :returntype: string """ return "Selection(" + str(self) + ")"
[docs] def __init__(self, boot_id=None, title=title, version=version, machine_id=None, linux=None, initrd=None, efi=None, root_device=None, lvm_root_lv=None, btrfs_subvol_path=None, btrfs_subvol_id=None, os_id=None, os_name=None, os_short_name=None, os_version=None, os_version_id=None, os_options=None, os_uname_pattern=None, kernel_pattern=None, initramfs_pattern=None): """Initialise a new Selection object. Initialise a new Selection object with the specified selection criteria. :param boot_id: The boot_id to match :param title: The title to match :param version: The version to match :param machine_id: The machine_id to match :param linux: The BootEntry kernel image to match :param initrd: The BootEntry initrd image to match :param efi: The BootEntry efi image to match :param root_device: The root_device to match :param lvm_root_lv: The lvm_root_lv to match :param btrfs_subvol_path: The btrfs_subvol_path to match :param btrfs_subvol_id: The btrfs_subvol_id to match :param os_id: The os_id to match :param os_name: The os_name to match :param os_short_name: The os_short_name to match :param os_version: The os_version to match :param os_version_id: The os_version_id to match :param os_options: The os_options to match :param os_uname_pattern: The os_uname_pattern to match :param os_kernel_pattern: The kernel_pattern to match :param os_initramfs_pattern: The initramfs_pattern to match :returns: A new Selection instance :returntype: Selection """ self.boot_id = boot_id self.title = title self.version = version self.machine_id = machine_id self.linux = linux self.initrd = initrd self.efi = efi self.root_device = root_device self.lvm_root_lv = lvm_root_lv self.btrfs_subvol_path = btrfs_subvol_path self.btrfs_subvol_id = btrfs_subvol_id self.os_id = os_id self.os_name = os_name self.os_short_name = os_short_name self.os_version = os_version self.os_version_id = os_version_id self.os_options = os_options self.os_uname_pattern = os_uname_pattern self.os_kernel_pattern = kernel_pattern self.os_initramfs_pattern = initramfs_pattern
[docs] @classmethod def from_cmd_args(cls, args): """Initialise Selection from command line arguments. Construct a new ``Selection`` object from the command line arguments in ``cmd_args``. Each set selection attribute from ``cmd_args`` is copied into the Selection. The resulting object may be passed to either the ``BootEntry`` or ``OsProfile`` search functions (``find_entries`` and ``find_profiles``), as well as the ``boom.command`` calls that accept a selection argument. :param args: The command line selection arguments. :returns: A new Selection instance :returntype: Selection """ subvol = _parse_btrfs_subvol(args.btrfs_subvolume) if subvol and subvol.startswith('/'): btrfs_subvol_path = subvol btrfs_subvol_id = None elif subvol: btrfs_subvol_id = subvol btrfs_subvol_path = None else: btrfs_subvol_id = btrfs_subvol_path = None s = Selection(boot_id=args.boot_id, title=args.title, version=args.version, machine_id=args.machine_id, linux=args.linux, initrd=args.initrd, efi=args.efi, root_device=args.root_device, lvm_root_lv=args.root_lv, btrfs_subvol_path=btrfs_subvol_path, btrfs_subvol_id=btrfs_subvol_id, os_id=args.profile, os_name=args.name, os_short_name=args.short_name, os_version=args.os_version, os_version_id=args.os_version_id, os_options=args.os_options, os_uname_pattern=args.uname_pattern) _log_debug("Initialised %s from arguments" % repr(s)) return s
def __attr_has_value(self, attr): """Test whether an attribute is defined. Return ``True`` if the specified attribute name is currently defined, or ``False`` otherwise. :param attr: The name of the attribute to test :returns: ``True`` if ``attr`` is set or ``False`` otherwise :returntype: bool """ return hasattr(self, attr) and getattr(self, attr) is not None
[docs] def check_valid_selection(self, entry=False, params=False, profile=False): """Check a Selection for valid criteria. Check this ``Selection`` object to ensure it contains only criteria that are valid for the specified object type(s). Returns ``None`` if the object passes the check, or raise ``ValueError`` if invalid criteria exist. :param entry: ``Selection`` may include BootEntry data :param params: ``Selection`` may include BootParams data :param profile: ``Selection`` may include OsProfile data :returns: ``None`` on success :returntype: ``NoneType`` :raises: ``ValueError`` if excluded criteria are present """ all_attrs = self.entry_attrs + self.params_attrs + self.profile_attrs valid_attrs = [] invalid_attrs = [] if entry: valid_attrs += self.entry_attrs if entry or params: valid_attrs += self.params_attrs if profile: valid_attrs += self.profile_attrs for attr in all_attrs: if self.__attr_has_value(attr) and attr not in valid_attrs: invalid_attrs.append(attr) if invalid_attrs: invalid = ", ".join(invalid_attrs) raise ValueError("Invalid criteria for selection type: %s" % invalid)
[docs] def is_null(self): """Test this Selection object for null selection criteria. Return ``True`` if this ``Selection`` object matches all objects, or ``False`` otherwise. :returns: ``True`` if this Selection is null :returntype: bool """ all_attrs = self.entry_attrs + self.params_attrs + self.profile_attrs attrs = [attr for attr in all_attrs if self.__attr_has_value(attr)] return not any(attrs)
# # Generic routines for parsing name-value pairs. # def _blank_or_comment(line): """Test whether line is empty of contains a comment. Test whether the ``line`` argument is either blank, or a whole-line comment. :param line: the line of text to be checked. :returns: ``True`` if the line is blank or a comment, and ``False`` otherwise. :returntype: bool """ return not line.strip() or line.lstrip().startswith('#') def _parse_name_value(nvp, separator="="): """Parse a name value pair string. Parse a ``name='value'`` style string into its component parts, stripping quotes from the value if necessary, and return the result as a (name, value) tuple. :param nvp: A name value pair optionally with an in-line comment. :param separator: The separator character used in this name value pair, or ``None`` to splir on white space. :returns: A ``(name, value)`` tuple. :returntype: (string, string) tuple. """ val_err = ValueError("Malformed name/value pair: %s" % nvp) try: # Only strip newlines: values may contain embedded # whitespace anywhere within the string. name, value = nvp.rstrip('\n').split(separator, 1) except: raise val_err # Value cannot start with '=' if value.startswith('='): raise val_err invalid_name_chars = [ '!', '+', '~', '#', '@', '"', "'", '$', '%', '^', '&', '*', '(', ')', '{', '}', '?', '<', '>', '/', '\\', '[', ']', ',', '|', '=', "'", ':', ';' ] if any([v for v in invalid_name_chars if v in name]): raise ValueError("Invalid characters in name: %s" % name) # FIXME: support preservation of in-line comments for profiles # (BLS currently only allows whole line comments). if "#" in value: value, comment = value.split("#", 1) name = name.strip() value = value.lstrip() if value.startswith('"') or value.startswith("'"): value = value[1:-1] return (name, value) def _find_minimum_sha_prefix(shas, min_prefix): """Find the minimum SHA prefix length guaranteeing uniqueness. Find the minimum unique prefix for the set of SHA IDs in the set ``shas``. :param shas: A set of SHA IDs :returns: The minimum unique prefix length for the set :returntype: int """ shas = list(shas) shas.sort() for sha in shas: if shas.index(sha) == len(shas) - 1: continue def _next_sha(shas, sha): return shas[shas.index(sha) + 1] while sha[:min_prefix] == _next_sha(shas, sha)[:min_prefix]: min_prefix += 1 return min_prefix def _get_machine_id(): """Return the current host's machine-id. Get the machine-id value for the running system by reading from ``/etc/machine-id`` and return it as a string. :returns: The ``machine_id`` as a string :returntype: str """ if path_exists(_MACHINE_ID): path = _MACHINE_ID elif path_exists(_DBUS_MACHINE_ID): path = _DBUS_MACHINE_ID else: return None with open(path, "r") as f: try: machine_id = f.read().strip() except Exception as e: _log_error("Could not read machine-id from '%s': %s" % (_MACHINE_ID, e)) machine_id = None return machine_id __all__ = [ # boom module constants 'DEFAULT_BOOT_PATH', 'DEFAULT_BOOM_PATH', # Configuration class 'BoomConfig', # Profile format keys 'FMT_VERSION', 'FMT_LVM_ROOT_LV', 'FMT_LVM_ROOT_OPTS', 'FMT_BTRFS_SUBVOLUME', 'FMT_BTRFS_SUBVOL_ID', 'FMT_BTRFS_SUBVOL_PATH', 'FMT_BTRFS_ROOT_OPTS', 'FMT_ROOT_DEVICE', 'FMT_ROOT_OPTS', 'FMT_KERNEL', 'FMT_INITRAMFS', 'FORMAT_KEYS', # API Classes 'Selection', # Path configuration 'get_boot_path', 'get_boom_path', 'set_boot_path', 'set_boom_path', # Persistent configuration 'set_boom_config', 'get_boom_config', # boom exception base class 'BoomError', # Debug logging 'get_debug_mask', 'set_debug_mask', 'BOOM_DEBUG_PROFILE', 'BOOM_DEBUG_ENTRY', 'BOOM_DEBUG_REPORT', 'BOOM_DEBUG_COMMAND', 'BOOM_DEBUG_ALL', # Utility routines '_blank_or_comment', '_parse_name_value', '_parse_btrfs_subvol', '_find_minimum_sha_prefix', '_get_machine_id' ] # vim: set et ts=4 sw=4 :