# Copyright 2025 The Kandinsky Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.functional as F

from ...configuration_utils import ConfigMixin, register_to_config
from ...utils.accelerate_utils import apply_forward_hook
from ..activations import get_activation
from ..modeling_outputs import AutoencoderKLOutput
from ..modeling_utils import ModelMixin
from .vae import AutoencoderMixin, DecoderOutput, DiagonalGaussianDistribution


class KVAEResnetBlock2D(nn.Module):
    r"""
    A Resnet block with optional guidance.

    Parameters:
        in_channels (`int`): The number of channels in the input.
        out_channels (`int`, *optional*, default to `None`):
            The number of output channels for the first conv2d layer. If None, same as `in_channels`.
        conv_shortcut (`bool`, *optional*, default to `False`):
            If `True` and `in_channels` not equal to `out_channels`, add a 3x3 nn.conv2d layer for skip-connection.
        temb_channels (`int`, *optional*, default to `512`): The number of channels in timestep embedding.
        zq_ch (`int`, *optional*, default to `None`): Guidance channels for normalization.
        add_conv (`bool`, *optional*, default to `False`):
            If `True` add conv2d layer for normalization.
        normalization (`nn.Module`, *optional*, default to `None`): The normalization layer.
        act_fn (`str`, *optional*, default to `"swish"`): The activation function to use.
    """

    def __init__(
        self,
        *,
        in_channels: int,
        out_channels: Optional[int] = None,
        conv_shortcut: bool = False,
        temb_channels: int = 512,
        zq_ch: Optional[int] = None,
        add_conv: bool = False,
        act_fn: str = "swish",
    ):
        super().__init__()
        self.in_channels = in_channels
        out_channels = in_channels if out_channels is None else out_channels
        self.out_channels = out_channels
        self.use_conv_shortcut = conv_shortcut
        self.nonlinearity = get_activation(act_fn)

        if zq_ch is None:
            self.norm1 = nn.GroupNorm(num_channels=in_channels, num_groups=32, eps=1e-6, affine=True)
        else:
            self.norm1 = KVAEDecoderSpatialNorm2D(in_channels, zq_channels=zq_ch, add_conv=add_conv)

        self.conv1 = nn.Conv2d(
            in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=(1, 1), padding_mode="replicate"
        )
        if temb_channels > 0:
            self.temb_proj = torch.nn.Linear(temb_channels, out_channels)
        if zq_ch is None:
            self.norm2 = nn.GroupNorm(num_channels=out_channels, num_groups=32, eps=1e-6, affine=True)
        else:
            self.norm2 = KVAEDecoderSpatialNorm2D(out_channels, zq_channels=zq_ch, add_conv=add_conv)
        self.conv2 = nn.Conv2d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=3,
            padding=(1, 1),
            padding_mode="replicate",
        )
        if self.in_channels != self.out_channels:
            if self.use_conv_shortcut:
                self.conv_shortcut = nn.Conv2d(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    kernel_size=3,
                    padding=(1, 1),
                    padding_mode="replicate",
                )
            else:
                self.nin_shortcut = nn.Conv2d(
                    in_channels,
                    out_channels,
                    kernel_size=1,
                    stride=1,
                    padding=0,
                )

    def forward(self, x: torch.Tensor, temb: torch.Tensor, zq: torch.Tensor = None) -> torch.Tensor:
        h = x

        if zq is None:
            h = self.norm1(h)
        else:
            h = self.norm1(h, zq)

        h = self.nonlinearity(h)
        h = self.conv1(h)

        if temb is not None:
            h = h + self.temb_proj(self.nonlinearity(temb))[:, :, None, None, None]

        if zq is None:
            h = self.norm2(h)
        else:
            h = self.norm2(h, zq)

        h = self.nonlinearity(h)

        h = self.conv2(h)

        if self.in_channels != self.out_channels:
            if self.use_conv_shortcut:
                x = self.conv_shortcut(x)
            else:
                x = self.nin_shortcut(x)

        return x + h


