AnonSec Shell
Server IP : 52.91.253.208  /  Your IP : 18.220.53.93   [ Reverse IP ]
Web Server : Apache
System : Linux ip-172-26-9-9 4.19.0-25-cloud-amd64 #1 SMP Debian 4.19.289-1 (2023-07-24) x86_64
User : daemon ( 1)
PHP Version : 7.3.18
Disable Function : NONE
Domains : 3 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /bin/X11/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /bin/X11/unattended-upgrade
#!/usr/bin/python3
# Copyright (c) 2005-2018 Canonical Ltd
#
# AUTHOR:
# Michael Vogt <mvo@ubuntu.com>
# Balint Reczey <rbalint@ubuntu.com>

# This file is part of unattended-upgrades
#
# unattended-upgrades is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# unattended-upgrades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with unattended-upgrades; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#

import atexit
import copy
import datetime
import errno
import email.charset
import fcntl
import fnmatch
import gettext
try:
    from gi.repository.Gio import NetworkMonitor
except ImportError:
    pass
import grp
import io
import locale
import logging
import logging.handlers
import re
import os
import select
import signal
import socket
import string
import subprocess
import sys
import syslog

try:
    from typing import AbstractSet, cast, Dict, Iterable, List, Tuple
    AbstractSet  # pyflakes
    Dict  # pyflakes
    Iterable  # pyflakes
    List  # pyflakes
    Tuple  # pyflakes
except ImportError:
    pass

from datetime import date
from email.message import Message
from gettext import gettext as _
from io import StringIO
from optparse import (
    OptionParser,
    SUPPRESS_HELP,
)

from subprocess import (
    Popen,
    PIPE,
)
import apt
import apt_inst
import apt_pkg

import distro_info


# the reboot required flag file used by packages
REBOOT_REQUIRED_FILE = "/var/run/reboot-required"
# absolute paths of external helper programs
MAIL_BINARY = "/usr/bin/mail"
SENDMAIL_BINARY = "/usr/sbin/sendmail"
USERS = "/usr/bin/users"
# no py3 lsb_release in debian :/
# NOTE: the three values below are obtained by running the lsb_release
# command while the module is being loaded; check_output() raises if the
# command is missing or exits with a non-zero status.
DISTRO_CODENAME = subprocess.check_output(
    ["lsb_release", "-c", "-s"], universal_newlines=True).strip()  # type: str
DISTRO_DESC = subprocess.check_output(
    ["lsb_release", "-d", "-s"], universal_newlines=True).strip()  # type: str
DISTRO_ID = subprocess.check_output(
    ["lsb_release", "-i", "-s"], universal_newlines=True).strip()  # type: str

# Number of days before release of devel where we enable unattended
# upgrades.
DEVEL_UNTIL_RELEASE = datetime.timedelta(days=21)

# progress information is written here
PROGRESS_LOG = "/var/run/unattended-upgrades.progress"
PID_FILE = "/var/run/unattended-upgrades.pid"
LOCK_FILE = "/var/run/unattended-upgrades.lock"

# set to True by signal_handler() and polled by should_stop()
SIGNAL_STOP_REQUEST = False

# messages to be logged only once (maintained by log_once())
logged_msgs = set()  # type: AbstractSet[str]


class LoggingDateTime:
    """Convert between datetime objects and dpkg log timestamp strings.

    Both directions use LOG_DATE_TIME_FMT, so a string produced by
    as_string() can be parsed back with from_string().
    """
    LOG_DATE_TIME_FMT = "%Y-%m-%d  %H:%M:%S"

    @classmethod
    def as_string(cls):
        # type: () -> str
        """Format the current local date and time with LOG_DATE_TIME_FMT"""
        now = datetime.datetime.now()
        return now.strftime(cls.LOG_DATE_TIME_FMT)

    @classmethod
    def from_string(cls, logstr):
        # type: (str) -> datetime.datetime
        """Parse a LOG_DATE_TIME_FMT formatted string into a datetime"""
        fmt = cls.LOG_DATE_TIME_FMT
        return datetime.datetime.strptime(logstr, fmt)


class UnknownMatcherError(ValueError):
    """Raised by match_whitelist_string() for an unrecognised matcher key."""
    pass


class NoAllowedOriginError(ValueError):
    """Raised when no package version comes from an allowed origin."""
    pass


class UnattendedUpgradesCache(apt.Cache):
    """apt.Cache that keeps package candidates inside the allowed origins.

    The mark_*_adjusted() helpers mark a package and then lower the
    candidate version of every package pulled into the change set whose
    candidate does not come from one of self.allowed_origins.
    """

    def __init__(self, rootdir, allowed_origins):
        # type: (str, List[str]) -> None
        """Open the cache below rootdir and prepare the kernel regexps.

        allowed_origins is the list of origin match strings later handed
        to is_allowed_origin() / ver_in_allowed_origin().
        """
        # names of the packages whose candidate has already been adjusted
        self._cached_candidate_pkgnames = set()  # type: AbstractSet[str]
        self.allowed_origins = allowed_origins
        apt.Cache.__init__(self, rootdir=rootdir)

        # pre-heat lazy-loaded modules to avoid crash on python upgrade
        datetime.datetime.strptime("", "")

        # generate versioned_kernel_pkgs_regexp for later use
        apt_versioned_kernel_pkgs = apt_pkg.config.value_list(
            "APT::VersionedKernelPackages")
        if apt_versioned_kernel_pkgs:
            self.versioned_kernel_pkgs_regexp = re.compile("(" + "|".join(
                ["^" + p + "-[0-9]+\\.[0-9\\.]+-.*"
                 for p in apt_versioned_kernel_pkgs]) + ")")
            logging.debug("Using %s regexp to find kernel packages",
                          self.versioned_kernel_pkgs_regexp.pattern)
            running_kernel_version = subprocess.check_output(
                ["uname", "-r"], universal_newlines=True).rstrip()
            self.running_kernel_pkgs_regexp = re.compile("(" + "|".join(
                [("^" + p + "-" + re.escape(running_kernel_version) + "$")
                 for p in apt_versioned_kernel_pkgs]) + ")")
            logging.debug("Using %s regexp to find running kernel packages",
                          self.running_kernel_pkgs_regexp.pattern)
        else:
            logging.debug("APT::VersionedKernelPackages is not set")
            self.versioned_kernel_pkgs_regexp = None
            self.running_kernel_pkgs_regexp = None

    def adjust_candidate(self, pkg):
        # type: (apt.Package) -> bool
        """ Adjust origin and return True if adjustment took place

            This is needed when e.g. a package is available in
            the security pocket but there is also a package in the
            updates pocket with a higher version number

            Returns False when no version comes from an allowed origin,
            when the package has no candidate, or when the allowed
            version is not lower than the current candidate.
        """
        try:
            new_cand = ver_in_allowed_origin(pkg, self.allowed_origins)
        except NoAllowedOriginError:
            return False
        current_cand = pkg.candidate
        if current_cand is None:
            # nothing to compare against, so nothing to lower
            return False
        # Only adjust to lower versions to avoid flipping back and forth
        # and to avoid picking a newer version, not selected by apt.
        # This helps avoiding upgrades to experimental's packages.
        # The comparison must follow Debian version ordering: comparing
        # the version strings directly would rank "1.10" below "1.9".
        if apt_pkg.version_compare(new_cand.version,
                                   current_cand.version) < 0:
            logging.debug("adjusting candidate version: %s" % new_cand)
            pkg.candidate = new_cand
            return True
        return False

    def call_adjusted(self, function, pkg, **kwargs):
        """Call function, but with adjusting
           packages in changes to come from allowed origins

           Note that as a side effect more package's candidate can be
           adjusted than only the one's in the final changes set.

           Raises NoAllowedOriginError when function is mark_upgrade and
           pkg is not upgradable, or when nothing could be adjusted and a
           marked package has no version in an allowed origin.
        """
        new_pkgs_to_adjust = []  # packages whose candidate needs lowering
        pkgs_with_no_allowed_origin = []

        # adjust candidates in advance if needed
        for pkg_name in self._cached_candidate_pkgnames:
            self.adjust_candidate(self[pkg_name])

        if function == apt.package.Package.mark_upgrade \
           and not pkg.is_upgradable:
            raise NoAllowedOriginError
        function(pkg, **kwargs)
        changes = self.get_changes()
        for marked_pkg in changes:
            if marked_pkg.name in self._cached_candidate_pkgnames:
                continue
            if not is_allowed_origin(marked_pkg.candidate,
                                     self.allowed_origins):
                try:
                    ver_in_allowed_origin(marked_pkg,
                                          self.allowed_origins)
                    # important! this avoids downgrades below
                    if pkg.is_installed and not pkg.is_upgradable:
                        continue
                    new_pkgs_to_adjust.append(marked_pkg)
                except NoAllowedOriginError:
                    pkgs_with_no_allowed_origin.append(marked_pkg)

        if new_pkgs_to_adjust:
            new_pkg_adjusted = False
            for pkg_to_adjust in new_pkgs_to_adjust:
                if self.adjust_candidate(pkg_to_adjust):
                    self._cached_candidate_pkgnames.add(pkg_to_adjust.name)
                    new_pkg_adjusted = True
            if new_pkg_adjusted:
                # candidates changed, so redo the marking from scratch
                self.call_adjusted(function, pkg, **kwargs)
        else:
            if pkgs_with_no_allowed_origin:
                raise NoAllowedOriginError

    def mark_upgrade_adjusted(self, pkg, **kwargs):
        """Mark pkg for upgrade through call_adjusted()"""
        self.call_adjusted(apt.package.Package.mark_upgrade, pkg, **kwargs)

    def mark_install_adjusted(self, pkg, **kwargs):
        """Mark pkg for installation through call_adjusted()"""
        self.call_adjusted(apt.package.Package.mark_install, pkg, **kwargs)


