# SPDX-FileCopyrightText: 2024 Blender Foundation
#
# SPDX-License-Identifier: GPL-2.0-or-later

# Schedule files for later removal, needed for situations where files are locked.
#
# This is mainly a workaround for WIN32 error where an add-on DLL
# is considered *used* making it impossible to remove.
#
# This is also used on other systems as permissions can also prevent sub-directories from removed.
# In this case renaming can make way the path to be replaced however it doesn't address
# the problem of the "stale" path failing to be removed.
# The user would need to change the permissions in this case (although this really a corner case).
__all__ = (
    "StaleFiles",
)


from collections.abc import (
    Sequence,
)

# The stale file-format is very simple and works as follows.
#
# - Every line references a path relative to the stale file.
# - Paths must always references files within this directory
#   (anything else must be ignored).
# - Paths must always use forward slashes (even on WIN32).
#   This is done since a repository may be accessed from different systems.
# - Paths must end with a newline `\n`.
#
# Further notes:
# - Corrupted "stale" files must be handled gracefully (it may be random bytes).
# - Non UTF8 characters in paths are supported via `surrogateescape`.
# - File names containing newlines are *not* supported.


class StaleFiles:
    __slots__ = (
        # Files outside of this directory must *never* be removed.
        "_base_directory",
        # The name (within `_base_directory`) to load/store paths.
        "_stale_filename",
        # Stale paths relative to `_base_directory`.
        "_paths",
        # When true, print extra debug output.
        "_debug",
        # Store the cache index per-directory, avoids looking up an index every time a stale name needs to be created.
        "_index_cache",
        # True when the run-time state is different to the on-disk state.
        "_is_modified",
    )

    def __init__(
            self,
            base_directory: str,
            *,
            stale_filename: str,
            debug: bool = False,
    ):
        import os
        from os import sep
        assert base_directory not in ("", ".", "..")
        # NOTE: on WIN32 `normpath` won't remove the trailing `sep`,
        # it's important to add only if it's not there.
        base_directory = os.path.normpath(base_directory)
        self._base_directory = base_directory if base_directory.endswith(sep) else (base_directory + sep)
        self._stale_filename = stale_filename
        self._paths: list[str] = []
        self._debug: bool = debug

        self._index_cache: dict[str, int] = {}
        self._is_modified: bool = True

    def is_empty(self) -> bool:
        return not bool(self._paths)

    def is_modified(self) -> bool:
        return self._is_modified

    def state_load(self, *, check_exists: bool) -> None:
        import contextlib
        import os
        from os import sep

        base_directory = self._base_directory
        paths = self._paths
        debug = self._debug

        assert base_directory.endswith(sep)
        # Don't support loading multiple times or running again after adding files.
        assert len(paths) == 0

        stale_filepath = os.path.join(base_directory, self._stale_filename)

        line_count = 0

        # Set here before early exit.
        # Assume modified so any corrupt causes a re-write.
        self._is_modified = True

        try:
            # pylint: disable-next=consider-using-with
            fh_context = open(stale_filepath, "r", encoding="utf8", errors="surrogateescape")
        except FileNotFoundError:
            self._is_modified = False
            return
        except Exception as ex:
            if debug:
                print(base_directory, "error opening file for read", str(ex))
            return

        with contextlib.closing(fh_context) as fh:
            fh_iter = iter(fh)
            while True:
                try:
                    path = next(fh_iter)
                except StopIteration:
                    break
                except Exception as ex:
                    if debug:
                        print(base_directory, "error reading line", str(ex))
                    break

                line_count += 1
                # Not expected, file may be truncated.
                if not path.endswith("\n"):
                    if debug:
                        print(base_directory, "expected line endings on each line")
                    continue
                path = path[:-1]
                # Not expected but harmless, ignore if it does.
                if not path:
                    if debug:
                        print(base_directory, "expected line not to be empty")
                    continue

                path_abs = base_directory + (path if sep == "/" else path.replace("/", "\\"))

                if check_exists:
                    # Harmless, somehow the file was removed.
                    if not os.path.exists(path_abs):
                        continue

                path_abs = os.path.normpath(path_abs)
                # Not expected, ensure under *no* conditions paths outside this directory are removed.
                if not path_abs.startswith(base_directory):
                    if debug:
                        print(base_directory, "stale file points to parent path (unexpected but harmless)", repr(path))
                    continue

                # Ensure the `base_directory` & `path_abs` they are not the same.
                # One could be forgiven for thinking they must never be the same since `path`
                # is known not be an empty string, one would be mistaken!
                # WIN32 which considers `C:\path\` the same as `C:\path\. ` to be the same.
                # Therefor, literal lines containing any combination of trailing full-stop
                # or space characters would be considered files that cannot be removed.
                # While this should never under normal conditions happen,
                # guarantee that stale file removal *never* removes anything it should not,
                # including situations when random bytes are written into this file
                # (except in the case the random bytes happen to match a patch - which can't be avoided).
                #
                # If this ever did happen besides potentially trying to remove `base_directory`,
                # this path could be treated as a file which could not be removed and queued for
                # removal again causing a single space (for example) to be left in the stale file,
                # trying to be removed every startup and failing.
                # Avoid all these issues by checking the path doesn't resolve to being the same path as it's parent.
                is_same = False
                try:
                    is_same = os.path.samefile(base_directory, path_abs)
                except FileNotFoundError:
                    pass
                except Exception as ex:
                    if debug:
                        print(base_directory, "error checking the same path", str(ex))

                if is_same:
                    if debug:
                        print(base_directory, "path results to it's parent", repr(path))
                    continue

                # NOTE: duplicates are not checked, while they aren't expected, duplicates won't cause errors.
                paths.append(path)

        self._is_modified = len(paths) != line_count

    def state_store(self, *, check_exists: bool) -> None:
        import contextlib
        import os
        from os import sep

        base_directory = self._base_directory
        debug = self._debug

        stale_filepath = os.path.join(base_directory, self._stale_filename)

        if not self._paths:
            self._is_modified = False
            try:
                os.remove(stale_filepath)
            except FileNotFoundError:
                pass
            except Exception as ex:
                if debug:
                    print(base_directory, "failed to remove!", str(ex))
                self._is_modified = True

            return

        try:
            # pylint: disable-next=consider-using-with
            fh_context = open(stale_filepath, "w", encoding="utf8", errors="surrogateescape")
        except Exception as ex:
            if debug:
                print(base_directory, "error opening file for write", str(ex))
            self._is_modified = True
            return

        # Assume success, any errors can set to true.
        is_modified = False

        with contextlib.closing(fh_context) as fh:
            for path in self._paths:
                if check_exists:
                    path_abs = base_directory + (path if sep == "/" else path.replace("/", "\\"))
                    # Harmless, somehow the file was removed.
                    if not os.path.exists(path_abs):
                        continue

                try:
                    fh.write(path + "\n")
                except Exception as ex:
                    if debug:
                        print(base_directory, "failed to write path", str(ex))
                    is_modified = True
                    break

        self._is_modified = is_modified

    def state_remove_all(self) -> bool:
        import stat
        import shutil
        import os

        from os import sep

        base_directory = self._base_directory
        debug = self._debug

        paths_next = []

        for path in self._paths:
            path_abs = base_directory + (path if sep == "/" else path.replace("/", "\\"))
            path_abs = os.path.normpath(path_abs)

            # Should be unreachable, extra paranoid check so we *never*
            # recursively remove anything outside of the base directory.
            if not path_abs.startswith(base_directory):
                print("Internal error detected attempting to remove file outside of:", base_directory)
                continue

            try:
                st = os.stat(path_abs)
            except FileNotFoundError:
                # Not a problem if it's already removed.
                continue
            except Exception as ex:
                if debug:
                    print(base_directory, "failed to stat file", path, str(ex))
                continue

            if stat.S_ISDIR(st.st_mode):
                try:
                    shutil.rmtree(path_abs)
                except Exception as ex:
                    # May be necessary with links.
                    try:
                        os.remove(path_abs)
                    except Exception:
                        if debug:
                            print(base_directory, "failed to remove dir", path, str(ex))
            else:
                try:
                    os.remove(path_abs)
                except Exception as ex:
                    if debug:
                        print(base_directory, "failed to remove file", path, str(ex))

            # Failed to remove, add back to the list.
            if os.path.exists(path_abs):
                paths_next.append(path)

        if len(self._paths) == len(paths_next):
            return False

        self._is_modified = True
        self._paths[:] = paths_next
        return True

    def state_load_add_and_store(
            self,
            *,
            # A sequence of absolute paths within `_base_directory`.
            paths: Sequence[str],
    ) -> bool:
        # Convenience function for a common operation.
        # Return true when one or more items from "paths" were added to the "state".

        self.state_load(check_exists=True)
        if not self.is_empty():
            self.state_remove_all()

        result = False
        for path_abs in paths:
            self.filepath_add(path_abs, rename=True)
            result = True

        if self.is_modified():
            self.state_store(check_exists=False)

        return result

    def state_load_remove_and_store(
            self,
            *,
            # A sequence of absolute paths within `_base_directory`.
            paths: Sequence[str],
    ) -> bool:
        # Convenience function for a common operation.
        # Return true when one or more items from "paths" were removed from the "state".

        self.state_load(check_exists=False)
        # Accounts for the common case where nothing has been marked for removal.
        if not self._paths:
            return False

        paths_remove_canonical = {
            self._filepath_relative_and_canonicalize(path_abs) for path_abs in paths
            if self._filepath_relative_test(path_abs)
        }

        paths_next = [path for path in self._paths if path not in paths_remove_canonical]
        if len(self._paths) == len(paths_next):
            return False

        self._paths[:] = paths_next
        self._is_modified = True

        self.state_store(check_exists=False)

        return True

    def _filepath_relative_test(self, path_abs: str) -> bool:
        debug = self._debug
        base_directory = self._base_directory
        if not path_abs.startswith(base_directory):
            if debug:
                print(base_directory, "is not a sub-directory", path_abs)
            return False
        return True

    def _filepath_relative_and_canonicalize(self, path_abs: str) -> str:
        from os import sep

        assert self._filepath_relative_test(path_abs)

        path = path_abs[len(self._base_directory):].lstrip(sep)
        if sep == "\\":
            path = path.replace("\\", "/")
        return path

    def _filepath_rename_to_stale(self, path_abs: str) -> str:
        import os

        base_directory = self._base_directory
        debug = self._debug

        # These need not necessarily match, it could be optional.
        prefix = self._stale_filename

        dirpath = os.path.dirname(path_abs)
        stale_index = self._index_cache.get(dirpath, 1)
        while True:
            path_abs_stale = os.path.join(dirpath, "{:s}{:04x}".format(prefix, stale_index))
            if not os.path.exists(path_abs_stale):
                break
            stale_index += 1

        rename_ok = False
        try:
            os.rename(path_abs, path_abs_stale)
            rename_ok = True
        except Exception as ex:
            if debug:
                print(base_directory, "failed to rename path", str(ex))

        if rename_ok:
            self._index_cache[dirpath] = stale_index + 1
        else:
            # Failed to rename, make the previous name stale as we have no better options.
            path_abs_stale = path_abs
            if debug:
                print("failed to rename:", path_abs)

        return path_abs_stale

    def filepath_add(self, path_abs: str, *, rename: bool) -> bool:
        if not self._filepath_relative_test(path_abs):
            return False

        if rename:
            path_abs = self._filepath_rename_to_stale(path_abs)
        path = self._filepath_relative_and_canonicalize(path_abs)

        self._is_modified = True
        self._paths.append(path)
        return True