class KVAEPXSDownsample(nn.Module):
    def __init__(self, in_channels: int, factor: int = 2):
        r"""
        A Downsampling module.

        Args:
            in_channels (`int`): The number of channels in the input.
            factor (`int`, *optional*, default to `2`): The downsampling factor.
        """
        super().__init__()
        self.factor = factor
        self.unshuffle = nn.PixelUnshuffle(self.factor)
        self.spatial_conv = nn.Conv2d(
            in_channels, in_channels, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), padding_mode="reflect"
        )
        self.linear = nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (bchw)
        pxs_interm = self.unshuffle(x)
        b, c, h, w = pxs_interm.shape
        pxs_interm_view = pxs_interm.view(b, c // self.factor**2, self.factor**2, h, w)
        pxs_out = torch.mean(pxs_interm_view, dim=2)

        conv_out = self.spatial_conv(x)

        # adding it all together
        out = conv_out + pxs_out
        return self.linear(out)


class KVAEPXSUpsample(nn.Module):
    def __init__(self, in_channels: int, factor: int = 2):
        r"""
        An Upsampling module.

        Args:
            in_channels (`int`): The number of channels in the input.
            factor (`int`, *optional*, default to `2`): The upsampling factor.
        """
        super().__init__()
        self.factor = factor
        self.shuffle = nn.PixelShuffle(self.factor)
        self.spatial_conv = nn.Conv2d(
            in_channels, in_channels, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), padding_mode="reflect"
        )

        self.linear = nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        repeated = x.repeat_interleave(self.factor**2, dim=1)
        pxs_interm = self.shuffle(repeated)

        image_like_ups = F.interpolate(x, scale_factor=2, mode="nearest")
        conv_out = self.spatial_conv(image_like_ups)

        # adding it all together
        out = conv_out + pxs_interm
        return self.linear(out)


class KVAEDecoderSpatialNorm2D(nn.Module):
    r"""
    A 2D normalization module for decoder.

    Args:
        in_channels (`int`): The number of channels in the input.
        zq_channels (`int`): The number of channels in the guidance.
        add_conv (`bool`, *optional*, default to `false`):
            If `True` add conv2d 3x3 layer for guidance in the beginning.
    """

    def __init__(
        self,
        in_channels: int,
        zq_channels: int,
        add_conv: bool = False,
    ):
        super().__init__()
        self.norm_layer = nn.GroupNorm(num_channels=in_channels, num_groups=32, eps=1e-6, affine=True)

        self.add_conv = add_conv
        if add_conv:
            self.conv = nn.Conv2d(
                in_channels=zq_channels,
                out_channels=zq_channels,
                kernel_size=3,
                padding=(1, 1),
                padding_mode="replicate",
            )

        self.conv_y = nn.Conv2d(
            in_channels=zq_channels,
            out_channels=in_channels,
            kernel_size=1,
        )
        self.conv_b = nn.Conv2d(
            in_channels=zq_channels,
            out_channels=in_channels,
            kernel_size=1,
        )

    def forward(self, f: torch.Tensor, zq: torch.Tensor) -> torch.Tensor:
        f_first = f
        f_first_size = f_first.shape[2:]
        zq = F.interpolate(zq, size=f_first_size, mode="nearest")

        if self.add_conv:
            zq = self.conv(zq)

        norm_f = self.norm_layer(f)
        new_f = norm_f * self.conv_y(zq) + self.conv_b(zq)
        return new_f