class LogInstallProgress(apt.progress.base.InstallProgress):
    """ Install progress that writes to self.progress_log
        (/var/run/unattended-upgrades.progress by default)
    """

    def __init__(self, logfile_dpkg, verbose=False,
                 progress_log="var/run/unattended-upgrades.progress"):
        # type: (str, bool, str) -> None
        """Remember the dpkg log file and resolve the progress file path.

        progress_log is joined onto apt's configured "Dir" directory.
        """
        apt.progress.base.InstallProgress.__init__(self)
        self.logfile_dpkg = logfile_dpkg
        self.progress_log = os.path.join(apt_pkg.config.find_dir("Dir"),
                                         progress_log)
        self.verbose = verbose
        # read-only fd on logfile_dpkg, opened lazily in verbose mode
        self.output_logfd = None  # type: int

    def status_change(self, pkg, percent, status):
        # type: (str, float, str) -> None
        """Overwrite the progress file with the current percentage"""
        with open(self.progress_log, "w") as f:
            f.write(_("Progress: %s %% (%s)") % (percent, pkg))

    def _fixup_fds(self):
        # type: () -> None
        """Close every open fd except stdio and the progress pipes"""
        required_fds = [0, 1, 2,  # stdin, stdout, stderr
                        self.writefd,
                        self.write_stream.fileno(),
                        self.statusfd,
                        self.status_stream.fileno()
                        ]
        # ensure that our required fds close on exec
        for fd in required_fds[3:]:
            old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
        # close all fds
        proc_fd = "/proc/self/fd"
        if os.path.exists(proc_fd):
            error_count = 0
            for fdname in os.listdir(proc_fd):
                try:
                    fd = int(fdname)
                except Exception:
                    print("ERROR: can not get fd for %s" % fdname)
                    # without a number there is nothing to close; falling
                    # through would reuse the fd of the previous entry (or
                    # hit an unbound name on the first one)
                    continue
                if fd in required_fds:
                    continue
                try:
                    os.close(fd)
                    # print("closed: ", fd)
                except OSError as e:
                    # there will be one fd that can not be closed
                    # as its the fd from pythons internal diropen()
                    # so its ok to ignore one close error
                    error_count += 1
                    if error_count > 1:
                        print("ERROR: os.close(%s): %s" % (fd, e))

    def _redirect_stdin(self):
        # type: () -> None
        """Point stdin (fd 0) at os.devnull"""
        REDIRECT_INPUT = os.devnull
        fd = os.open(REDIRECT_INPUT, os.O_RDWR)
        os.dup2(fd, 0)

    def _redirect_output(self):
        # type: () -> None
        """Point stdout and stderr at the dpkg log file"""
        # do not create log in dry-run mode, just output to stdout/stderr
        if not apt_pkg.config.find_b("Debug::pkgDPkgPM", False):
            logfd = self._get_logfile_dpkg_fd()
            os.dup2(logfd, 1)
            os.dup2(logfd, 2)

    def _get_logfile_dpkg_fd(self):
        # type: () -> int
        """Open (creating if needed) the dpkg log in append mode.

        The file is handed to root:adm on a best-effort basis; a missing
        "adm" group or a failing chown is ignored.
        """
        logfd = os.open(
            self.logfile_dpkg, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o640)
        try:
            adm_gid = grp.getgrnam("adm").gr_gid
            os.fchown(logfd, 0, adm_gid)
        except (KeyError, OSError):
            pass
        return logfd

    def update_interface(self):
        # type: () -> None
        """Run the base class update, then echo new log output if verbose"""
        # call super class first
        apt.progress.base.InstallProgress.update_interface(self)
        self._do_verbose_output_if_needed()

    def _do_verbose_output_if_needed(self):
        # type: () -> None
        """Copy up to 1024 bytes of new dpkg log output to stdout"""
        # if we are in debug mode, nothing to be more verbose about
        if apt_pkg.config.find_b("Debug::pkgDPkgPM", False):
            return
        # handle verbose
        if self.verbose:
            if self.output_logfd is None:
                self.output_logfd = os.open(self.logfile_dpkg, os.O_RDONLY)
                # start at the end so only new output is shown
                os.lseek(self.output_logfd, 0, os.SEEK_END)
            try:
                select.select([self.output_logfd], [], [], 0)
                # FIXME: this should be OSError, but in py2.7 it is still
                #        select.error
            except select.error as e:
                if e.errno != errno.EINTR:  # type: ignore
                    logging.exception("select failed")
            # output to stdout in verbose mode only
            os.write(1, os.read(self.output_logfd, 1024))

    def _log_in_dpkg_log(self, msg):
        # type: (str) -> None
        """Append msg, UTF-8 encoded, to the dpkg log file"""
        logfd = self._get_logfile_dpkg_fd()
        os.write(logfd, msg.encode("utf-8"))
        os.close(logfd)

    def finish_update(self):
        # type: () -> None
        """Write the "Log ended" marker to the dpkg log"""
        self._log_in_dpkg_log("Log ended: %s\n\n"
                              % LoggingDateTime.as_string())

    def fork(self):
        # type: () -> int
        """Fork and prepare the child's file descriptors.

        Writes the "Log started" marker first. In the child (pid 0) the
        unneeded fds are closed and stdin/stdout/stderr are redirected.
        Returns the result of os.fork().
        """
        self._log_in_dpkg_log("Log started: %s\n"
                              % LoggingDateTime.as_string())
        pid = os.fork()
        if pid == 0:
            self._fixup_fds()
            self._redirect_stdin()
            self._redirect_output()
        return pid


class Unlocked:
    """
    Context manager for unlocking the apt lock while cache.commit() is run

    The *_inner lock functions are tried first; when apt_pkg does not
    provide them (AttributeError) the plain lock functions are used.
    """

    def __enter__(self):
        # type: () -> None
        """Release the apt lock; a failing fallback unlock is only logged"""
        try:
            apt_pkg.pkgsystem_unlock_inner()
        except AttributeError:
            # this apt_pkg has no pkgsystem_unlock_inner()
            try:
                apt_pkg.pkgsystem_unlock()
            except Exception:
                # earlier python-apt used to leak lock
                logging.warning("apt_pkg.pkgsystem_unlock() failed due to not "
                                "holding the lock but trying to continue")
                pass

    def __exit__(self, exc_type, exc_value, exc_tb):
        # type: (object, object, object) -> None
        """Take the apt lock again; exceptions from the body propagate"""
        try:
            apt_pkg.pkgsystem_lock_inner()
        except AttributeError:
            # this apt_pkg has no pkgsystem_lock_inner()
            apt_pkg.pkgsystem_lock()


class UnattendedUpgradesResult:
    """
    Represent the (potentially partial) results of an unattended-upgrades
    run
    """
    def __init__(self,
                 success,                   # type: bool
                 result_str="",             # type: str
                 pkgs=None,                 # type: List[str]
                 pkgs_kept_back=None,       # type: List[str]
                 pkgs_removed=None,         # type: List[str]
                 pkgs_kept_installed=None,  # type: List[str]
                 update_stamp=False         # type: bool
                 ):
        # type: (...) -> None
        """Store the outcome of a run.

        Each omitted (None) package list becomes a fresh empty list, so
        instances never share one list object. A list passed by the
        caller is stored as is, without copying.
        """
        self.success = success
        self.result_str = result_str
        self.pkgs = [] if pkgs is None else pkgs
        self.pkgs_kept_back = \
            [] if pkgs_kept_back is None else pkgs_kept_back
        self.pkgs_removed = [] if pkgs_removed is None else pkgs_removed
        self.pkgs_kept_installed = \
            [] if pkgs_kept_installed is None else pkgs_kept_installed
        self.update_stamp = update_stamp


def is_dpkg_journal_dirty():
    # type: () -> bool
    """
    Tell whether dpkg left journal entries behind
    (similar to debSystem::CheckUpdates)

    The "updates" directory next to the dpkg status file is listed; any
    entry whose name starts with a digit marks the journal as dirty.
    """
    status_file = apt_pkg.config.find_file("Dir::State::status")
    updates_dir = os.path.join(os.path.dirname(status_file), "updates")
    return any(re.match("[0-9]+", entry)
               for entry in os.listdir(updates_dir))


def signal_handler(signal, frame):
    # type: (int, object) -> None
    """Signal handler: log a warning and raise the module-wide stop flag.

    Both arguments are accepted for the handler signature but not used.
    """
    global SIGNAL_STOP_REQUEST
    logging.warning("SIGTERM received, will stop")
    SIGNAL_STOP_REQUEST = True


def log_once(msg):
    # type: (str) -> None
    """Log msg at INFO level unless the same text was logged before"""
    if msg in logged_msgs:
        return
    logging.info(msg)
    logged_msgs.add(msg)  # type: ignore


def should_stop():
    # type: () -> bool
    """
    Return True if u-u needs to stop due to signal received, due to the
    system started to run on battery or due to a metered connection.

    A missing on_ac_power program or a missing NetworkMonitor import only
    produces a one-time informational message; the check is then skipped.
    """
    if SIGNAL_STOP_REQUEST:
        logging.warning("SIGNAL received, stopping")
        return True
    try:
        # an exit status of 1 from on_ac_power is treated as "on battery"
        if apt_pkg.config.find_b("Unattended-Upgrade::OnlyOnACPower", True) \
           and subprocess.call("on_ac_power") == 1:
            logging.warning("System is on battery power, stopping")
            return True
    except FileNotFoundError:
        # the on_ac_power executable could not be found
        log_once(
            _("Checking if system is running on battery is skipped. Please "
              "install powermgmt-base package to check power status and skip "
              "installing updates when the system is running on battery."))
    if apt_pkg.config.find_b(
            "Unattended-Upgrade::Skip-Updates-On-Metered-Connections", True):
        try:
            if NetworkMonitor.get_network_metered(
                    NetworkMonitor.get_default()):
                logging.warning(_("System is on metered connection, stopping"))
                return True
        except NameError:
            # NetworkMonitor is undefined when its import at the top of the
            # file failed
            log_once(_("Checking if connection is metered is skipped. Please "
                       "install python3-gi package to detect metered "
                       "connections and skip downloading updates."))
    return False


def substitute(line):
    # type: (str) -> str
    """ Expand the known placeholders in line and return the new string

    Supported placeholders are ${distro_codename} and ${distro_id};
    expansion is done with string.Template.substitute().
    """
    known = {
        "distro_codename": get_distro_codename(),
        "distro_id": get_distro_id(),
    }
    template = string.Template(line)
    return template.substitute(known)


def get_distro_codename():
    # type: () -> str
    """Return the module-level DISTRO_CODENAME value"""
    return DISTRO_CODENAME


def get_distro_id():
    # type: () -> str
    """Return the module-level DISTRO_ID value"""
    return DISTRO_ID


def get_allowed_origins_legacy():
    # type: () -> List[str]
    """ legacy support for old Allowed-Origins var

    Each "Unattended-Upgrade::Allowed-Origins" entry has the form
    "<id>:<codename>" (or "<id> <codename>") and is converted to the
    "o=<id>,a=<codename>" pattern format after substitute() expansion.

    A ValueError raised while processing an entry is logged and
    re-raised.
    """
    allowed_origins = []  # type: List[str]
    key = "Unattended-Upgrade::Allowed-Origins"
    try:
        for s in apt_pkg.config.value_list(key):
            # if there is a ":" use that as separator, else use spaces
            if re.findall(r'(?<!\\):', s):
                (distro_id, distro_codename) = re.split(r'(?<!\\):', s)
            else:
                (distro_id, distro_codename) = s.split()
            # unescape "\:" back to ":"
            distro_id = re.sub(r'\\:', ':', distro_id)
            # escape "," (see LP: #824856) - can this be simpler?
            distro_id = re.sub(r'([^\\]),', r'\1\\,', distro_id)
            distro_codename = re.sub(r'([^\\]),', r'\1\\,', distro_codename)
            # convert to new format
            allowed_origins.append("o=%s,a=%s" % (substitute(distro_id),
                                   substitute(distro_codename)))
    except ValueError:
        # look up the translation of the template first and fill in the
        # key afterwards; formatting before the lookup never matches the
        # message catalog
        logging.error(_("Unable to parse %s.") % key)
        raise
    return allowed_origins


def get_allowed_origins():
    # type: () -> List[str]
    """ return a list of allowed origins from apt.conf

    This will take substitutions (like distro_id) into account.

    The legacy Allowed-Origins entries come first, followed by the
    "Unattended-Upgrade::Origins-Pattern" entries. A ValueError raised
    while expanding a pattern is logged and re-raised.
    """
    allowed_origins = get_allowed_origins_legacy()
    key = "Unattended-Upgrade::Origins-Pattern"
    try:
        for s in apt_pkg.config.value_list(key):
            allowed_origins.append(substitute(s))
    except ValueError:
        # look up the translation of the template first and fill in the
        # key afterwards; formatting before the lookup never matches the
        # message catalog
        logging.error(_("Unable to parse %s.") % key)
        raise
    return allowed_origins


def match_whitelist_string(whitelist, origin):
    # type: (str, apt.package.Origin) -> bool
    """
    Match a whitelist string such as "origin=Debian,label=Debian-Security"
    against the given python-apt origin.

    The string is a comma separated list of key=value tokens; a literal
    comma inside a value is written as "\\,". Values go through
    substitute() and are compared with fnmatch. The result is True only
    if every token matches. An empty whitelist string never matches
    anything. An unknown key raises UnknownMatcherError.
    """
    pattern = whitelist.strip()
    if pattern == "":
        logging.warning("empty match string matches nothing")
        return False
    # matcher keyword -> attribute of the origin it is compared with;
    # the short keys are the ones shown by apt-cache policy, the long
    # ones are the names in the Release file
    attr_by_matcher = {
        "o": "origin", "origin": "origin",
        "l": "label", "label": "label",
        "a": "archive", "suite": "archive", "archive": "archive",
        "c": "component", "component": "component",
        "site": "site",
        "n": "codename", "codename": "codename",
    }
    all_matched = True
    # make "\," the html quote equivalent before splitting on ","
    for token in pattern.replace("\\,", "%2C").split(","):
        # strip and unquote the "," back
        (what, value) = [part.strip().replace("%2C", ",")
                         for part in token.split("=")]
        # support substitution here as well
        value = substitute(value)
        if what not in attr_by_matcher:
            raise UnknownMatcherError(
                "Unknown whitelist entry for matcher %s (token %s)" % (
                    what, token))
        matched = fnmatch.fnmatch(
            getattr(origin, attr_by_matcher[what]), value)
        # every token is evaluated, even after a mismatch
        all_matched = all_matched and matched
    return all_matched


