# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
"""EXPERIMENTAL Conda environment data model"""

from __future__ import annotations

from dataclasses import dataclass, field, fields, replace
from functools import reduce
from itertools import chain
from logging import getLogger
from typing import TYPE_CHECKING

from ..base.constants import PLATFORMS, UNKNOWN_CHANNEL
from ..base.context import context, validate_channels
from ..common.constants import NULL
from ..common.iterators import groupby_to_dict as groupby
from ..core.prefix_data import PrefixData
from ..exceptions import CondaValueError
from ..history import History
from ..misc import get_package_records_from_explicit
from .match_spec import MatchSpec

if TYPE_CHECKING:
    from argparse import Namespace
    from collections.abc import Iterable
    from typing import Final, TypeVar

    from ..base.constants import (
        ChannelPriority,
        DepsModifier,
        SatSolverChoice,
        UpdateModifier,
    )
    from ..common.path import PathType
    from .records import PackageRecord

    T = TypeVar("T")

log = getLogger(__name__)


# Key of ``Environment.external_packages`` under which ``Environment.from_prefix``
# stores the pip-style ``name==version`` entries of the Python packages it finds.
EXTERNAL_PACKAGES_PYPI_KEY: Final = "pip"


@dataclass
class EnvironmentConfig:
    """
    **Experimental** While experimental, expect both major and minor changes across minor releases.

    Data model for a conda environment config.
    """

    aggressive_update_packages: tuple[str, ...] = field(default_factory=tuple)

    channel_priority: ChannelPriority | None = None

    channels: tuple[str, ...] = field(default_factory=tuple)

    channel_settings: tuple[dict[str, str], ...] = field(default_factory=tuple)

    deps_modifier: DepsModifier | None = None

    disallowed_packages: tuple[str, ...] = field(default_factory=tuple)

    pinned_packages: tuple[str, ...] = field(default_factory=tuple)

    repodata_fns: tuple[str, ...] = field(default_factory=tuple)

    sat_solver: SatSolverChoice | None = None

    solver: str | None = None

    track_features: tuple[str, ...] = field(default_factory=tuple)

    update_modifier: UpdateModifier | None = None

    use_only_tar_bz2: bool | None = None

    def _append_without_duplicates(
        self, first: Iterable[T], second: Iterable[T]
    ) -> tuple[T, ...]:
        return tuple(dict.fromkeys(item for item in chain(first, second)))

    def _merge_channel_settings(
        self, first: tuple[dict[str, str], ...], second: tuple[dict[str, str], ...]
    ) -> tuple[dict[str, str], ...]:
        """Merge channel settings.

        An individual channel setting is a dict that may have the key "channels". Settings
        with matching "channels" should be merged together.
        """

        grouped_channel_settings = groupby(
            lambda x: x.get("channel"), chain(first, second)
        )

        return tuple(
            {k: v for config in configs for k, v in config.items()}
            for channel, configs in grouped_channel_settings.items()
        )

    def _merge(self, other: EnvironmentConfig) -> EnvironmentConfig:
        """
        **Experimental** While experimental, expect both major and minor changes across minor releases.

        Merges an EnvironmentConfig into this one. Merging rules are:
        * Primitive types get clobbered if subsequent configs have a value, otherwise keep the last set value
        * Lists get appended to and deduplicated
        * Dicts get updated
        * Special cases:
          * channel settings is a list of dicts, it merges inner dicts, keyed on "channel"
          * channels: last wins for priority (prepended so later config's channels take precedence)
        """
        # Return early if there is nothing to merge
        if other is None:
            return self

        # Ensure that we are merging another EnvironmentConfig
        if not isinstance(other, self.__class__):
            raise CondaValueError(
                "Cannot merge EnvironmentConfig with non-EnvironmentConfig"
            )

        self.aggressive_update_packages = self._append_without_duplicates(
            self.aggressive_update_packages, other.aggressive_update_packages
        )

        if other.channel_priority is not None:
            self.channel_priority = other.channel_priority

        self.channels = self._append_without_duplicates(other.channels, self.channels)

        self.channel_settings = self._merge_channel_settings(
            self.channel_settings, other.channel_settings
        )

        if other.deps_modifier is not None:
            self.deps_modifier = other.deps_modifier

        self.disallowed_packages = self._append_without_duplicates(
            self.disallowed_packages, other.disallowed_packages
        )

        self.pinned_packages = self._append_without_duplicates(
            self.pinned_packages, other.pinned_packages
        )

        self.repodata_fns = self._append_without_duplicates(
            self.repodata_fns, other.repodata_fns
        )

        if other.sat_solver is not None:
            self.sat_solver = other.sat_solver

        if other.solver is not None:
            self.solver = other.solver

        self.track_features = self._append_without_duplicates(
            self.track_features, other.track_features
        )

        if other.update_modifier is not None:
            self.update_modifier = other.update_modifier

        if other.use_only_tar_bz2 is not None:
            self.use_only_tar_bz2 = other.use_only_tar_bz2

        return self

    @classmethod
    def from_context(cls) -> EnvironmentConfig:
        """
        **Experimental** While experimental, expect both major and minor changes across minor releases.

        Create an EnvironmentConfig from the current context
        """
        field_names = {field.name for field in fields(cls)}

        environment_settings = {
            key: value
            for key, value in context.environment_settings.items()
            if key in field_names
        }
        return cls(**environment_settings)

    @classmethod
    def from_cli_channels(cls, args: Namespace) -> EnvironmentConfig:
        """
        Build a sparse EnvironmentConfig from CLI args for channels only.

        Used to override channels from the CLI when files are provided.
        """
        # NOTE: In the future, we may want to add other relevant CLI
        # options to the EnvironmentConfig.
        channel = getattr(args, "channel", None)
        # NULL is the default value for use_local. bool(NULL) is False.
        use_local = getattr(args, "use_local", NULL)
        if not channel and not use_local:
            return cls()

        local = ("local",) if use_local else ()
        arg_channels = tuple(channel) if channel else ()
        channels = validate_channels(local + arg_channels) if arg_channels else local
        return cls(channels=channels)

    @classmethod
    def merge(cls, *configs: EnvironmentConfig) -> EnvironmentConfig:
        """
        **Experimental** While experimental, expect both major and minor changes across minor releases.

        Merges a list of EnvironmentConfigs into a single one. Merging rules are:
        * Primitive types get clobbered if subsequent configs have a value, otherwise keep the last set value
        * Lists get appended to and deduplicated
        * Dicts get updated
        """

        # Don't try to merge if there is nothing to merge
        if not configs:
            return

        # If there is only one config, there is nothing to merge, return the lone config
        if len(configs) == 1:
            return configs[0]

        # Use reduce to merge all configs into the first one
        return reduce(
            lambda result, config: result._merge(config), configs[1:], configs[0]
        )