class KVAEEncoder2D(nn.Module):
    r"""
    A 2D encoder module.

    Args:
        ch (`int`): The base number of channels in multiresolution blocks.
        ch_mult (`Tuple[int, ...]`, *optional*, default to `(1, 2, 4, 8)`):
            The channel multipliers in multiresolution blocks.
        num_res_blocks (`int`): The number of Resnet blocks.
        in_channels (`int`): The number of channels in the input.
        z_channels (`int`): The number of output channels.
        double_z (`bool`, *optional*, defaults to `True`):
            Whether to double the number of output channels for the last block.
        act_fn (`str`, *optional*, default to `"swish"`): The activation function to use.
    """

    def __init__(
        self,
        *,
        ch: int,
        ch_mult: Tuple[int, ...] = (1, 2, 4, 8),
        num_res_blocks: int,
        in_channels: int,
        z_channels: int,
        double_z: bool = True,
        act_fn: str = "swish",
    ):
        super().__init__()
        self.ch = ch
        self.temb_ch = 0
        self.num_resolutions = len(ch_mult)
        if isinstance(num_res_blocks, int):
            self.num_res_blocks = [num_res_blocks] * self.num_resolutions
        else:
            self.num_res_blocks = num_res_blocks
        self.nonlinearity = get_activation(act_fn)

        self.in_channels = in_channels

        self.conv_in = nn.Conv2d(
            in_channels=in_channels,
            out_channels=self.ch,
            kernel_size=3,
            padding=(1, 1),
        )

        in_ch_mult = (1,) + tuple(ch_mult)
        self.down = nn.ModuleList()
        for i_level in range(self.num_resolutions):
            block = nn.ModuleList()
            attn = nn.ModuleList()
            block_in = ch * in_ch_mult[i_level]
            block_out = ch * ch_mult[i_level]
            for i_block in range(self.num_res_blocks[i_level]):
                block.append(
                    KVAEResnetBlock2D(
                        in_channels=block_in,
                        out_channels=block_out,
                        temb_channels=self.temb_ch,
                    )
                )
                block_in = block_out
            down = nn.Module()
            down.block = block
            down.attn = attn
            if i_level < self.num_resolutions - 1:
                down.downsample = KVAEPXSDownsample(in_channels=block_in)  # mb: bad out channels
            self.down.append(down)

        # middle
        self.mid = nn.Module()
        self.mid.block_1 = KVAEResnetBlock2D(
            in_channels=block_in,
            out_channels=block_in,
            temb_channels=self.temb_ch,
        )

        self.mid.block_2 = KVAEResnetBlock2D(
            in_channels=block_in,
            out_channels=block_in,
            temb_channels=self.temb_ch,
        )

        # end
        self.norm_out = nn.GroupNorm(num_channels=block_in, num_groups=32, eps=1e-6, affine=True)

        self.conv_out = nn.Conv2d(
            in_channels=block_in,
            out_channels=2 * z_channels if double_z else z_channels,
            kernel_size=3,
            padding=(1, 1),
        )

        self.gradient_checkpointing = False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # timestep embedding
        temb = None

        # downsampling
        h = self.conv_in(x)
        for i_level in range(self.num_resolutions):
            for i_block in range(self.num_res_blocks[i_level]):
                if torch.is_grad_enabled() and self.gradient_checkpointing:
                    h = self._gradient_checkpointing_func(self.down[i_level].block[i_block], h, temb)
                else:
                    h = self.down[i_level].block[i_block](h, temb)
                if len(self.down[i_level].attn) > 0:
                    h = self.down[i_level].attn[i_block](h)
            if i_level != self.num_resolutions - 1:
                h = self.down[i_level].downsample(h)

        # middle
        if torch.is_grad_enabled() and self.gradient_checkpointing:
            h = self._gradient_checkpointing_func(self.mid.block_1, h, temb)
            h = self._gradient_checkpointing_func(self.mid.block_2, h, temb)
        else:
            h = self.mid.block_1(h, temb)
            h = self.mid.block_2(h, temb)

        # end
        h = self.norm_out(h)
        h = self.nonlinearity(h)
        h = self.conv_out(h)

        return h