def cache_commit(cache,           # type: apt.Cache
                 logfile_dpkg,    # type: str
                 verbose,         # type: bool
                 iprogress=None,  # type: apt.progress.base.InstallProgress
                 ):
    # type: (...) -> Tuple[bool, Exception]
    """Commit the changes from the given cache to the system

    When iprogress is None a LogInstallProgress writing to logfile_dpkg
    is created. Returns a (res, error) tuple: res is the value returned
    by cache.commit() (False if it raised) and error is the SystemError
    that was caught, or None.
    """

    error = None
    res = False
    if iprogress is None:
        iprogress = LogInstallProgress(logfile_dpkg, verbose)

    try:
        # only release the lock through Unlocked when apt_pkg has no
        # pkgsystem_lock_inner
        if hasattr(apt_pkg, "pkgsystem_lock_inner"):
            res = cache.commit(install_progress=iprogress)
        else:
            with Unlocked():
                res = cache.commit(install_progress=iprogress)
        # re-open the cache after the commit
        cache.open()
    except SystemError as e:
        error = e
        if verbose:
            logging.exception("Exception happened during upgrade.")
        # drop the pending changes after a failure
        cache.clear()
    return res, error


def upgrade_normal(cache, logfile_dpkg, verbose):
    # type: (apt.Cache, str, bool) -> bool
    """Commit all pending changes at once and log the outcome.

    Returns the result reported by cache_commit().
    """
    succeeded, failure = cache_commit(cache, logfile_dpkg, verbose)
    if not succeeded:
        logging.error(_("Installing the upgrades failed!"))
        logging.error(_("error message: %s"), failure)
        logging.error(_("dpkg returned a error! See %s for details"),
                      logfile_dpkg)
        return succeeded
    logging.info(_("All upgrades installed"))
    return succeeded


def upgrade_in_minimal_steps(cache,            # type: apt.Cache
                             pkgs_to_upgrade,  # type: List[str]
                             blacklist,        # type: List[str]
                             whitelist,        # type: List[str]
                             logfile_dpkg="",  # type: str
                             verbose=False,    # type: bool
                             ):
    # type: (...) -> bool
    """Upgrade the given packages one small change set at a time.

    Packages are visited in upgrade_order(); each one is marked together
    with its dependencies, the change set is checked with
    check_changes_for_sanity() and then committed. Progress is written
    through LogInstallProgress.status_change().

    Returns False when should_stop() asks to stop or a commit reports an
    error, True otherwise.
    """
    install_log = LogInstallProgress(logfile_dpkg, verbose)

    # double check any changes we do
    allowed_origins = get_allowed_origins()

    # to upgrade contains the package names
    to_upgrade = set(pkgs_to_upgrade)
    for pkgname in upgrade_order(to_upgrade, cache):
        # upgrade packages and dependencies in increasing expected size of
        # package sets to upgrade/install together
        if pkgname not in to_upgrade:
            # pkg is upgraded in a previous set
            continue
        if should_stop():
            return False
        pkg = cache[pkgname]
        if pkg.is_upgradable:
            cache.mark_upgrade_adjusted(pkg,
                                        from_user=not pkg.is_auto_installed)
        elif not pkg.is_installed:
            cache.mark_install_adjusted(pkg, from_user=False)
        else:
            continue
        # double check that we are not running into side effects like
        # what could have been caused LP: #1020680
        if not check_changes_for_sanity(cache, allowed_origins, blacklist,
                                        whitelist):
            logging.info("While building minimal partition: "
                         "cache has not allowed changes")
            cache.clear()
            continue
        changes = [p.name for p in cache.get_changes()]
        if not changes:
            continue

        # write progress log information
        if len(pkgs_to_upgrade) > 0:
            all_count = len(pkgs_to_upgrade)
            done_count = all_count - len(to_upgrade)
            # share of the requested packages already handled, in percent
            percent = done_count * 100.0 / all_count
        else:
            percent = 100.0
        install_log.status_change(pkg=",".join(changes),
                                  percent=percent,
                                  status="")
        # apply changes
        logging.debug("applying set %s" % changes)

        res, error = cache_commit(cache, logfile_dpkg, verbose, install_log)
        if error:
            if verbose:
                # NOTE(review): no exception is being handled at this
                # point, so there is no traceback to attach; cache_commit()
                # has already logged it in verbose mode
                logging.exception("Exception happened during upgrade.")
            logging.error(_("Installing the upgrades failed!"))
            logging.error(_("error message: %s"), error)
            logging.error(_("dpkg returned a error! See %s for details"),
                          logfile_dpkg)
            return False
        to_upgrade = to_upgrade - set(changes)
        logging.debug("left to upgrade %s" % to_upgrade)
        if len(to_upgrade) == 0:
            logging.info(_("All upgrades installed"))
            break
    return True


def is_allowed_origin(ver, allowed_origins):
    # type: (apt.package.Version, List[str]) -> bool
    """Tell whether any origin of ver matches any allowed origin pattern.

    A missing (falsy) version is never allowed.
    """
    if not ver:
        return False
    return any(match_whitelist_string(allowed, origin)
               for origin in ver.origins
               for allowed in allowed_origins)


def ver_in_allowed_origin(pkg, allowed_origins):
    # type: (apt.Package, List[str]) -> apt.package.Version
    """Return the first version of pkg that comes from an allowed origin.

    Versions are examined in the order pkg.versions yields them, and
    versions with a policy priority below 100 are skipped.

    Raises NoAllowedOriginError if no version qualifies.
    """
    for ver in pkg.versions:
        # ignore versions that the user marked with priority < 100
        # (and ensure we have a python-apt that supports this)
        try:
            if ver.policy_priority < 100:
                logging.debug("ignoring ver %s with priority < 100" % ver)
                continue
        except AttributeError:
            pass
        if is_allowed_origin(ver, allowed_origins):
            # leave as soon as we have the highest new candidate
            return ver
    raise NoAllowedOriginError()


def is_pkgname_in_blacklist(pkgname, blacklist):
    # type: (str, List[str]) -> bool
    """Tell whether pkgname matches one of the blacklist regexps.

    Each regexp is applied with re.match(), i.e. anchored at the start
    of the package name.
    """
    blacklisted = any(re.match(regexp, pkgname) for regexp in blacklist)
    if blacklisted:
        logging.debug("skipping blacklisted package %s" % pkgname)
    return blacklisted


def is_pkgname_in_whitelist(pkgname, whitelist):
    # type: (str, List[str]) -> bool
    """Tell whether pkgname matches one of the whitelist regexps.

    An empty whitelist means the feature is not used, so every package
    passes. Each regexp is applied with re.match().
    """
    if not whitelist:
        return True
    whitelisted = any(re.match(regexp, pkgname) for regexp in whitelist)
    if whitelisted:
        logging.debug("only upgrading the following package %s" %
                      pkgname)
    return whitelisted


def is_pkg_change_allowed(pkg, blacklist, whitelist):
    # type: (apt.Package, List[str], List[str]) -> bool
    """Tell whether pkg may be changed at all.

    A change is refused when the package is blacklisted, when the strict
    whitelist mode is enabled and the package is not whitelisted, or
    when the package is on hold.
    """
    name = pkg.name
    if is_pkgname_in_blacklist(name, blacklist):
        logging.debug("pkg %s package has been blacklisted" % name)
        return False
    # a strict whitelist will not allow any changes not in the
    # whitelist, most people will want the relaxed whitelist
    # that whitelists a package but pulls in the package
    # dependencies
    strict_mode = apt_pkg.config.find_b(
        "Unattended-Upgrade::Package-Whitelist-Strict", False)
    if strict_mode:
        if not is_pkgname_in_whitelist(name, whitelist):
            logging.debug("pkg %s package is not whitelisted" %
                          name)
            return False
    on_hold = pkg._pkg.selected_state == apt_pkg.SELSTATE_HOLD
    if on_hold:
        logging.debug("pkg %s is on hold" % name)
    return not on_hold


def transitive_dependencies(pkg, cache, acc=None):
    # type: (apt.Package, apt.Cache, AbstractSet[str]) -> AbstractSet[str]
    """ All (transitive) dependencies of the package

    Follows the Depends, PreDepends and Recommends of the candidate
    version recursively. Dependency names that are not in cache are kept
    in the result but not followed further.

    acc is the set the names are collected into; it is modified in place
    and returned. When omitted, a new empty set is used for this call, so
    separate calls do not share results.
    """
    if acc is None:
        acc = set()
    # Note that alternative (|) dependencies are collected, too
    valid_types = {'Depends', 'PreDepends', 'Recommends'}
    for dep in pkg.candidate.dependencies:
        for base_dep in dep:
            if base_dep.name not in acc and base_dep.rawtype in valid_types:
                acc.add(base_dep.name)
                try:
                    transitive_dependencies(cache[base_dep.name], cache, acc)
                except KeyError:
                    pass
    return acc


def upgrade_order(to_upgrade, cache):
    # type: (AbstractSet[str], apt.Cache) -> List[str]
    """  Order pkg names by how many other packages from to_upgrade are
         expected to be upgraded along with each of them, smallest first.
         The calculation is not 100% accurate, it is an approximation.
    """
    # size of the overlap between each package's transitive dependencies
    # and the set of packages to upgrade
    set_size_by_name = {
        name: len(
            transitive_dependencies(cache[name], cache).intersection(
                to_upgrade))
        for name in to_upgrade
    }
    return sorted(set_size_by_name, key=set_size_by_name.get)


def check_changes_for_sanity(cache, allowed_origins, blacklist, whitelist,
                             desired_pkg=None):
    # type: (apt.Cache, List[str], List[str], List[str], apt.Package) -> bool
    """Tell whether the changes marked in cache are safe to apply.

    The change set is rejected (False) when
    - the cache has broken packages or nothing is going to be installed,
    - any package is marked for removal,
    - a package to install/upgrade has no trusted origin, is not from an
      allowed origin or fails is_pkg_change_allowed(),
    - an upgraded package declares "Upgrade-Requires: app-restart" and
      Unattended-Upgrade::IgnoreAppsRequireRestart is not set,
    - desired_pkg is given but is not part of the change set.
    """
    if cache._depcache.broken_count != 0:
        return False
    # If there are no packages to be installed they were kept back
    if cache.install_count == 0:
        return False
    changes = cache.get_changes()
    for pkg in changes:
        if pkg.marked_delete:
            logging.debug("pkg %s now marked delete" % pkg.name)
            return False
        if pkg.marked_install or pkg.marked_upgrade:
            # apt will never fallback from a trusted to a untrusted
            # origin so its good enough if we have a single trusted one
            if not any(o.trusted for o in pkg.candidate.origins):
                logging.debug("pkg %s is untrusted" % pkg.name)
                return False
            if not is_allowed_origin(pkg.candidate, allowed_origins):
                logging.debug("pkg %s not in allowed origin" % pkg.name)
                return False
            if not is_pkg_change_allowed(pkg, blacklist, whitelist):
                return False
            # check if the package is unsafe to upgrade unattended
            ignore_require_restart = apt_pkg.config.find_b(
                "Unattended-Upgrade::IgnoreAppsRequireRestart", False)
            upgrade_requires = pkg.candidate.record.get("Upgrade-Requires")
            if pkg.marked_upgrade and ignore_require_restart is False \
               and upgrade_requires == "app-restart":
                # the package name was missing from this message before
                logging.debug("pkg %s requires app-restart, not safe to "
                              "upgrade unattended" % pkg.name)
                return False
    # check that the package we want to upgrade is in the change set
    if desired_pkg and desired_pkg not in changes:
        return False
    return True


def is_deb(file):
    # type: (str) -> bool
    """Tell whether the given file name ends in ".deb"."""
    return file.endswith(".deb")