@dataclass(kw_only=True)
class Environment:
    """
    **Experimental** While experimental, expect both major and minor changes across minor releases.

    Data model for a conda environment.
    """

    platform: str
    """The platform this environment may be installed on (required)."""

    config: EnvironmentConfig = field(default_factory=EnvironmentConfig)
    """Environment level configuration, eg. channels, solver options, etc.

    TODO: may need to think more about the type of this field and how
    conda should be merging configs between environments.
    """

    external_packages: dict[str, list[str]] = field(default_factory=dict)
    """Map of other package types that conda can install. For example pypi packages."""

    explicit_packages: list[PackageRecord] = field(default_factory=list)
    """The complete list of specs for the environment.

    E.g. after a solve, or from an explicit environment spec.
    """

    name: str | None = None
    """Environment name."""

    prefix: str | None = None
    """Prefix the environment is installed into."""

    requested_packages: list[MatchSpec] = field(default_factory=list)
    """User requested specs for this environment."""

    variables: dict[str, str] = field(default_factory=dict)
    """Environment variables to be applied to the environment."""

    # Virtual packages for the environment. Either the default ones provided by
    # the virtual_packages plugins or the overrides captured by CONDA_OVERRIDE_*.
    virtual_packages: list[PackageRecord] = field(default_factory=list)

    def __post_init__(self):
        # an environment must have a platform
        if not self.platform:
            raise CondaValueError("'Environment' needs a 'platform'.")

        # ensure the platform is valid
        if self.platform not in PLATFORMS:
            raise CondaValueError(
                f"Invalid platform '{self.platform}'. Valid platforms are {PLATFORMS}."
            )

        # ensure there are no duplicate packages in explicit_packages
        if len(self.explicit_packages) > 1 and len(
            set(pkg.name for pkg in self.explicit_packages)
        ) != len(self.explicit_packages):
            raise CondaValueError("Duplicate packages found in 'explicit_packages'.")

        # ensure requested_packages matches one (and only one) explicit package
        if len(self.requested_packages) > 0 and len(self.explicit_packages) > 0:
            explicit_package_names = set(pkg.name for pkg in self.explicit_packages)
            for requested_package in self.requested_packages:
                if requested_package.name not in explicit_package_names:
                    raise CondaValueError(
                        f"Requested package '{requested_package}' is not found in 'explicit_packages'."
                    )

    @classmethod
    def merge(cls, *environments):
        """
        **Experimental** While experimental, expect both major and minor changes across minor releases.

        Merges multiple environments into a single environment following the rules:
        * name, prefix: last wins (later env overrides earlier).
        * platform: must match across all envs.
        * requested_packages, explicit_packages, virtual_packages: union, deduplicated.
        * variables: merged dict, last value wins per key.
        * external_packages: concatenation per key (union of lists, deduplicated).
        * config: EnvironmentConfig.merge (last wins for primitives, append for lists).
        """
        name = None
        prefix = None
        platform = None
        names = [env.name for env in environments if env.name]
        prefixes = [env.prefix for env in environments if env.prefix]

        if names:
            name = names[-1]
            if len(names) > 1:
                log.debug("Several names passed %s. Picking last one %s", names, name)

        if prefixes:
            prefix = prefixes[-1]
            if len(prefixes) > 1:
                log.debug(
                    "Several prefixes passed %s. Picking last one %s", prefixes, prefix
                )

        platforms = [env.platform for env in environments if env.platform]
        if len(set(platforms)) != 1:
            raise CondaValueError(
                "Conda can not merge environments of different platforms. "
                f"Received environments with platforms: {platforms}"
            )
        platform = platforms[0]

        requested_packages = list(
            dict.fromkeys(
                requirement
                for env in environments
                for requirement in env.requested_packages
            )
        )

        explicit_packages = list(
            dict.fromkeys(
                requirement
                for env in environments
                for requirement in env.explicit_packages
            )
        )

        virtual_packages = list(
            dict.fromkeys(
                virtual_package
                for env in environments
                for virtual_package in env.virtual_packages
            )
        )

        variables = {
            k: v for env in environments for (k, v) in (env.variables or {}).items()
        }

        external_packages = {}
        for env in environments:
            for k, v in (env.external_packages or {}).items():
                if not isinstance(v, list):
                    continue
                if k in external_packages:
                    new_pkgs = [pkg for pkg in v if pkg not in external_packages[k]]
                    external_packages[k].extend(new_pkgs)
                else:
                    external_packages[k] = list(v)

        config = EnvironmentConfig.merge(
            *[env.config for env in environments if env.config is not None]
        )

        return cls(
            config=config,
            external_packages=external_packages,
            explicit_packages=explicit_packages,
            name=name,
            platform=platform,
            prefix=prefix,
            requested_packages=requested_packages,
            variables=variables,
            virtual_packages=virtual_packages,
        )

    @classmethod
    def from_prefix(
        cls,
        prefix: str,
        name: str,
        platform: str,
        *,
        from_history: bool = False,
        no_builds: bool = False,
        ignore_channels: bool = False,
        channels: list[str] | None = None,
    ) -> Environment:
        """
        Create an Environment model from an existing conda prefix.

        This method analyzes an installed conda environment and creates
        an Environment model that can be used for exporting or other operations.

        :param prefix: Path to the conda environment prefix
        :param name: Name for the environment
        :param platform: Target platform (e.g., 'linux-64', 'osx-64')
        :param from_history: Use explicit specs from history instead of installed packages
        :param no_builds: Exclude build strings from package specs
        :param ignore_channels: Don't include channel information in package specs
        :return: Environment model representing the prefix
        """
        prefix_data = PrefixData(prefix, interoperability=True)
        variables = prefix_data.get_environment_env_vars()

        # Build requested packages and external packages
        requested_packages = []
        external_packages = {}

        python_precs = prefix_data.get_python_packages()

        # Handle --from-history case
        if from_history:
            requested_packages = cls.from_history(prefix)
            conda_precs = []  # No conda packages to process for channel extraction
        else:
            # Use PrefixData's package extraction methods
            conda_precs = prefix_data.get_conda_packages()

            # Create MatchSpecs for conda packages
            for conda_prec in conda_precs:
                spec_str = conda_prec.spec_no_build if no_builds else conda_prec.spec

                if (
                    not ignore_channels
                    and conda_prec.channel
                    and conda_prec.channel.name
                ):
                    spec_str = f"{conda_prec.channel.name}::{spec_str}"

                requested_packages.append(MatchSpec(spec_str))

            # Add pip dependencies to external_packages if any exist
            if python_precs:
                # Create pip dependencies list matching current conda format
                python_deps = [
                    f"{python_prec.name}=={python_prec.version}"
                    for python_prec in python_precs
                ]
                external_packages[EXTERNAL_PACKAGES_PYPI_KEY] = python_deps

        # Always populate explicit_packages from prefix data (for explicit export format).
        # But don't include packages installed by pip (or other external package formats).
        python_precs_names = [pkg.name for pkg in python_precs]
        explicit_packages = list(
            pkg
            for pkg in prefix_data.iter_records()
            if pkg.name not in python_precs_names
        )

        # Build channels tuple
        environment_channels = tuple(channels or ())

        # Inject channels from installed conda packages (unless ignoring channels)
        # This applies regardless of override_channels setting
        if not ignore_channels:
            environment_channels = (
                *(
                    canonical_name
                    # Reuse conda_precs instead of calling get_conda_packages() again
                    for conda_package in conda_precs
                    if (canonical_name := conda_package.channel.canonical_name)
                    != UNKNOWN_CHANNEL
                ),
                *environment_channels,
            )

        # Channels tuple is a unique ordered sequence
        environment_channels = tuple(dict.fromkeys(environment_channels))

        # Create environment config with comprehensive context settings
        config = EnvironmentConfig.from_context()

        # Override/set channels with those extracted from installed packages if any were found
        config = replace(config, channels=environment_channels)

        return cls(
            prefix=prefix,
            platform=platform,
            name=name,
            config=config,
            variables=variables,
            external_packages=external_packages,
            requested_packages=requested_packages,
            explicit_packages=explicit_packages,
        )

    @classmethod
    def from_cli(
        cls,
        args: Namespace,
        add_default_packages: bool = False,
    ) -> Environment:
        """
        Create an Environment model from command-line arguments.

        This method will parse command-line arguments and create an
        Environment object. This includes: reading files provided as
        cli arguments, and pulling EnvironmentConfig from the context.

        :param args: argparse Namespace containing command-line arguments
        :return: An Environment object representing the cli
        """
        env, _ = cls.from_cli_with_file_envs(args, add_default_packages)
        return env

    @classmethod
    def from_cli_with_file_envs(
        cls,
        args: Namespace,
        add_default_packages: bool = False,
    ) -> tuple[Environment, dict[str, Environment]]:
        """
        Create an Environment model from command-line arguments, with a map
        of file path to Environment for each environment file specified.

        :param args: argparse Namespace containing command-line arguments
        :return: Tuple of (merged Environment, dict mapping file path to Environment)
        """
        specs = [package.strip("\"'") for package in args.packages]
        requested_packages = []
        fetch_explicit_packages = []

        envs_from_file = []
        fpath_envs_map = {}
        for fpath in args.file:
            spec_hook = context.plugin_manager.get_environment_specifier(
                source=fpath,
                name=context.environment_specifier,
            )
            spec = spec_hook.environment_spec(fpath)
            envs_from_file.append(spec.env)
            fpath_envs_map[fpath] = spec.env

        # Add default packages if required. If the default package is already
        # present in the list of specs, don't add it (this will override any
        # version constraint from the default package).
        if add_default_packages:
            names = {MatchSpec(spec).name for spec in specs}
            names |= {
                pkg.name
                for env in envs_from_file
                for pkg in (*env.explicit_packages, *env.requested_packages)
            }
            for default_package in context.create_default_packages:
                if MatchSpec(default_package).name not in names:
                    specs.append(default_package)

        for spec in specs:
            if (match_spec := MatchSpec(spec)).get("url"):
                fetch_explicit_packages.append(spec)
            else:
                requested_packages.append(match_spec)

        # Don't allow mixing of explicit packages (url or path) with regular specs.
        has_explicit = any(env.explicit_packages for env in envs_from_file) or bool(
            fetch_explicit_packages
        )
        has_specs = any(env.requested_packages for env in envs_from_file) or bool(
            requested_packages
        )
        if has_explicit and has_specs:
            raise CondaValueError(
                "Cannot combine package names with explicit package lists. "
                "Package names (python, numpy) are resolved by the solver; "
                "explicit files and URLs are installed directly. "
                "Use only package names, or only explicit files/URLs."
            )

        # transform explicit packages into package records
        explicit_packages = []
        if fetch_explicit_packages:
            explicit_packages = get_package_records_from_explicit(
                fetch_explicit_packages
            )

        base_env = Environment(
            platform=context.subdir, config=EnvironmentConfig.from_context()
        )

        cli_env = Environment(
            name=args.name,
            prefix=context.target_prefix,
            platform=context.subdir,
            requested_packages=requested_packages,
            explicit_packages=explicit_packages,
            config=EnvironmentConfig.from_cli_channels(args),
        )

        if envs_from_file:
            file_env = cls.merge(*envs_from_file)
            merged = cls.merge(base_env, file_env, cli_env)

        else:
            merged = cls.merge(base_env, cli_env)

        # Respect override_channels flag and only use channels from the CLI
        # TODO: This is a way to override the channels from the CLI.
        # We should probably add a way to handle this in a central place.
        if getattr(args, "override_channels", False):
            merged = replace(
                merged, config=replace(merged.config, channels=cli_env.config.channels)
            )
        return merged, fpath_envs_map

    @staticmethod
    def from_history(prefix: PathType) -> list[MatchSpec]:
        """
        Return the requested specs recorded in the history of ``prefix``.

        :param prefix: Path to the conda environment prefix
        :return: The values of the requested specs map of the prefix history
        """
        requested_specs = History(prefix).get_requested_specs_map()
        return list(requested_specs.values())

    def extrapolate(self, platform: str) -> Environment:
        """
        Given the current environment, extrapolate the environment for the given platform.
        """
        if platform == self.platform:
            return self

        from ..cli.install import Repodatas

        solver_backend = context.plugin_manager.get_cached_solver_backend()
        requested_packages = self.from_history(self.prefix)

        for repodata_manager in Repodatas(self.config.repodata_fns, {}):
            with repodata_manager as repodata_fn:
                solver = solver_backend(
                    prefix="/env/does/not/exist",
                    channels=self.config.channels,
                    subdirs=(platform, "noarch"),
                    specs_to_add=requested_packages,
                    repodata_fn=repodata_fn,
                    command="create",
                )
                explicit_packages = solver.solve_final_state()
        return Environment(
            prefix=self.prefix,
            name=self.name,
            platform=platform,
            config=self.config,
            requested_packages=requested_packages,
            explicit_packages=explicit_packages,
            external_packages=self.external_packages,
        )