class KVAEDecoder2D(nn.Module):
    r"""
    A 2D decoder module.

    Args:
        ch (`int`): The base number of channels in multiresolution blocks.
        out_ch (`int`): The number of output channels.
        ch_mult (`Tuple[int, ...]`, *optional*, default to `(1, 2, 4, 8)`):
            The channel multipliers in multiresolution blocks.
        num_res_blocks (`int`): The number of Resnet blocks.
        in_channels (`int`): The number of channels in the input.
        z_channels (`int`): The number of input channels.
        give_pre_end (`bool`, *optional*, default to `false`):
            If `True` exit the forward pass early and return the penultimate feature map.
        zq_ch (`bool`, *optional*, default to `None`): The number of channels in the guidance.
        add_conv (`bool`, *optional*, default to `false`): If `True` add conv2d layer for Resnet normalization layer.
        act_fn (`str`, *optional*, default to `"swish"`): The activation function to use.
    """

    def __init__(
        self,
        *,
        ch: int,
        out_ch: int,
        ch_mult: Tuple[int, ...] = (1, 2, 4, 8),
        num_res_blocks: int,
        in_channels: int,
        z_channels: int,
        give_pre_end: bool = False,
        zq_ch: Optional[int] = None,
        add_conv: bool = False,
        act_fn: str = "swish",
    ):
        super().__init__()
        self.ch = ch
        self.temb_ch = 0
        self.num_resolutions = len(ch_mult)
        self.num_res_blocks = num_res_blocks
        self.in_channels = in_channels
        self.give_pre_end = give_pre_end
        self.nonlinearity = get_activation(act_fn)

        if zq_ch is None:
            zq_ch = z_channels

        # compute in_ch_mult, block_in and curr_res at lowest res
        block_in = ch * ch_mult[self.num_resolutions - 1]

        self.conv_in = nn.Conv2d(
            in_channels=z_channels, out_channels=block_in, kernel_size=3, padding=(1, 1), padding_mode="replicate"
        )

        # middle
        self.mid = nn.Module()
        self.mid.block_1 = KVAEResnetBlock2D(
            in_channels=block_in,
            out_channels=block_in,
            temb_channels=self.temb_ch,
            zq_ch=zq_ch,
            add_conv=add_conv,
        )

        self.mid.block_2 = KVAEResnetBlock2D(
            in_channels=block_in,
            out_channels=block_in,
            temb_channels=self.temb_ch,
            zq_ch=zq_ch,
            add_conv=add_conv,
        )

        # upsampling
        self.up = nn.ModuleList()
        for i_level in reversed(range(self.num_resolutions)):
            block = nn.ModuleList()
            attn = nn.ModuleList()
            block_out = ch * ch_mult[i_level]
            for i_block in range(self.num_res_blocks + 1):
                block.append(
                    KVAEResnetBlock2D(
                        in_channels=block_in,
                        out_channels=block_out,
                        temb_channels=self.temb_ch,
                        zq_ch=zq_ch,
                        add_conv=add_conv,
                    )
                )
                block_in = block_out
            up = nn.Module()
            up.block = block
            up.attn = attn
            if i_level != 0:
                up.upsample = KVAEPXSUpsample(in_channels=block_in)
            self.up.insert(0, up)

        self.norm_out = KVAEDecoderSpatialNorm2D(block_in, zq_ch, add_conv=add_conv)  # , gather=gather_norm)

        self.conv_out = nn.Conv2d(
            in_channels=block_in, out_channels=out_ch, kernel_size=3, padding=(1, 1), padding_mode="replicate"
        )

        self.gradient_checkpointing = False

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        self.last_z_shape = z.shape

        # timestep embedding
        temb = None

        # z to block_in
        zq = z
        h = self.conv_in(z)

        # middle
        if torch.is_grad_enabled() and self.gradient_checkpointing:
            h = self._gradient_checkpointing_func(self.mid.block_1, h, temb, zq)
            h = self._gradient_checkpointing_func(self.mid.block_2, h, temb, zq)
        else:
            h = self.mid.block_1(h, temb, zq)
            h = self.mid.block_2(h, temb, zq)

        # upsampling
        for i_level in reversed(range(self.num_resolutions)):
            for i_block in range(self.num_res_blocks + 1):
                if torch.is_grad_enabled() and self.gradient_checkpointing:
                    h = self._gradient_checkpointing_func(self.up[i_level].block[i_block], h, temb, zq)
                else:
                    h = self.up[i_level].block[i_block](h, temb, zq)
                if len(self.up[i_level].attn) > 0:
                    h = self.up[i_level].attn[i_block](h, zq)
            if i_level != 0:
                h = self.up[i_level].upsample(h)

        # end
        if self.give_pre_end:
            return h

        h = self.norm_out(h, zq)
        h = self.nonlinearity(h)
        h = self.conv_out(h)

        return h