def pkgname_from_deb(debfile):
    # type: (str) -> str
    """ return the package name of the given .deb file

    The name is read from the "Package" field of the control file inside
    the deb. If the deb can not be read (IOError/SystemError) the error is
    logged and the name is guessed from the file name instead, i.e. the
    part of the base name before the first "_".
    """
    # FIXME: add error checking here
    try:
        control = apt_inst.DebFile(debfile).control.extractdata("control")
        sections = apt_pkg.TagSection(control)
        return sections["Package"]
    except (IOError, SystemError) as e:
        logging.error("failed to read deb file %s (%s)" % (debfile, e))
        # dumb fallback; only look at the base name so that the directory
        # part (which may itself contain "_") never leaks into the result
        return os.path.basename(debfile).split("_")[0]


def get_md5sum_for_file_in_deb(deb_file, conf_file):
    # type: (str, str) -> str
    """ return the md5sum of conf_file as contained in deb_file

    This runs the pipeline "dpkg-deb --fsys-tarfile | tar -x -O | md5sum"
    and returns the first whitespace separated field of the md5sum output.
    """
    extractor = Popen(["dpkg-deb", "--fsys-tarfile", deb_file], stdout=PIPE)
    # the member name handed to tar is conf_file prefixed with "."
    unpacker = Popen(["tar", "-x", "-O", "-f", "-", "." + conf_file],
                     stdin=extractor.stdout, stdout=PIPE,
                     universal_newlines=True)
    hasher = Popen(["md5sum"], stdin=unpacker.stdout, stdout=PIPE,
                   universal_newlines=True)
    hasher_output = hasher.communicate()[0]
    pkg_md5sum = hasher_output.split()[0]
    # release the pipes and reap all three children
    for proc in (extractor, unpacker, hasher):
        proc.stdout.close()
        proc.wait()
    return pkg_md5sum


# prefix is *only* needed for the built-in tests
def conffile_prompt(destFile, prefix=""):
    # type: (str, str) -> bool
    """ check if installing the deb destFile may trigger a conffile prompt

    The conffiles recorded for the package in the dpkg status file are
    compared with the files on disk (below prefix) and with the versions
    shipped in the deb. Returns True as soon as a conffile is found whose
    on-disk md5sum, recorded md5sum and in-deb md5sum all differ, or when
    the deb ships a conffile that is not in the dpkg status but exists on
    disk with different content. Returns False otherwise.

    A SystemError from reading the deb is logged and re-raised.
    """
    logging.debug("check_conffile_prompt(%s)" % destFile)
    pkgname = pkgname_from_deb(destFile)

    # get the conffiles for the /var/lib/dpkg/status file
    status_file = apt_pkg.config.find("Dir::State::status")
    with open(status_file, "r") as f:
        tagfile = apt_pkg.TagFile(f)
        conffiles = ""
        for section in tagfile:
            if section.get("Package") == pkgname:
                logging.debug("found pkg: %s" % pkgname)
                if "Conffiles" in section:
                    conffiles = section.get("Conffiles")
                    break

    # get conffile value from pkg, its ok if the new version
    # does not have conffiles anymore
    pkg_conffiles = ""
    try:
        deb = apt_inst.DebFile(destFile)
        pkg_conffiles = deb.control.extractdata("conffiles").strip().decode(
            "utf-8")
    except SystemError as e:
        print(_("Apt returned an error, exiting"))
        print(_("error message: %s") % e)
        logging.error(_("Apt returned an error, exiting"))
        logging.error(_("error message: %s"), e)
        raise
    except LookupError as e:
        logging.debug("No conffiles in deb %s (%s)" % (destFile, e))

    # split the conffile list of the deb only once instead of once per
    # status line; an empty conffiles file yields an empty list
    pkg_conffile_list = pkg_conffiles.split("\n") if pkg_conffiles else []

    # Conffiles:
    #  /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c
    dpkg_status_conffiles = {}
    for line in conffiles.splitlines():
        # ignore empty lines
        line = line.strip()
        if not line:
            continue
        # show what we do
        logging.debug("conffile line: %s", line)
        li = line.split()
        conf_file = li[0]
        md5 = li[1]
        if len(li) > 2:
            obs = li[2]
        else:
            obs = None
        # ignore if conffile is obsolete
        if obs == "obsolete":
            continue
        # ignore state "newconffile" until its clearer if there
        # might be a dpkg prompt (LP: #936870)
        if md5 == "newconffile":
            continue
        if conf_file not in pkg_conffile_list:
            logging.debug("%s not in package conffiles %s" % (
                conf_file, pkg_conffiles))
            continue
        # record for later
        dpkg_status_conffiles[conf_file] = md5

        # the package replaces a directory with a configuration file
        #
        # if the package changed this way it is safe to assume that
        # the transition happens without showing a prompt but if the admin
        # created the directory the admin will need to resolve it after
        # being notified about the unexpected prompt
        if os.path.isdir(prefix + conf_file):
            continue

        # test against the installed file, if the local file got deleted
        # by the admin thats ok but it may still trigger a conffile prompt
        # (see debian #788049)
        current_md5 = ""
        if os.path.exists(prefix + conf_file):
            with open(prefix + conf_file, 'rb') as fb:
                current_md5 = apt_pkg.md5sum(fb)
        logging.debug("current md5: %s" % current_md5)

        # hashes are the same, no conffile prompt
        if current_md5 == md5:
            continue
        # calculate md5sum from the deb (may take a bit)
        pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file)
        logging.debug("pkg_md5sum: %s" % pkg_md5sum)
        # the md5sum in the deb is unchanged, this will not
        # trigger a conffile prompt
        if pkg_md5sum == md5:
            continue
        # if we made it to this point:
        #  current_md5 != pkg_md5sum != md5
        # and that will trigger a conffile prompt, we can
        # stop processing at this point and just return True
        return True

    # now check if there are conffiles in the pkg that were not there
    # in the previous version in the dpkg status file
    for conf_file in pkg_conffile_list:
        if conf_file not in dpkg_status_conffiles \
           and os.path.exists(prefix + conf_file):
            logging.debug("found conffile %s in new pkg but not on dpkg "
                          "status" % conf_file)
            pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file)
            with open(prefix + conf_file, 'rb') as fp:
                if pkg_md5sum != apt_pkg.md5sum(fp):
                    return True
    return False


def dpkg_conffile_prompt():
    # type: () -> bool
    """ tell whether dpkg is left free to show conffile prompts

    Returns False only if DPkg::Options contains --force-confold or
    --force-confnew, True otherwise (also when DPkg::Options is unset).
    """
    if "DPkg::Options" not in apt_pkg.config:
        return True
    forcing_options = ["--force-confold", "--force-confnew"]
    configured = apt_pkg.config.value_list("DPkg::Options")
    return not any(opt.strip() in forcing_options for opt in configured)


def rewind_cache(cache, pkgs_to_upgrade):
    # type: (apt.Cache, List[apt.Package]) -> None
    """ reset the cache so that only pkgs_to_upgrade are marked again

    Raises AssertionError if the cache is broken after re-marking.
    """
    cache.clear()
    for package in pkgs_to_upgrade:
        # automatically installed packages are not marked as user requested
        requested_by_user = not package.is_auto_installed
        package.mark_install(from_user=requested_by_user)
    if cache.broken_count > 0:
        raise AssertionError("rewind_cache created a broken cache")


def host():
    # type: () -> str
    """ return the name of this machine as given by socket.getfqdn() """
    fqdn = socket.getfqdn()
    return fqdn


# *sigh* textwrap is nice, but it breaks "linux-image" into two
# separate lines
def wrap(t, width=70, subsequent_indent=""):
    # type: (str, int, str) -> str
    """ wrap the whitespace separated words of t without splitting words

    Every word is followed by a single space. A newline followed by
    subsequent_indent is inserted before a word that would push the
    current line over width.
    """
    pieces = []
    # position counter of the current line, it starts at 1 for an empty
    # line and grows by the length of everything appended to that line
    column = 1
    indent_column = len(subsequent_indent) - subsequent_indent.rfind("\n")
    for word in t.split():
        if column + len(word) > width:
            pieces.append("\n" + subsequent_indent)
            column = indent_column
        pieces.append(word + " ")
        column += len(word) + 1
    return "".join(pieces)


def setup_apt_listchanges(conf="/etc/apt/listchanges.conf"):
    # type: (str) -> None
    """ pick the apt-listchanges frontend via the environment

    Sets APT_LISTCHANGES_FRONTEND to "mail" if SENDMAIL_BINARY exists and
    to "none" otherwise. The conf argument is not used.
    """
    # apt-listchanges will always send a mail if there is a mail address
    # set in the config regardless of the frontend used and appears to not
    # check whether sendmail is there or not (debian bug #579733), so the
    # frontend is chosen here depending on the presence of sendmail
    frontend = "mail" if os.path.exists(SENDMAIL_BINARY) else "none"
    os.environ["APT_LISTCHANGES_FRONTEND"] = frontend


def _send_mail_using_mailx(from_address, to_address, subject, body):
    # type: (str, str, str, str) -> int
    """ send a mail by piping body into MAIL_BINARY

    Returns the exit status of the mail process.
    """
    # ensure that the body is a byte stream and that we do not
    # break on encoding errors (the default error mode is "strict")
    encoded_body = body.encode(
        locale.getpreferredencoding(False), errors="replace")
    # we use a binary pipe to stdin to ensure we do not break on
    # unicode encoding errors (e.g. because the user is running a
    # ascii only system like the buildds)
    mail = subprocess.Popen(
        [MAIL_BINARY, "-r", from_address, "-s", subject, to_address],
        stdin=subprocess.PIPE, universal_newlines=False)
    # communicate() writes the body, closes stdin and waits for the
    # process; unlike a raw stdin.write() it does not blow up with a
    # broken pipe when the mailer exits before reading all of its input,
    # the failure is then reported through the exit status instead
    mail.communicate(encoded_body)
    return mail.returncode


def _send_mail_using_sendmail(from_address, to_address, subject, body):
    # type: (str, str, str, str) -> int
    """ send a mail by piping a formatted message into SENDMAIL_BINARY

    The message gets Subject, From, To and "Auto-Submitted: auto-generated"
    headers and a utf-8, quoted-printable encoded body. Returns the exit
    status of the sendmail process.
    """
    # format as a proper mail
    msg = Message()
    msg['Subject'] = subject
    msg['From'] = from_address
    msg['To'] = to_address
    msg['Auto-Submitted'] = "auto-generated"
    # order is important here, Message() first, then Charset()
    #  then msg.set_charset()
    charset = email.charset.Charset("utf-8")
    charset.body_encoding = email.charset.QP  # type: ignore
    msg.set_payload(body, charset)
    # and send it away
    sendmail = subprocess.Popen(
        [SENDMAIL_BINARY, "-oi", "-t"],
        stdin=subprocess.PIPE, universal_newlines=True)
    # communicate() writes the message, closes stdin and waits for the
    # process; unlike a raw stdin.write() it does not blow up with a
    # broken pipe when sendmail exits before reading all of its input,
    # the failure is then reported through the exit status instead
    sendmail.communicate(msg.as_string())
    return sendmail.returncode


def send_summary_mail(pkgs,                 # type: List[str]
                      res,                  # type: bool
                      result_str,           # type: str
                      pkgs_kept_back,       # type: List[str]
                      pkgs_removed,         # type: List[str]
                      pkgs_kept_installed,  # type: List[str]
                      mem_log,              # type: StringIO
                      dpkg_log_content,     # type: str
                      ):
    # type: (...) -> None
    """ send the summary mail (if configured in Unattended-Upgrade::Mail)

    Nothing is sent when no recipient is configured, when neither
    MAIL_BINARY nor SENDMAIL_BINARY exists, when the run succeeded and
    Unattended-Upgrade::MailOnlyOnError is set, or when the run succeeded
    without upgrading, keeping back or removing anything. sendmail is
    preferred over mailx when both are available.
    """
    to_email = apt_pkg.config.find("Unattended-Upgrade::Mail", "")
    if not to_email:
        return
    if not (os.path.exists(MAIL_BINARY) or os.path.exists(SENDMAIL_BINARY)):
        logging.error(_("No /usr/bin/mail or /usr/sbin/sendmail,"
                        "can not send mail. "
                        "You probably want to install the mailx package."))
        return
    if res:
        # the user asked for mails on errors only and there was no error
        if apt_pkg.config.find_b(
                "Unattended-Upgrade::MailOnlyOnError", False):
            return
        # the run was successful but nothing had to be done
        if not (pkgs or pkgs_kept_back or pkgs_removed):
            return

    # subject flags: pending reboot and packages on hold
    if os.path.isfile(REBOOT_REQUIRED_FILE):
        reboot_flag_str = _("[reboot required]")
    else:
        reboot_flag_str = ""
    if pkgs_kept_back or pkgs_kept_installed:
        hold_flag_str = _("[package on hold]")
    else:
        hold_flag_str = ""
    logging.debug("Sending mail to %s" % to_email)
    subject = _(
        "{hold_flag}{reboot_flag} unattended-upgrades result for "
        "{machine}: {result}").format(
            hold_flag=hold_flag_str, reboot_flag=reboot_flag_str,
            machine=host(), result="SUCCESS" if res else "FAILURE").strip()

    # the body is collected piece by piece and joined at the end
    parts = [wrap(_("Unattended upgrade result: %s") % result_str, 70, " "),
             "\n\n"]
    if os.path.isfile(REBOOT_REQUIRED_FILE):
        parts.append(_(
            "Warning: A reboot is required to complete this upgrade, "
            "or a previous one.\n\n"))
    if res:
        upgrade_title = _("Packages that were upgraded:\n")
    else:
        upgrade_title = _("Packages that attempted to upgrade:\n")
    package_sections = (
        (pkgs, upgrade_title),
        (pkgs_kept_back,
         _("Packages with upgradable origin but kept back:\n")),
        (pkgs_removed,
         _("Packages that were auto-removed:\n")),
        (pkgs_kept_installed,
         _("Packages that were kept from being auto-removed:\n")),
    )
    for names, title in package_sections:
        # empty package lists get no section at all
        if names:
            parts.extend(
                (title, " ", wrap(" ".join(names), 70, " "), "\n\n"))
    if dpkg_log_content:
        parts.extend(
            (_("Package installation log:") + "\n", dpkg_log_content, "\n\n"))
    parts.append(_("Unattended-upgrades log:\n"))
    parts.append(mem_log.getvalue())
    body = "".join(parts)

    from_email = apt_pkg.config.find("Unattended-Upgrade::Sender", "root")

    if os.path.exists(SENDMAIL_BINARY):
        ret = _send_mail_using_sendmail(from_email, to_email, subject, body)
    elif os.path.exists(MAIL_BINARY):
        ret = _send_mail_using_mailx(from_email, to_email, subject, body)
    else:
        raise AssertionError(
            "This should never be reached, if we are here we either "
            "have sendmail or mailx. Please report this as a bug.")
    logging.debug("mail returned: %s", ret)


def do_install(cache,             # type: apt.Cache
               pkgs_to_upgrade,   # type: List[apt.Package]
               blacklist,         # type: List[str]
               whitelist,         # type: List[str]
               options,           # type: Options
               logfile_dpkg,      # type: str
               ):
    # type: (...) -> bool
    """ install pkgs_to_upgrade and return whether that succeeded

    Depending on options.minimal_upgrade_steps the packages are upgraded
    in minimal steps or all at once. Any exception raised while upgrading
    is written to fd 2 and reported as failure (False).

    Raises AssertionError if the cache has packages marked for removal
    when called.
    """
    setup_apt_listchanges()

    logging.info(_("Writing dpkg log to %s"), logfile_dpkg)

    names_marked_delete = [
        pkg.name for pkg in cache.get_changes() if pkg.marked_delete]
    if names_marked_delete:
        raise AssertionError(
            "Internal error. The following packages are marked for "
            "removal:%s" % " ".join(names_marked_delete))

    try:
        if options.minimal_upgrade_steps:
            # try upgrade all "pkgs" in minimal steps
            names = [pkg.name for pkg in pkgs_to_upgrade]
            return upgrade_in_minimal_steps(
                cache, names, blacklist, whitelist, logfile_dpkg,
                options.verbose or options.debug)
        mark_pkgs_to_upgrade(cache, pkgs_to_upgrade)
        return upgrade_normal(
            cache, logfile_dpkg, options.verbose or options.debug)
    except Exception as e:
        # print unhandled exceptions here this way, while stderr is redirected
        os.write(2, ("Exception: %s\n" % e).encode('utf-8'))
        return False


def _setup_alternative_rootdir(rootdir):
    # type: (str) -> None
    """ switch the apt configuration over to the tree below rootdir

    Drops the "Unattended-Upgrade" configuration loaded so far, reads
    etc/apt/apt.conf and etc/apt/apt.conf.d below rootdir if present and
    points Unattended-Upgrade::LogDir to var/log/unattended-upgrades
    below rootdir, creating that directory if it does not exist.
    """
    # clear system unattended-upgrade stuff
    apt_pkg.config.clear("Unattended-Upgrade")
    # read rootdir (taken from apt.Cache, but we need to run it
    # here before the cache gets initialized
    apt_conf = rootdir + "/etc/apt/apt.conf"
    if os.path.exists(apt_conf):
        apt_pkg.read_config_file(apt_pkg.config, apt_conf)
    apt_conf_d = rootdir + "/etc/apt/apt.conf.d"
    if os.path.isdir(apt_conf_d):
        apt_pkg.read_config_dir(apt_pkg.config, apt_conf_d)
    logdir = os.path.join(rootdir, "var", "log", "unattended-upgrades")
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    apt.apt_pkg.config.set("Unattended-Upgrade::LogDir", logdir)


def _get_logdir():
    # type: () -> str
    """ return the configured log directory

    Unattended-Upgrade::LogDir wins, then the legacy
    APT::UnattendedUpgrades::LogDir setting, then
    /var/log/unattended-upgrades/.
    """
    # COMPAT only
    legacy_logdir = apt_pkg.config.find_dir(
        "APT::UnattendedUpgrades::LogDir", "/var/log/unattended-upgrades/")
    return apt_pkg.config.find_dir(
        "Unattended-Upgrade::LogDir", legacy_logdir)


def _setup_logging(options):
    # type: (Options) -> StringIO
    """ set up file, stdout, in-memory and syslog logging

    Logging always goes to the log file in the log directory. In addition
    it goes to stdout with options.debug or options.verbose, to an
    in-memory buffer if Unattended-Upgrade::Mail is set and to syslog if
    Unattended-Upgrade::SyslogEnable is set.

    Returns the in-memory buffer, or None if the root logger already has
    handlers, in which case nothing is changed.
    """

    # ensure this is run only once
    if len(logging.root.handlers) > 0:
        return None

    # init the logging
    logdir = _get_logdir()
    logfile = os.path.join(
        logdir,
        apt_pkg.config.find(
            "Unattended-Upgrade::LogFile",
            # COMPAT only
            apt_pkg.config.find("APT::UnattendedUpgrades::LogFile",
                                "unattended-upgrades.log")))
    if not options.dry_run and not os.path.exists(logdir):
        os.makedirs(logdir)

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=logfile)
    # additional logging
    logger = logging.getLogger()
    mem_log = StringIO()
    if options.apt_debug:
        apt_pkg.config.set("Debug::pkgProblemResolver", "1")
        apt_pkg.config.set("Debug::pkgDepCache::AutoInstall", "1")
    if options.debug or options.verbose:
        # debug wins over verbose when both are given
        logger.setLevel(logging.DEBUG if options.debug else logging.INFO)
        logger.addHandler(logging.StreamHandler(sys.stdout))
    if apt_pkg.config.find("Unattended-Upgrade::Mail", ""):
        logger.addHandler(logging.StreamHandler(mem_log))
    # Configure syslog if necessary
    if apt_pkg.config.find_b("Unattended-Upgrade::SyslogEnable", False):
        syslog_facility = apt_pkg.config.find(
            "Unattended-Upgrade::SyslogFacility",
            "daemon")
        # the facility names known to SysLogHandler are lower case; the
        # handler looks its facility up verbatim when emitting a record,
        # so it has to get the normalized name
        facility_key = syslog_facility.lower()
        known = logging.handlers.SysLogHandler.facility_names
        if facility_key in known:
            # only create the handler (and its socket) for a valid facility
            syslog_handler = logging.handlers.SysLogHandler(
                address='/dev/log',
                facility=facility_key)
            syslog_handler.setFormatter(
                logging.Formatter("unattended-upgrade: %(message)s"))
            logger.addHandler(syslog_handler)
            logging.info("Enabled logging to syslog via %s facility "
                         % syslog_facility)
        else:
            logging.warning("Syslog facility %s was not found"
                            % syslog_facility)
    return mem_log


def get_blacklist():
    # type: () -> List[str]
    """ return the values of Unattended-Upgrade::Package-Blacklist """
    blacklist = apt_pkg.config.value_list(
        "Unattended-Upgrade::Package-Blacklist")
    return blacklist


def get_whitelist():
    # type: () -> List[str]
    """ return the values of Unattended-Upgrade::Package-Whitelist """
    whitelist = apt_pkg.config.value_list(
        "Unattended-Upgrade::Package-Whitelist")
    return whitelist


def logged_in_users():
    # type: () -> AbstractSet[str]
    """Return the set of user names printed by the USERS command"""
    # the output is treated as one line of whitespace separated names in
    # which a name may show up more than once; the set drops the
    # duplicates
    output = subprocess.check_output(USERS, universal_newlines=True)
    names = output.rstrip('\n').split()
    return set(names)


def reboot_if_requested_and_needed():
    # type: () -> None
    """auto-reboot (if required and the config for this is set)

    Does nothing unless REBOOT_REQUIRED_FILE exists and
    Unattended-Upgrade::Automatic-Reboot is set. If
    Unattended-Upgrade::Automatic-Reboot-WithUsers is disabled and users
    are logged in, a warning is logged and no reboot is scheduled.
    Otherwise "/sbin/shutdown -r" is called with the time from
    Unattended-Upgrade::Automatic-Reboot-Time (default "now"); a failure
    to run it is logged, not raised.
    """
    if not os.path.exists(REBOOT_REQUIRED_FILE):
        return
    if not apt_pkg.config.find_b(
            "Unattended-Upgrade::Automatic-Reboot", False):
        return
    # see if we need to check for logged in users
    if not apt_pkg.config.find_b(
            "Unattended-Upgrade::Automatic-Reboot-WithUsers", True):
        users = logged_in_users()
        if users:
            # look up the translation with the untouched templates and
            # fill in the values afterwards, a pre-formatted string would
            # never match a catalog entry; the users are listed by name
            # instead of dumping the repr of the set
            msg = gettext.ngettext(
                "Found %s, but not rebooting because %s is logged in.",
                "Found %s, but not rebooting because %s are logged in.",
                len(users)) % (REBOOT_REQUIRED_FILE,
                               ", ".join(sorted(users)))
            logging.warning(msg)
            return
    # reboot at the specified time
    when = apt_pkg.config.find(
        "Unattended-Upgrade::Automatic-Reboot-Time", "now")
    logging.warning("Found %s, rebooting" % REBOOT_REQUIRED_FILE)
    cmd = ["/sbin/shutdown", "-r", when]
    try:
        raw_output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        # decode leniently so that the text and not a bytes repr is
        # logged and odd output can not make a successful call look
        # like a failure
        shutdown_msg = raw_output.decode(
            locale.getpreferredencoding(False), errors="replace").strip()
        if shutdown_msg:
            logging.warning("Shutdown msg: %s", shutdown_msg)
    except Exception as e:
        logging.error("Failed to issue shutdown: %s", e)