class AutoencoderKLKVAE(ModelMixin, AutoencoderMixin, ConfigMixin):
    r"""
    A VAE model with KL loss for encoding images into latents and decoding latent representations into images.

    This model inherits from [`ModelMixin`]. Check the superclass documentation for its generic methods implemented for
    all models (such as downloading or saving).

    Parameters:
        in_channels (int, *optional*, defaults to 3): Number of channels in the input image.
        channels (int,  *optional*, defaults to 128): The base number of channels in multiresolution blocks.
        num_enc_blocks (int, *optional*, defaults to 2):
            The number of Resnet blocks in encoder multiresolution layers.
        num_dec_blocks (int, *optional*, defaults to 2):
            The number of Resnet blocks in decoder multiresolution layers.
        z_channels (int, *optional*, defaults to 16): Number of channels in the latent space.
        double_z (`bool`, *optional*, defaults to `True`):
            Whether to double the number of output channels of encoder.
        ch_mult (`Tuple[int, ...]`, *optional*, default to `(1, 2, 4, 8)`):
            The channel multipliers in multiresolution blocks.
        sample_size (`int`, *optional*, defaults to `1024`): Sample input size.
    """

    _supports_gradient_checkpointing = True

    @register_to_config
    def __init__(
        self,
        in_channels: int = 3,
        channels: int = 128,
        num_enc_blocks: int = 2,
        num_dec_blocks: int = 2,
        z_channels: int = 16,
        double_z: bool = True,
        ch_mult: Tuple[int, ...] = (1, 2, 4, 8),
        sample_size: int = 1024,
    ):
        super().__init__()

        # pass init params to Encoder
        self.encoder = KVAEEncoder2D(
            in_channels=in_channels,
            ch=channels,
            ch_mult=ch_mult,
            num_res_blocks=num_enc_blocks,
            z_channels=z_channels,
            double_z=double_z,
        )

        # pass init params to Decoder
        self.decoder = KVAEDecoder2D(
            out_ch=in_channels,
            ch=channels,
            ch_mult=ch_mult,
            num_res_blocks=num_dec_blocks,
            in_channels=None,
            z_channels=z_channels,
        )

        self.use_slicing = False
        self.use_tiling = False

        # only relevant if vae tiling is enabled
        self.tile_sample_min_size = self.config.sample_size
        sample_size = (
            self.config.sample_size[0]
            if isinstance(self.config.sample_size, (list, tuple))
            else self.config.sample_size
        )
        self.tile_latent_min_size = int(sample_size / (2 ** (len(self.config.ch_mult) - 1)))
        self.tile_overlap_factor = 0.25

    def _encode(self, x: torch.Tensor) -> torch.Tensor:
        batch_size, num_channels, height, width = x.shape

        if self.use_tiling and (width > self.tile_sample_min_size or height > self.tile_sample_min_size):
            return self._tiled_encode(x)

        enc = self.encoder(x)

        return enc

    @apply_forward_hook
    def encode(
        self, x: torch.Tensor, return_dict: bool = True
    ) -> Union[AutoencoderKLOutput, Tuple[DiagonalGaussianDistribution]]:
        """
        Encode a batch of images into latents.

        Args:
            x (`torch.Tensor`): Input batch of images.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether to return a [`~models.autoencoder_kl.AutoencoderKLOutput`] instead of a plain tuple.

        Returns:
                The latent representations of the encoded images. If `return_dict` is True, a
                [`~models.autoencoder_kl.AutoencoderKLOutput`] is returned, otherwise a plain `tuple` is returned.
        """
        if self.use_slicing and x.shape[0] > 1:
            encoded_slices = [self._encode(x_slice) for x_slice in x.split(1)]
            h = torch.cat(encoded_slices)
        else:
            h = self._encode(x)

        posterior = DiagonalGaussianDistribution(h)

        if not return_dict:
            return (posterior,)

        return AutoencoderKLOutput(latent_dist=posterior)

    def _decode(self, z: torch.Tensor, return_dict: bool = True) -> Union[DecoderOutput, torch.Tensor]:
        if self.use_tiling and (z.shape[-1] > self.tile_latent_min_size or z.shape[-2] > self.tile_latent_min_size):
            return self.tiled_decode(z, return_dict=return_dict)

        dec = self.decoder(z)

        if not return_dict:
            return (dec,)

        return DecoderOutput(sample=dec)

    @apply_forward_hook
    def decode(
        self, z: torch.FloatTensor, return_dict: bool = True, generator=None
    ) -> Union[DecoderOutput, torch.FloatTensor]:
        """
        Decode a batch of images.

        Args:
            z (`torch.Tensor`): Input batch of latent vectors.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether to return a [`~models.vae.DecoderOutput`] instead of a plain tuple.

        Returns:
            [`~models.vae.DecoderOutput`] or `tuple`:
                If return_dict is True, a [`~models.vae.DecoderOutput`] is returned, otherwise a plain `tuple` is
                returned.

        """
        if self.use_slicing and z.shape[0] > 1:
            decoded_slices = [self._decode(z_slice).sample for z_slice in z.split(1)]
            decoded = torch.cat(decoded_slices)
        else:
            decoded = self._decode(z).sample

        if not return_dict:
            return (decoded,)

        return DecoderOutput(sample=decoded)

    def blend_v(self, a: torch.Tensor, b: torch.Tensor, blend_extent: int) -> torch.Tensor:
        blend_extent = min(a.shape[2], b.shape[2], blend_extent)
        for y in range(blend_extent):
            b[:, :, y, :] = a[:, :, -blend_extent + y, :] * (1 - y / blend_extent) + b[:, :, y, :] * (y / blend_extent)
        return b

    def blend_h(self, a: torch.Tensor, b: torch.Tensor, blend_extent: int) -> torch.Tensor:
        blend_extent = min(a.shape[3], b.shape[3], blend_extent)
        for x in range(blend_extent):
            b[:, :, :, x] = a[:, :, :, -blend_extent + x] * (1 - x / blend_extent) + b[:, :, :, x] * (x / blend_extent)
        return b

    def _tiled_encode(self, x: torch.Tensor) -> torch.Tensor:
        r"""Encode a batch of images using a tiled encoder.

        When this option is enabled, the VAE will split the input tensor into tiles to compute encoding in several
        steps. This is useful to keep memory use constant regardless of image size. The end result of tiled encoding is
        different from non-tiled encoding because each tile uses a different encoder. To avoid tiling artifacts, the
        tiles overlap and are blended together to form a smooth output. You may still see tile-sized changes in the
        output, but they should be much less noticeable.

        Args:
            x (`torch.Tensor`): Input batch of images.

        Returns:
            `torch.Tensor`:
                The latent representation of the encoded videos.
        """

        overlap_size = int(self.tile_sample_min_size * (1 - self.tile_overlap_factor))
        blend_extent = int(self.tile_latent_min_size * self.tile_overlap_factor)
        row_limit = self.tile_latent_min_size - blend_extent

        # Split the image into 512x512 tiles and encode them separately.
        rows = []
        for i in range(0, x.shape[2], overlap_size):
            row = []
            for j in range(0, x.shape[3], overlap_size):
                tile = x[:, :, i : i + self.tile_sample_min_size, j : j + self.tile_sample_min_size]
                tile = self.encoder(tile)
                row.append(tile)
            rows.append(row)
        result_rows = []
        for i, row in enumerate(rows):
            result_row = []
            for j, tile in enumerate(row):
                # blend the above tile and the left tile
                # to the current tile and add the current tile to the result row
                if i > 0:
                    tile = self.blend_v(rows[i - 1][j], tile, blend_extent)
                if j > 0:
                    tile = self.blend_h(row[j - 1], tile, blend_extent)
                result_row.append(tile[:, :, :row_limit, :row_limit])
            result_rows.append(torch.cat(result_row, dim=3))

        enc = torch.cat(result_rows, dim=2)
        return enc

    def tiled_decode(self, z: torch.Tensor, return_dict: bool = True) -> Union[DecoderOutput, torch.Tensor]:
        r"""
        Decode a batch of images using a tiled decoder.

        Args:
            z (`torch.Tensor`): Input batch of latent vectors.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~models.vae.DecoderOutput`] instead of a plain tuple.

        Returns:
            [`~models.vae.DecoderOutput`] or `tuple`:
                If return_dict is True, a [`~models.vae.DecoderOutput`] is returned, otherwise a plain `tuple` is
                returned.
        """
        overlap_size = int(self.tile_latent_min_size * (1 - self.tile_overlap_factor))
        blend_extent = int(self.tile_sample_min_size * self.tile_overlap_factor)
        row_limit = self.tile_sample_min_size - blend_extent

        # Split z into overlapping 64x64 tiles and decode them separately.
        # The tiles have an overlap to avoid seams between tiles.
        rows = []
        for i in range(0, z.shape[2], overlap_size):
            row = []
            for j in range(0, z.shape[3], overlap_size):
                tile = z[:, :, i : i + self.tile_latent_min_size, j : j + self.tile_latent_min_size]
                decoded = self.decoder(tile)
                row.append(decoded)
            rows.append(row)
        result_rows = []
        for i, row in enumerate(rows):
            result_row = []
            for j, tile in enumerate(row):
                # blend the above tile and the left tile
                # to the current tile and add the current tile to the result row
                if i > 0:
                    tile = self.blend_v(rows[i - 1][j], tile, blend_extent)
                if j > 0:
                    tile = self.blend_h(row[j - 1], tile, blend_extent)
                result_row.append(tile[:, :, :row_limit, :row_limit])
            result_rows.append(torch.cat(result_row, dim=3))

        dec = torch.cat(result_rows, dim=2)
        if not return_dict:
            return (dec,)

        return DecoderOutput(sample=dec)

    def forward(
        self,
        sample: torch.Tensor,
        sample_posterior: bool = False,
        return_dict: bool = True,
        generator: Optional[torch.Generator] = None,
    ) -> Union[DecoderOutput, torch.Tensor]:
        r"""
        Args:
            sample (`torch.Tensor`): Input sample.
            sample_posterior (`bool`, *optional*, defaults to `False`):
                Whether to sample from the posterior.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`DecoderOutput`] instead of a plain tuple.
        """
        x = sample
        posterior = self.encode(x).latent_dist
        if sample_posterior:
            z = posterior.sample(generator=generator)
        else:
            z = posterior.mode()
        dec = self.decode(z).sample

        if not return_dict:
            return (dec,)

        return DecoderOutput(sample=dec)