def write_stamp_file():
    # type: () -> None
    """ create (or empty) the unattended-upgrades-stamp file

    The stamp lives in the "periodic" directory below Dir::State, which is
    created first if it does not exist.
    """
    periodic_dir = os.path.join(
        apt_pkg.config.find_dir("Dir::State"), "periodic")
    if not os.path.exists(periodic_dir):
        os.makedirs(periodic_dir)
    stamp_path = os.path.join(periodic_dir, "unattended-upgrades-stamp")
    # opening for writing is all that is needed, nothing gets written
    with open(stamp_path, "w"):
        pass


def try_to_upgrade(pkg,               # type: apt.Package
                   pkgs_to_upgrade,   # type: List[apt.Package]
                   pkgs_kept_back,    # type: List[str]
                   cache,             # type: apt.Cache
                   allowed_origins,   # type: List[str]
                   blacklist,         # type: List[str]
                   whitelist,         # type: List[str]
                   ):
    # type: (...) -> None
    """ try to mark pkg for upgrade and record the outcome

    On success pkg is appended to pkgs_to_upgrade and every package from
    pkgs_kept_back that got marked along the way is moved over to
    pkgs_to_upgrade as well. If the resulting changes fail the sanity
    check or marking raises, the cache is rewound to pkgs_to_upgrade and
    the name of pkg is appended to pkgs_kept_back. Both lists are modified
    in place. Nothing happens if pkg has no candidate from an allowed
    origin or is not upgradable after adjusting the candidate.
    """
    try:
        try:
            # try to adjust pkg itself first, if that throws an exception it
            # can't be upgraded on its own
            cache.adjust_candidate(pkg)
            if not pkg.is_upgradable:
                return
        except NoAllowedOriginError:
            return
        cache._cached_candidate_pkgnames.add(pkg.name)
        cache.mark_upgrade_adjusted(pkg, from_user=not pkg.is_auto_installed)
        if check_changes_for_sanity(cache, allowed_origins, blacklist,
                                    whitelist, pkg):
            # add to packages to upgrade
            pkgs_to_upgrade.append(pkg)
            # re-eval pkgs_kept_back as the resolver may fail to
            # directly upgrade a pkg, but that may work during
            # a subsequent operation, see debian bug #639840
            #
            # iterate over a copy: removing from the list that is being
            # iterated would skip the entry following each removed one
            for pkgname in list(pkgs_kept_back):
                kept_pkg = cache[pkgname]
                if kept_pkg.marked_install or kept_pkg.marked_upgrade:
                    pkgs_kept_back.remove(pkgname)
                    pkgs_to_upgrade.append(kept_pkg)
        else:
            logging.debug("sanity check failed")
            rewind_cache(cache, pkgs_to_upgrade)
            pkgs_kept_back.append(pkg.name)
    except (SystemError, NoAllowedOriginError) as e:
        # can't upgrade
        logging.warning(
            _("package %s upgradable but fails to "
                "be marked for upgrade (%s)"), pkg.name, e)
        rewind_cache(cache, pkgs_to_upgrade)
        pkgs_kept_back.append(pkg.name)


def calculate_upgradable_pkgs(cache,             # type: apt.Cache
                              options,           # type: Options
                              allowed_origins,   # type: List[str]
                              blacklist,         # type: List[str]
                              whitelist,         # type: List[str]
                              ):
    # type: (...) -> Tuple[List[apt.Package], List[str]]
    """ find out which packages can be upgraded and which are kept back

    Every upgradable, whitelisted package with a version from an allowed
    origin is handed to try_to_upgrade(). The marks left in the cache are
    cleared before returning.

    Returns the tuple (packages to upgrade, names of kept back packages).
    """
    upgradable = []  # type: List[apt.Package]
    kept_back = []   # type: List[str]

    # now do the actual upgrade
    for pkg in cache:
        if options.debug and pkg.is_upgradable:
            logging.debug("Checking: %s (%s)" % (
                pkg.name, getattr(pkg.candidate, "origins", [])))

        if not pkg.is_upgradable:
            continue
        if not is_pkgname_in_whitelist(pkg.name, whitelist):
            continue
        try:
            ver_in_allowed_origin(pkg, allowed_origins)
        except NoAllowedOriginError:
            # no version of the package comes from an allowed origin
            continue

        try_to_upgrade(pkg, upgradable, kept_back, cache,
                       allowed_origins, blacklist, whitelist)

    if cache.get_changes():
        cache.clear()

    return upgradable, kept_back


def get_dpkg_log_content(logfile_dpkg, install_start_time):
    # type: (str, datetime.datetime) -> str
    """ return the part of the dpkg log written since install_start_time

    Everything from the first "Log started: " stanza that is not older
    than install_start_time up to the end of the file is returned, with
    the lines matching the progress indicator pattern left out. Returns
    "" if the log file does not exist.
    """
    logging.debug("Extracting content from %s since %s" % (
        logfile_dpkg, install_start_time))
    stanza_marker = "Log started: "
    wanted_lines = []
    in_wanted_part = False
    try:
        with io.open(logfile_dpkg, encoding='utf-8', errors='replace') as fp:
            for line in fp:
                # scan for the first entry we need (minimal-step mode
                # creates a new stanza for each individual install)
                if not in_wanted_part and line.startswith(stanza_marker):
                    # the timestamp is the rest of the line without its
                    # last character
                    stanza_start = LoggingDateTime.from_string(
                        line[len(stanza_marker):-1])
                    if stanza_start >= install_start_time:
                        in_wanted_part = True
                if not in_wanted_part:
                    continue
                # skip progress indicator until #860931 is fixed in apt
                # and dpkg
                if re.match(
                        "^\\(Reading database \\.\\.\\. ()|([0-9]+%)$",
                        line):
                    continue
                wanted_lines.append(line)
        return "".join(wanted_lines)
    except FileNotFoundError:
        return ""


def get_auto_removable(cache):
    # type: (apt.Cache) -> AbstractSet[str]
    """ return the names of all auto-removable packages in the cache """
    names = set()
    for pkg in cache:
        if pkg.is_auto_removable:
            names.add(pkg.name)
    return names


def is_autoremove_valid(cache, pkgname, auto_removable, blacklist, whitelist):
    # type: (apt.Cache, str, AbstractSet[str], List[str], List[str]) -> bool
    """ check if the removals currently marked in the cache are acceptable

    Returns True if the cache has no changes. Returns False (after logging
    a warning) if a changed package is rejected by
    is_pkg_change_allowed(), if the changes touch packages outside of
    auto_removable, or if a changed package matches
    cache.running_kernel_pkgs_regexp. pkgname is only used for the warning
    text, pass "" when several packages are removed in one step.
    """
    changes = cache.get_changes()
    if not changes:
        # package is already removed
        return True
    pkgnames = {pkg.name for pkg in changes}

    # first changed package that must be kept unmodified, if any
    blocked = next(
        (pkg for pkg in changes
         if not is_pkg_change_allowed(pkg, blacklist, whitelist)),
        None)
    if blocked is not None:
        logging.warning(
            _("Keeping the following auto-removable package(s) because "
              "they include %s which is set to be kept unmodified: %s"),
            blocked.name, " ".join(sorted(pkgnames)))
        return False

    if not pkgnames.issubset(auto_removable):
        # the removal would drag along packages that are not up for removal
        collateral = " ".join(sorted(pkgnames - auto_removable))
        if pkgname != "":
            logging.warning(
                _("Keeping auto-removable %s package(s) because it would"
                  " also remove the following packages which should "
                  "be kept in this step: %s"), pkgname, collateral)
        else:
            logging.warning(
                _("Keeping %s auto-removable package(s) because it would"
                  " also remove the following packages which should "
                  "be kept in this step: %s"), len(auto_removable),
                collateral)
        return False

    kernel_regexp = cache.running_kernel_pkgs_regexp
    if kernel_regexp:
        for packagename in pkgnames:
            if kernel_regexp.match(packagename):
                logging.warning(
                    _("Keeping the following auto-removable package(s) "
                      "because "
                      "they include %s which package is related to the "
                      "running "
                      "kernel: %s"), packagename,
                    " ".join(sorted(pkgnames)))
                return False
    return True


def do_auto_remove(cache,             # type: apt.Cache
                   auto_removable,    # type: AbstractSet[str]
                   logfile_dpkg,      # type: str
                   minimal_steps,     # type: bool
                   blacklist,         # type: List[str]
                   whitelist,         # type: List[str]
                   verbose=False,     # type: bool
                   dry_run=False      # type: bool
                   ):
    # type: (...) -> Tuple[bool, List[str], List[str]]
    """ remove the packages named in auto_removable

    With minimal_steps every package is removed in a commit of its own
    and packages whose removal is not valid are kept installed; otherwise
    all packages are removed in one commit, or none of them if that is
    not valid. With dry_run nothing is committed.

    Returns the tuple (success, names of removed packages, names of
    packages kept installed).
    """
    res = True
    if not auto_removable:
        return (res, [], [])

    pkgs_removed = []         # type: List[str]
    pkgs_kept_installed = []  # type: List[str]
    # error message of the last commit, only meaningful if res is False
    error = None
    if minimal_steps:
        for pkgname in auto_removable:
            if should_stop():
                pkgs_kept_installed = list(auto_removable - set(pkgs_removed))
                return (False, pkgs_removed, pkgs_kept_installed)
            logging.debug("marking %s for removal" % pkgname)
            if pkgname in pkgs_removed:
                continue
            cache[pkgname].mark_delete()
            if not is_autoremove_valid(cache, pkgname, auto_removable,
                                       blacklist, whitelist):
                # this situation can occur when removing newly unused packages
                # would also remove old unused packages which are not set
                # for removal, thus getting there is not handled as an error
                pkgs_kept_installed.append(pkgname)
                cache.clear()
                continue
            if not dry_run:
                changes = cache.get_changes()
                pkgnames = {pkg.name for pkg in changes}
                res, error = cache_commit(cache, logfile_dpkg, verbose)
                if not res:
                    break
                pkgs_removed.extend(pkgnames)
            else:
                cache.clear()
    else:
        for pkgname in auto_removable:
            cache[pkgname].mark_delete()
        if is_autoremove_valid(cache, "", auto_removable, blacklist,
                               whitelist):
            # do it in one step
            if not dry_run:
                # remember what is about to be removed, otherwise the
                # removed packages would be missing from the log message
                # below and from the returned list
                pkgnames = {pkg.name for pkg in cache.get_changes()}
                res, error = cache_commit(cache, logfile_dpkg, verbose)
                if res:
                    pkgs_removed.extend(pkgnames)
            else:
                cache.clear()
        else:
            cache.clear()

    if res:
        logging.info(_("Packages that were successfully auto-removed: %s"),
                     " ".join(sorted(pkgs_removed)))
        logging.info(_("Packages that are kept back: %s"),
                     " ".join(sorted(pkgs_kept_installed)))
    if not res:
        cache.clear()
        logging.error(_("Auto-removing the packages failed!"))
        logging.error(_("Error message: %s"), error)
        logging.error(_("dpkg returned an error! See %s for details"),
                      logfile_dpkg)
    return (res, pkgs_removed, pkgs_kept_installed)


def clean_downloaded_packages(fetcher):
    # type: (apt_pkg.Acquire) -> None
    """ delete the fetched files that live in the apt archives directory

    Items of the fetcher stored in any other directory are left alone and
    files that can not be deleted are silently skipped.
    """
    archives = apt_pkg.config.find_dir("Dir::Cache::archives")
    archivedir = os.path.dirname(archives)
    for item in fetcher.items:
        item_dir = os.path.dirname(os.path.abspath(item.destfile))
        if item_dir != archivedir:
            continue
        try:
            os.unlink(item.destfile)
        except OSError:
            # best effort only
            pass


def is_update_day():
    # type: () -> bool
    """ tell whether upgrades may run today

    Returns True if Unattended-Upgrade::Update-Days is empty or lists
    today, given as abbreviated or full localized day name or as number
    (Sun: 0, Mon: 1, ...). Otherwise the skip is logged and False is
    returned.
    """
    # check if patch days are configured
    patch_days = apt_pkg.config.value_list("Unattended-Upgrade::Update-Days")
    if not patch_days:
        return True
    # validate patch days
    today = date.today()
    day_number = today.strftime("%w")
    short_name = today.strftime("%a")
    full_name = today.strftime("%A")
    if any(spelling in patch_days
           for spelling in (short_name, full_name, day_number)):
        return True
    # today is not a patch day
    logging.info(
        "Skipping update check: today is %s,%s,%s but patch days are %s",
        day_number, short_name, full_name, ", ".join(patch_days))
    return False


def main(options, rootdir=""):
    # type: (Options, str) -> int
    """ run unattended-upgrades and report the result

    Sets up logging, takes the lock on LOCK_FILE and calls run(). Unless
    options.dry_run is set a summary mail is sent for meaningful results
    and a reboot is issued if requested and needed.

    Returns 0 if the run was successful and 1 if it was not or if the
    lock is already taken. Exceptions are logged (and mailed, unless
    options.dry_run is set) and re-raised.
    """
    # useful for testing
    if rootdir:
        _setup_alternative_rootdir(rootdir)

    # see debian #776752
    install_start_time = datetime.datetime.now().replace(microsecond=0)

    # setup logging
    mem_log = _setup_logging(options)
    # get log
    logfile_dpkg = os.path.join(_get_logdir(), 'unattended-upgrades-dpkg.log')
    if not os.path.exists(logfile_dpkg):
        with open(logfile_dpkg, 'w'):
            pass

    # lock for the shutdown check
    shutdown_lock = apt_pkg.get_lock(LOCK_FILE)
    if shutdown_lock < 0:
        logging.error("Lock file is already taken, exiting")
        return 1

    try:
        res = run(options, rootdir, mem_log, logfile_dpkg,
                  install_start_time)
        if res.result_str and not options.dry_run:
            # there is some meaningful result which is worth an email
            log_content = get_dpkg_log_content(logfile_dpkg,
                                               install_start_time)
            send_summary_mail(res.pkgs, res.success, res.result_str,
                              res.pkgs_kept_back, res.pkgs_removed,
                              res.pkgs_kept_installed, mem_log,
                              log_content)
        if res.update_stamp:
            # write timestamp file
            write_stamp_file()
            if not options.dry_run:
                # check if the user wants a reboot
                reboot_if_requested_and_needed()
        if res.success:
            return 0
        else:
            return 1

    except Exception as e:
        logger = logging.getLogger()
        logger.exception(_("An error occurred: %s"), e)
        log_content = get_dpkg_log_content(logfile_dpkg,
                                           install_start_time)
        if not options.dry_run:
            send_summary_mail(["<unknown>"], False, _("An error occurred"), [],
                              [], [],
                              mem_log, log_content)
        # Re-raise exceptions for apport
        raise
    finally:
        # give the lock back on every way out, including errors
        os.close(shutdown_lock)


def mark_pkgs_to_upgrade(cache, pkgs_to_upgrade):
    # type: (UnattendedUpgradesCache, List[apt.Package]) -> None
    """Mark every package of pkgs_to_upgrade for upgrade or install.

    Upgradable packages are marked for upgrade; they count as requested by
    the user unless they are flagged as automatically installed.  Packages
    that are not upgradable and not installed are marked for installation
    as automatically installed ones.  Anything else is left untouched.
    """
    for candidate in pkgs_to_upgrade:
        if candidate.is_upgradable:
            requested_by_user = not candidate.is_auto_installed
            cache.mark_upgrade_adjusted(candidate,
                                        from_user=requested_by_user)
            continue
        if candidate.is_installed:
            # installed and not upgradable: nothing to mark
            continue
        cache.mark_install_adjusted(candidate, from_user=False)


def run(options,             # type: Options
        rootdir,             # type: str
        mem_log,             # type: StringIO
        logfile_dpkg,        # type: str
        install_start_time,  # type: datetime.datetime
        ):
    # type: (...) -> UnattendedUpgradesResult
    """Perform one unattended-upgrades run and report its outcome.

    The steps are, in order:

    1. decide whether to run at all (patch day, stop request, policy for
       development releases)
    2. take the apt system lock and, if configured, repair an interrupted
       dpkg run
    3. open the cache, calculate the upgradable packages and download them
    4. blacklist downloaded packages that are untrusted or would cause a
       conffile prompt and redo the package selection
    5. remove unused kernel packages, install the remaining upgrades and
       process the auto-removals

    :param options: parsed options; dry_run, download_only, debug, verbose
        and minimal_upgrade_steps are read here
    :param rootdir: handed to UnattendedUpgradesCache as its rootdir
    :param mem_log: not referenced by this function
    :param logfile_dpkg: path of the dpkg log, handed to do_install() and
        do_auto_remove()
    :param install_start_time: not referenced by this function
    :return: an UnattendedUpgradesResult describing success, the result
        message and the packages upgraded, kept back, removed and kept
        installed
    """

    # check if today is a patch day
    if not is_update_day():
        return UnattendedUpgradesResult(True)

    # check if u-u should be stopped already
    if should_stop():
        return UnattendedUpgradesResult(False)

    # check to see if want to auto-upgrade the devel release
    if apt_pkg.config.find("Unattended-Upgrade::DevRelease") == "auto":
        try:
            if DISTRO_ID.lower() == 'ubuntu':
                devel = (distro_info.UbuntuDistroInfo() .
                         devel(result="object"))
            elif DISTRO_ID.lower() == 'debian':
                devel = (distro_info.DebianDistroInfo() .
                         devel(result="object"))
            else:
                devel = (distro_info.DistroInfo(DISTRO_ID) .
                         devel(result="object"))
        except Exception as e:
            logging.warning("Could not figure out development release: %s" % e)
        else:
            if ((devel.series == DISTRO_CODENAME
                 and devel.release is not None
                 and devel.release - date.today() > DEVEL_UNTIL_RELEASE)):
                # build the message once, it goes to syslog and to the log
                not_before_msg = (_("Not running on this development "
                                    "release before %s") %
                                  (devel.release - DEVEL_UNTIL_RELEASE
                                   - datetime.timedelta(days=1)))
                syslog.syslog(not_before_msg)
                logging.warning(not_before_msg)
                return UnattendedUpgradesResult(True)

            logging.debug("Running on the development release")
    elif "(development branch)" in DISTRO_DESC and not\
            apt_pkg.config.find_b("Unattended-Upgrade::DevRelease", True):
        syslog.syslog(_("Not running on the development release."))
        logging.info(_("Not running on the development release."))
        return UnattendedUpgradesResult(True)
    # format (origin, archive), e.g. ("Ubuntu","dapper-security")
    allowed_origins = get_allowed_origins()

    # pkgs that are (for some reason) not safe to install
    blacklist = get_blacklist()
    logging.info(_("Initial blacklist : %s"),
                 " ".join(blacklist))

    # install only these packages regardless of other upgrades available
    whitelist = get_whitelist()
    logging.info(_("Initial whitelist: %s"),
                 " ".join(whitelist))

    logging.info(_("Starting unattended upgrades script"))

    # display available origin
    logging.info(_("Allowed origins are: %s"), ", ".join(allowed_origins))

    # check and get lock
    try:
        apt_pkg.pkgsystem_lock()
    except SystemError:
        logging.error(_("Lock could not be acquired (another package "
                        "manager running?)"))
        print(_("Cache lock can not be acquired, exiting"))
        return UnattendedUpgradesResult(
            False, _("Lock could not be acquired"))

    # check if the journal is dirty and if so, take emergency action
    # the alternative is to leave the system potentially insecure until
    # the user comes in and fixes
    if is_dpkg_journal_dirty() and \
       apt_pkg.config.find_b("Unattended-Upgrade::AutoFixInterruptedDpkg",
                             True):
        logging.warning(
            _("Unclean dpkg state detected, trying to correct"))
        print(_("Unclean dpkg state detected, trying to correct"))
        env = copy.copy(os.environ)
        env["DPKG_FRONTEND_LOCKED"] = "1"
        try:
            with Unlocked():
                output = subprocess.check_output(
                    ["dpkg", "--force-confold", "--configure", "-a"],
                    env=env,
                    universal_newlines=True)
        except subprocess.CalledProcessError as e:
            # a failing dpkg is only logged, the run carries on
            output = e.output
        logging.warning(_("dpkg --configure -a output:\n%s"), output)

    # get a cache
    try:
        cache = UnattendedUpgradesCache(rootdir=rootdir,
                                        allowed_origins=allowed_origins)
    except SystemError as error:
        print(_("Apt returned an error, exiting"))
        print(_("error message: %s") % error)
        logging.error(_("Apt returned an error, exiting"))
        logging.error(_("error message: %s"), error)
        return UnattendedUpgradesResult(
            False, _("Apt returned an error, exiting"))

    if cache._depcache.broken_count > 0:
        print(_("Cache has broken packages, exiting"))
        logging.error(_("Cache has broken packages, exiting"))
        return UnattendedUpgradesResult(
            False, _("Cache has broken packages, exiting"))

    # FIXME: make this into a ContextManager
    # be nice when calculating the upgrade as its pretty CPU intensive
    old_priority = os.nice(0)
    try:
        # Check that we will be able to restore the priority
        os.nice(-1)
        os.nice(20)
    except OSError as e:
        # without the permission to lower the nice value again the
        # priority is left alone
        if e.errno in (errno.EPERM, errno.EACCES):
            pass
        else:
            raise

    auto_removable = get_auto_removable(cache)

    # find out about the packages that are upgradable (in an allowed_origin)
    pkgs_to_upgrade, pkgs_kept_back = calculate_upgradable_pkgs(
        cache, options, allowed_origins, blacklist, whitelist)
    pkgs_to_upgrade.sort(key=lambda p: p.name)
    pkgs = [pkg.name for pkg in pkgs_to_upgrade]
    logging.debug("pkgs that look like they should be upgraded: %s"
                  % "\n".join(pkgs))

    # FIXME: make this into a ContextManager
    # stop being nice
    os.nice(old_priority - os.nice(0))

    # download what looks good
    mark_pkgs_to_upgrade(cache, pkgs_to_upgrade)

    if options.debug:
        fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress())
    else:
        fetcher = apt_pkg.Acquire()
    sources_list = apt_pkg.SourceList()
    sources_list.read_main_list()
    recs = cache._records
    pm = apt_pkg.PackageManager(cache._depcache)
    # don't start downloading during shutdown
    # TODO: download files one by one and check for stop request after each of
    # them
    if should_stop():
        return UnattendedUpgradesResult(False, _("Upgrade was interrupted"))
    try:
        pm.get_archives(fetcher, sources_list, recs)
    except SystemError as e:
        logging.error(_("GetArchives() failed: %s"), e)
    try:
        res = fetcher.run()
        logging.debug("fetch.run() result: %s", res)
    except SystemError as e:
        logging.error("fetch.run() result: %s", e)

    # the marking was only needed to fill the fetcher
    if cache.get_changes():
        cache.clear()

    if options.download_only:
        return UnattendedUpgradesResult(True)

    pkg_conffile_prompt = False
    if dpkg_conffile_prompt():
        # now check the downloaded debs for conffile conflicts and build
        # a blacklist
        for item in fetcher.items:
            logging.debug("%s" % item)
            if item.status == item.STAT_ERROR:
                print(_("An error occurred: %s") % item.error_text)
                logging.error(_("An error occurred: %s"), item.error_text)
            if not item.complete:
                print(_("The URI %s failed to download, aborting") %
                      item.desc_uri)
                logging.error(_("The URI %s failed to download, aborting"),
                              item.desc_uri)
                return UnattendedUpgradesResult(
                    False, (_("The URI %s failed to download, aborting") %
                            item.desc_uri))
            if not os.path.exists(item.destfile):
                print(_("Download finished, but file %s not there?!?") %
                      item.destfile)
                logging.error("Download finished, but file %s not "
                              "there?!?", item.destfile)
                return UnattendedUpgradesResult(
                    False, (_("Download finished, but file %s not there?!?") %
                            item.destfile))
            if not item.is_trusted and not apt_pkg.config.find_b(
                    "APT::Get::AllowUnauthenticated", False):
                pkg_name = pkgname_from_deb(item.destfile)
                logging.debug("%s is blacklisted because it is not trusted",
                              pkg_name)
                if not is_pkgname_in_blacklist(pkg_name, blacklist):
                    blacklist.append("%s$" % re.escape(pkg_name))
            if not is_deb(item.destfile):
                logging.debug("%s is not a .deb file" % item)
                continue
            if conffile_prompt(item.destfile):
                # skip package (means to re-run the whole marking again
                # and making sure that the package will not be pulled in by
                # some other package again!)
                pkg_name = pkgname_from_deb(item.destfile)
                # print to stdout to ensure that this message is part of
                # the cron mail (only if no summary mail is requested)
                mail_to = apt_pkg.config.find("Unattended-Upgrade::Mail", "")
                if not mail_to:
                    print(_("Package %s has conffile prompt and needs "
                            "to be upgraded manually") % pkg_name)
                # log to the logfile
                logging.warning(_("Package %s has conffile prompt and "
                                  "needs to be upgraded manually"),
                                pkg_name)
                if not is_pkgname_in_blacklist(pkg_name, blacklist):
                    blacklist.append("%s$" % re.escape(pkg_name))
                pkgs_kept_back.append(pkg_name)
                pkg_conffile_prompt = True

        # redo the selection about the packages to upgrade based on the new
        # blacklist
        logging.debug("blacklist: %s" % blacklist)
        # whitelist
        logging.debug("whitelist: %s" % whitelist)
        # find out about the packages that are upgradable (in a allowed_origin)
        if len(blacklist) > 0 or len(whitelist) > 0:
            old_pkgs_to_upgrade = pkgs_to_upgrade[:]
            pkgs_to_upgrade = []
            for pkg in old_pkgs_to_upgrade:
                logging.debug("Checking the black and whitelist: %s" %
                              (pkg.name))
                cache.mark_upgrade_adjusted(
                    pkg, from_user=not pkg.is_auto_installed)
                if check_changes_for_sanity(cache,
                                            allowed_origins,
                                            blacklist,
                                            whitelist):
                    pkgs_to_upgrade.append(pkg)
                else:
                    if not (pkg.name in pkgs_kept_back):
                        pkgs_kept_back.append(pkg.name)
                    logging.info(_("package %s not upgraded"), pkg.name)
                    # drop the rejected marking and restore the markings
                    # of the packages accepted so far
                    cache.clear()
                    for pkg2 in pkgs_to_upgrade:
                        cache.call_adjusted(
                            apt.package.Package.mark_upgrade, pkg2,
                            from_user=not pkg2.is_auto_installed)
            if cache.get_changes():
                cache.clear()

    else:
        logging.debug("dpkg is configured not to cause conffile prompts")

    # auto-removals
    kernel_pkgs_remove_success = True  # type: bool
    kernel_pkgs_removed = []           # type: List[str]
    kernel_pkgs_kept_installed = []    # type: List[str]
    if (auto_removable and apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-Unused-Kernel-Packages", True)):
        # remove unused kernels before installing new ones because the newly
        # installed ones may fill up /boot and break the system right before
        # removing old ones could take place
        #
        # this step may also remove _auto-removable_ reverse dependencies
        # of kernel packages
        auto_removable_kernel_pkgs = {
            p for p in auto_removable
            if (cache.versioned_kernel_pkgs_regexp
                and cache.versioned_kernel_pkgs_regexp.match(p)
                and not cache.running_kernel_pkgs_regexp.match(p))}
        if auto_removable_kernel_pkgs:
            logging.info(_("Removing unused kernel packages: %s"),
                         " ".join(auto_removable_kernel_pkgs))
            (kernel_pkgs_remove_success,
             kernel_pkgs_removed,
             kernel_pkgs_kept_installed) = do_auto_remove(
                cache, auto_removable_kernel_pkgs, logfile_dpkg,
                options.minimal_upgrade_steps, blacklist, whitelist,
                options.verbose or options.debug, options.dry_run)
            auto_removable = get_auto_removable(cache)

    # remember what was auto-removable before the upgrade to be able to
    # tell the new auto-removals apart later
    previous_autoremovals = auto_removable
    if apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-Unused-Dependencies", False):
        pending_autoremovals = previous_autoremovals
    else:
        pending_autoremovals = set()

    # exit if there is nothing to do and nothing to report
    if (len(pending_autoremovals) == 0
            and len(pkgs_to_upgrade) == 0
            and len(pkgs_kept_back) == 0):
        logging.info(_("No packages found that can be upgraded unattended "
                       "and no pending auto-removals"))
        return UnattendedUpgradesResult(
            kernel_pkgs_remove_success,
            _("No packages found that can be upgraded unattended and no "
              "pending auto-removals"),
            pkgs_removed=kernel_pkgs_removed,
            pkgs_kept_installed=kernel_pkgs_kept_installed,
            update_stamp=True)

    # check if its configured for install on shutdown, if so, the
    # environment UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN will
    # be set by the unattended-upgrades-shutdown script
    if ("UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN" not in os.environ
        and apt_pkg.config.find_b(
            "Unattended-Upgrade::InstallOnShutdown", False)):
        logger = logging.getLogger()
        logger.debug("Configured to install on shutdown, so exiting now")
        return UnattendedUpgradesResult(True)

    # check if we are in dry-run mode
    if options.dry_run:
        logging.info("Option --dry-run given, *not* performing real actions")
        apt_pkg.config.set("Debug::pkgDPkgPM", "1")

    # do the install based on the new list of pkgs
    pkgs = [pkg.name for pkg in pkgs_to_upgrade]
    logging.info(_("Packages that will be upgraded: %s"), " ".join(pkgs))

    # only perform install step if we actually have packages to install
    pkg_install_success = True
    if len(pkgs_to_upgrade) > 0:
        # do install
        pkg_install_success = do_install(cache,
                                         pkgs_to_upgrade,
                                         blacklist,
                                         whitelist,
                                         options,
                                         logfile_dpkg)
    # Was the overall run successful: only if everything installed
    # fine and nothing was held back because of a conffile prompt.
    successful_run = (kernel_pkgs_remove_success and pkg_install_success
                      and not pkg_conffile_prompt)

    # now check if any auto-removing needs to be done
    if cache._depcache.broken_count > 0:
        print(_("Cache has broken packages, exiting"))
        logging.error(_("Cache has broken packages, exiting"))
        return UnattendedUpgradesResult(
            False, _("Cache has broken packages, exiting"), pkgs=pkgs)

    # the user wants *all* auto-removals to be removed
    # (unless u-u got signalled to stop gracefully quickly)
    pkgs_removed = []         # type: List[str]
    pkgs_kept_installed = []  # type: List[str]
    if ((apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-Unused-Dependencies", False)
         and not SIGNAL_STOP_REQUEST)):
        auto_removals = get_auto_removable(cache)
        (pkg_remove_success,
         pkgs_removed,
         pkgs_kept_installed) = do_auto_remove(
            cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps,
            blacklist, whitelist, options.verbose or options.debug,
            options.dry_run)
        successful_run = successful_run and pkg_remove_success
    # the user wants *only new* auto-removals to be removed
    elif apt_pkg.config.find_b(
            "Unattended-Upgrade::Remove-New-Unused-Dependencies", True):
        # calculate the new auto-removals
        new_pending_autoremovals = get_auto_removable(cache)
        auto_removals = new_pending_autoremovals - previous_autoremovals
        (pkg_remove_success,
         pkgs_removed,
         pkgs_kept_installed) = do_auto_remove(
            cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps,
            blacklist, whitelist, options.verbose or options.debug,
            options.dry_run)
        successful_run = successful_run and pkg_remove_success

    logging.debug("InstCount=%i DelCount=%i BrokenCount=%i"
                  % (cache._depcache.inst_count,
                     cache._depcache.del_count,
                     cache._depcache.broken_count))

    # clean after success install (if needed)
    keep_key = "Unattended-Upgrade::Keep-Debs-After-Install"
    if (not apt_pkg.config.find_b(keep_key, False)
            and not options.dry_run
            and pkg_install_success):
        clean_downloaded_packages(fetcher)

    return UnattendedUpgradesResult(
        successful_run, _("All upgrades installed"), pkgs,
        pkgs_kept_back,
        kernel_pkgs_removed + pkgs_removed,
        kernel_pkgs_kept_installed + pkgs_kept_installed,
        update_stamp=True)


class Options:
    """Plain holder for the option values, all of them off by default.

    The attribute names correspond to the command line switches.
    """

    def __init__(self):
        # how much gets reported
        self.verbose = False
        self.debug = False
        self.apt_debug = False
        # what is actually done
        self.dry_run = False
        self.download_only = False
        self.minimal_upgrade_steps = False


if __name__ == "__main__":
    # Script entry point: set up translations and the environment, parse
    # the command line, refuse to run as non-root, install the signal
    # handlers, write the pid file and hand over to main().
    localesApp = "unattended-upgrades"
    localesDir = "/usr/share/locale"
    gettext.bindtextdomain(localesApp, localesDir)
    gettext.textdomain(localesApp)

    # set debconf to NON_INTERACTIVE
    os.environ["DEBIAN_FRONTEND"] = "noninteractive"

    # this ensures the commandline is logged in /var/log/apt/history.log
    apt_pkg.config.set("Commandline::AsString", " ".join(sys.argv))

    # COMPAT with the misspelling: minimal steps stay the default unless
    # either spelling of the option is set to false
    minimal_steps_default = (
        apt_pkg.config.find_b("Unattended-Upgrades::MinimalSteps", True)
        and apt_pkg.config.find_b("Unattended-Upgrade::MinimalSteps", True))

    # init the options
    parser = OptionParser()
    parser.add_option("-d", "--debug",
                      action="store_true",
                      default=apt_pkg.config.find_b(
                          "Unattended-Upgrade::Debug", False),
                      help=_("print debug messages"))
    parser.add_option("", "--apt-debug",
                      action="store_true", default=False,
                      help=_("make apt/libapt print verbose debug messages"))
    parser.add_option("-v", "--verbose",
                      action="store_true",
                      default=apt_pkg.config.find_b(
                          "Unattended-Upgrade::Verbose", False),
                      help=_("print info messages"))
    parser.add_option("", "--dry-run",
                      action="store_true", default=False,
                      help=_("Simulation, download but do not install"))
    parser.add_option("", "--download-only",
                      action="store_true", default=False,
                      help=_("Only download, do not even try to install."))
    parser.add_option("", "--minimal-upgrade-steps",
                      action="store_true", default=minimal_steps_default,
                      help=_("Upgrade in minimal steps (and allow "
                             "interrupting with SIGTERM) (default)"))
    # stores False into the same destination as --minimal-upgrade-steps
    parser.add_option("", "--no-minimal-upgrade-steps",
                      action="store_false", default=minimal_steps_default,
                      dest="minimal_upgrade_steps",
                      help=_("Upgrade all packages together instead of in "
                             "smaller sets"))
    # underscore spelling of the option, kept out of the --help output
    parser.add_option("", "--minimal_upgrade_steps",
                      action="store_true",
                      help=SUPPRESS_HELP,
                      default=minimal_steps_default)
    # parse_args() returns (values, positional args), only the values
    # are used
    options = cast(Options, (parser.parse_args())[0])

    if os.getuid() != 0:
        print(_("You need to be root to run this application"))
        sys.exit(1)

    # ensure that we are not killed when the terminal goes away e.g. on
    # shutdown
    signal.signal(signal.SIGHUP, signal.SIG_IGN)

    # setup signal handler for graceful stopping
    signal.signal(signal.SIGTERM, signal_handler)

    # write pid to let other processes find this one
    pidf = os.path.join(apt_pkg.config.find_dir("Dir"),
                        "var", "run", "unattended-upgrades.pid")
    with open(pidf, "w") as fp:
        fp.write("%s" % os.getpid())
    # clean up pid file on exit
    atexit.register(os.remove, pidf)

    # run the main code
    sys.exit(main(options))

Anon7 - 2022
AnonSec Team